Split program/rendition/variant/target operations

Significant rewrite after model modification: introducing `*Sources`
objects that encapsulate metadata and fetch information (urls,
protocols). The API (#20) is organized as pipe elements with sources
being what flows through the pipe.
    1. fetch program sources
    2. fetch rendition sources
    3. fetch variant sources
    4. fetch targets
    5. process (download+mux) targets
Some user selection filter or modifiers could then be applied at any
step of the pipe. Our __main__.py is an implementation of that scheme.

Implied modifications include:
 - Later failure on unsupported protocols, used to be in `api`, now in
   `hls`. This offers the possibility to filter and/or support them
   later.
 - Give up honoring the http ranges for mp4 download, stream-download
   them by fixed chunk instead.
 - Cleaning up of the `hls` module moving the main download function to
   __init__ and specific (mp4/vtt) download functions to a new
   `download` module.

On the side modifications include:
 - The progress handler showing downloading rates.
 - The naming utilities providing rendition and variant code insertion.
 - Download parts to working directories and skip unnecessary
   re-downloads on failure.

This was a big change for a single commit... too big of a change maybe.
This commit is contained in:
Barbagus 2023-01-24 08:27:37 +01:00
parent ed5ba06a98
commit 56c1e8468a
10 changed files with 582 additions and 603 deletions

View File

@ -9,153 +9,165 @@ from .error import *
from .model import *
def fetch_sources(http_session, url):
"""Fetch sources at a given ArteTV page URL."""
from .api import fetch_program_info
from .hls import fetch_program_tracks
from .www import fetch_program
def fetch_program_sources(url, http_session):
"""Fetch program sources listed on given ArteTV page."""
from .www import iter_programs
p_meta = fetch_program(http_session, url)
variants = dict()
renditions = dict()
program_index_urls = fetch_program_info(http_session, p_meta)
for program_index_url in program_index_urls:
v_tracks, a_track, s_track = fetch_program_tracks(
http_session, program_index_url
return [
ProgramSource(
program,
player_config_url,
)
for v_meta, v_url in v_tracks:
if v_meta not in variants:
variants[v_meta] = v_url
elif variants[v_meta] != v_url:
raise ValueError
for program, player_config_url in iter_programs(url, http_session)
]
a_meta, a_url = a_track
s_meta, s_url = s_track or (None, None)
if (a_meta, s_meta) not in renditions:
renditions[(a_meta, s_meta)] = (a_url, s_url)
elif renditions[(a_meta, s_meta)] != (a_url, s_url):
raise ValueError
def fetch_rendition_sources(program_sources, http_session):
"""Fetch renditions for given programs."""
from itertools import groupby
return Sources(
p_meta,
[Variant(key, source) for key, source in variants.items()],
[Rendition(key, source) for key, source in renditions.items()],
from .api import iter_renditions
sources = [
RenditionSource(
program,
rendition,
protocol,
program_index_url,
)
for program, player_config_url in program_sources
for rendition, protocol, program_index_url in iter_renditions(
program.id,
player_config_url,
http_session,
)
]
descriptors = list({(s.rendition.code, s.rendition.label) for s in sources})
descriptors.sort()
for code, group in groupby(descriptors, key=lambda t: t[0]):
labels_for_code = [t[1] for t in group]
if len(labels_for_code) != 1:
raise UnexpectedError("MULTIPLE_RENDITION_LABELS", code, labels_for_code)
return sources
def fetch_variant_sources(renditions_sources, http_session):
"""Fetch variants for given renditions."""
from itertools import groupby
from .hls import iter_variants
sources = [
VariantSource(
program,
rendition,
variant,
VariantSource.VideoMedia(*video),
VariantSource.AudioMedia(*audio),
VariantSource.SubtitlesMedia(*subtitles) if subtitles else None,
)
for program, rendition, protocol, program_index_url in renditions_sources
for variant, video, audio, subtitles in iter_variants(
protocol, program_index_url, http_session
)
]
descriptors = list(
{(s.variant.code, s.video_media.track.frame_rate) for s in sources}
)
descriptors.sort()
for code, group in groupby(descriptors, key=lambda t: t[0]):
frame_rates_for_code = [t[1] for t in group]
if len(frame_rates_for_code) != 1:
raise UnexpectedError(
"MULTIPLE_RENDITION_FRAME_RATES", code, frame_rates_for_code
)
def iter_renditions(sources):
"""Iterate over renditions (code, key) of the given sources."""
keys = [r.key for r in sources.renditions]
keys.sort(
key=lambda k: (
not k[0].is_original,
k[0].language,
k[0].is_descriptive,
k[1].language if k[1] else "",
k[1].is_descriptive if k[1] else False,
)
)
for (a_meta, s_meta) in keys:
code = a_meta.language
if a_meta.is_descriptive:
code += "[AD]"
if s_meta:
if s_meta.is_descriptive:
code += f"-{s_meta.language}[CC]"
elif s_meta.language != a_meta.language:
code += f"-{s_meta.language}"
yield code, (a_meta, s_meta)
return sources
def select_rendition(sources, key):
"""Reject all other renditions from the given sources."""
renditions = [r for r in sources.renditions if r.key == key]
match len(renditions):
case 0:
raise ValueError("rendition not found")
case 1:
pass
case _:
raise ValueError("non unique rendition")
sources.renditions[:] = renditions
def iter_variants(sources):
"""Iterate over variants (code, key) of the given sources."""
import itertools
keys = [v.key for v in sources.variants]
keys.sort(key=lambda k: (k.height, k.frame_rate), reverse=True)
for height, group in itertools.groupby(keys, lambda m: m.height):
group = list(group)
if len(group) == 1:
yield f"{height}p", group[0]
else:
for m in group:
yield f"{height}p@{m.frame_rate}", m
def select_variant(sources, key):
"""Reject all other variants from the given sources."""
variants = [v for v in sources.variants if v.key == key]
match len(variants):
case 0:
raise ValueError("variant not found")
case 1:
pass
case _:
raise ValueError("non unique variant")
sources.variants[:] = variants
def compile_sources(sources, **naming_options):
"""Return target from the given sources."""
def fetch_targets(variant_sources, http_session, **naming_options):
"""Compile download targets for given variants."""
from .hls import fetch_mp4_media, fetch_vtt_media
from .naming import file_name_builder
match len(sources.variants):
case 0:
raise ValueError("no variants")
case 1:
v_meta, v_url = sources.variants[0]
case _:
raise ValueError("multiple variants")
build_file_name = file_name_builder(**naming_options)
match len(sources.renditions):
case 0:
raise ValueError("no renditions")
case 1:
(a_meta, s_meta), (a_url, s_url) = sources.renditions[0]
case _:
raise ValueError("multiple renditions")
targets = [
Target(
Target.VideoInput(
video_media.track,
fetch_mp4_media(video_media.track_index_url, http_session),
),
Target.AudioInput(
audio_media.track,
fetch_mp4_media(audio_media.track_index_url, http_session),
),
(
Target.SubtitlesInput(
subtitles_media.track,
fetch_vtt_media(subtitles_media.track_index_url, http_session),
)
if subtitles_media
else None
),
(program.title, program.subtitle) if program.subtitle else program.title,
build_file_name(program, rendition, variant),
)
for program, rendition, variant, video_media, audio_media, subtitles_media in variant_sources
]
build_file_name = file_name_builder(v_meta, a_meta, s_meta, **naming_options)
return Target(
sources.program,
VideoTrack(v_meta, v_url),
AudioTrack(a_meta, a_url),
SubtitlesTrack(s_meta, s_url) if s_meta else None,
build_file_name(sources.program),
)
return targets
def download_target(http_session, target, progress):
"""Download the given target."""
from .hls import download_target_tracks
def download_targets(targets, http_session, on_progress):
"""Download given target."""
import os
from .download import download_mp4_media, download_vtt_media
from .muxing import mux_target
with download_target_tracks(http_session, target, progress) as local_target:
mux_target(local_target, progress)
for target in targets:
video_path = target.output + ".video.mp4"
audio_path = target.output + ".audio.mp4"
subtitles_path = target.output + ".srt"
download_mp4_media(
target.video_input.url, video_path, http_session, on_progress
)
download_mp4_media(
target.audio_input.url, audio_path, http_session, on_progress
)
if target.subtitles_input:
download_vtt_media(
target.subtitles_input.url, subtitles_path, http_session, on_progress
)
mux_target(
target._replace(
video_input=target.video_input._replace(url=video_path),
audio_input=target.audio_input._replace(url=audio_path),
subtitles_input=(
target.subtitles_input._replace(url=subtitles_path)
if target.subtitles_input
else None
),
),
on_progress,
)
if os.path.isfile(subtitles_path):
os.unlink(subtitles_path)
if os.path.isfile(audio_path):
os.unlink(audio_path)
if os.path.isfile(video_path):
os.unlink(video_path)

View File

@ -26,9 +26,11 @@ Options:
--name-sep=<sep> field separator [default: - ]
--name-seq-pfx=<pfx> sequence counter prefix [default: - ]
--name-seq-no-pad disable sequence zero-padding
--name-add-resolution add resolution tag
--name-add-rendition add rendition code
--name-add-variant add variant code
"""
import itertools
import sys
import time
@ -36,16 +38,15 @@ import docopt
import requests
from . import (
ModuleError,
UnexpectedError,
__version__,
compile_sources,
download_target,
fetch_sources,
iter_renditions,
iter_variants,
select_rendition,
select_variant,
download_targets,
fetch_program_sources,
fetch_rendition_sources,
fetch_targets,
fetch_variant_sources,
)
from .error import ModuleError, UnexpectedError
class Abort(ModuleError):
@ -56,131 +57,104 @@ class Fail(UnexpectedError):
"""Unexpected error."""
_LANGUAGES = {
"de": "German",
"en": "English",
"es": "Spanish",
"fr": "French",
"it": "Italian",
"mul": "multiple language",
"no": "Norwegian",
"pt": "Portuguese",
}
def _create_progress():
# create a progress handler for input downloads
state = {}
def _language_name_for_code(code):
return _LANGUAGES.get(code, f"[{code}]")
def _language_name(meta):
return _language_name_for_code(meta.language)
def _print_renditions(renditions):
has_original = False
for code, (a_meta, s_meta) in renditions:
label = _language_name(a_meta)
if a_meta.is_original:
has_original = True
label = "original " + label
elif a_meta.is_descriptive:
label += " audio description"
elif has_original:
label += " dubbed"
if s_meta:
if s_meta.is_descriptive:
label += f" ({_language_name(s_meta)} closed captions)"
elif s_meta.language != a_meta.language:
label += f" ({_language_name(s_meta)} subtitles)"
print(f"\t{code:>6} - {label}")
def _validate_rendition(renditions, code):
for code_, rendition in renditions:
if code_ == code:
break
else:
print(f"{code!r} is not a valid rendition code, possible values are:")
_print_renditions(renditions)
raise Abort()
return rendition
def _print_variants(variants):
for code, _ in variants:
print(f"\t{code}")
def _validate_variant(variants, code):
for code_, variant in variants:
if code_ == code:
break
else:
print(f"{code!r} is not a valid variant code, possible values are:")
_print_variants(variants)
raise Abort()
return variant
def create_progress():
"""Create a progress handler for input downloads."""
state = {
"last_update_time": 0,
"last_channel": None,
}
def progress(channel, current, total):
def on_progress(file, current, total):
now = time.time()
if current == total:
print(f"\rDownloading {channel}: 100.0%")
state["last_update_time"] = now
elif channel != state["last_channel"]:
print(f"Downloading {channel}: 0.0%", end="")
state["last_update_time"] = now
state["last_channel"] = channel
elif now - state["last_update_time"] > 1:
if current == 0:
print(f"Downloading {file!r}: 0.0%", end="")
state["start_time"] = now
state["last_time"] = now
state["last_count"] = 0
elif current == total:
elapsed_time = now - state["start_time"]
rate = int(total / elapsed_time) if elapsed_time else "NaN"
print(f"\rDownloading {file!r}: 100.0% [{rate}]")
state.clear()
elif now - state["last_time"] > 1:
elapsed_time1 = now - state["start_time"]
elapsed_time2 = now - state["last_time"]
progress = int(1000.0 * current / total) / 10.0
rate1 = int(current / elapsed_time1) if elapsed_time1 else "NaN"
rate2 = (
int((current - state["last_count"]) / elapsed_time2)
if elapsed_time2
else "NaN"
)
print(
f"\rDownloading {channel}: {int(1000.0 * current / total) / 10.0}%",
f"\rDownloading {file!r}: {progress}% [{rate1}, {rate2}]",
end="",
)
state["last_update_time"] = now
state["last_time"] = now
state["last_count"] = current
return progress
return on_progress
def _select_rendition_sources(rendition_code, rendition_sources):
if rendition_code:
filtered = [s for s in rendition_sources if s.rendition.code == rendition_code]
if filtered:
return filtered
print(
f"{rendition_code!r} is not a valid rendition code. Available values are:"
)
else:
print("Available renditions:")
key = lambda s: (s.rendition.label, s.rendition.code)
rendition_sources.sort(key=key)
for (label, code), _ in itertools.groupby(rendition_sources, key=key):
print(f"{code:>12} : {label}")
raise Abort()
def _select_variant_sources(variant_code, variant_sources):
if variant_code:
filtered = [s for s in variant_sources if s.variant.code == variant_code]
if filtered:
return filtered
print(f"{variant_code!r} is not a valid variant code. Available values are:")
else:
print("Available variants:")
variant_sources.sort(key=lambda s: s.video_media.track.height, reverse=True)
for code, _ in itertools.groupby(variant_sources, key=lambda s: s.variant.code):
print(f"{code:>12}")
raise Abort()
def main():
"""CLI command."""
args = docopt.docopt(__doc__, sys.argv[1:], version=__version__)
http_session = requests.sessions.Session()
try:
http_session = requests.sessions.Session()
program_sources = fetch_program_sources(args["URL"], http_session)
sources = fetch_sources(http_session, args["URL"])
rendition_sources = _select_rendition_sources(
args["RENDITION"],
fetch_rendition_sources(program_sources, http_session),
)
renditions = list(iter_renditions(sources))
if not args["RENDITION"]:
print(f"Available renditions:")
_print_renditions(renditions)
return 0
variant_sources = _select_variant_sources(
args["VARIANT"],
fetch_variant_sources(rendition_sources, http_session),
)
select_rendition(sources, _validate_rendition(renditions, args["RENDITION"]))
variants = list(iter_variants(sources))
if not args["VARIANT"]:
print(f"Available variants:")
_print_variants(variants)
return 0
select_variant(sources, _validate_variant(variants, args["VARIANT"]))
target = compile_sources(
sources,
targets = fetch_targets(
variant_sources,
http_session,
**{
k[7:].replace("-", "_"): v
for k, v in args.items()
@ -188,9 +162,7 @@ def main():
},
)
progress = create_progress()
download_target(http_session, target, progress)
download_targets(targets, http_session, _create_progress())
except UnexpectedError as e:
print(str(e))

View File

@ -3,75 +3,67 @@
"""Provide ArteTV JSON API utilities."""
import contextlib
from .error import UnexpectedAPIResponse, UnsupportedHLSProtocol
from .error import UnexpectedAPIResponse
from .model import Rendition
MIME_TYPE = "application/vnd.api+json; charset=utf-8"
@contextlib.contextmanager
def _schema_guard(*context):
try:
yield
except (KeyError, IndexError, ValueError) as e:
raise UnexpectedAPIResponse("SCHEMA", *context) from e
def _fetch_api_object(http_session, path, object_type):
def _fetch_api_object(http_session, url, object_type):
# Fetch an API object.
url = "https://api.arte.tv/api/player/v2/" + path
r = http_session.get(url)
r.raise_for_status()
if (_ := r.headers["content-type"]) != MIME_TYPE:
raise UnexpectedAPIResponse("MIME_TYPE", path, MIME_TYPE, _)
mime_type = r.headers["content-type"]
if mime_type != MIME_TYPE:
raise UnexpectedAPIResponse("MIME_TYPE", url, MIME_TYPE, mime_type)
obj = r.json()
with _schema_guard(path):
try:
data_type = obj["data"]["type"]
data_attributes = obj["data"]["attributes"]
if data_type != object_type:
raise UnexpectedAPIResponse("OBJECT_TYPE", url, object_type, data_type)
if data_type != object_type:
raise UnexpectedAPIResponse("OBJECT_TYPE", path, object_type, _)
return obj["data"]["attributes"]
return data_attributes
except (KeyError, IndexError, ValueError) as e:
raise UnexpectedAPIResponse("SCHEMA", url) from e
def fetch_program_info(http_session, p_meta):
"""Fetch the given program metadata and indexes."""
obj = _fetch_api_object(
http_session, f"config/{p_meta.site}/{p_meta.id}", "ConfigPlayer"
)
def iter_renditions(program_id, player_config_url, http_session):
"""Iterate over renditions for the given program."""
obj = _fetch_api_object(http_session, player_config_url, "ConfigPlayer")
with _schema_guard(p_meta.site, p_meta.id):
codes = set()
try:
provider_id = obj["metadata"]["providerId"]
streams = [(s["protocol"], s["url"]) for s in obj["streams"]]
if provider_id != p_meta.id:
raise UnexpectedAPIResponse(
"PROGRAM_ID_MISMATCH",
p_meta.site,
p_meta.id,
provider_id,
)
program_index_urls = set()
for protocol, program_index_url in streams:
if protocol != "HLS_NG":
raise UnsupportedHLSProtocol(p_meta.site, p_meta.id, protocol)
if program_index_url in program_index_urls:
if provider_id != program_id:
raise UnexpectedAPIResponse(
"DUPLICATE_PROGRAM_INDEX_URL",
p_meta.site,
p_meta.id,
program_index_url,
"PROVIDER_ID_MISMATCH", player_config_url, provider_id
)
program_index_urls.add(program_index_url)
for s in obj["streams"]:
code = s["versions"][0]["eStat"]["ml5"]
return program_index_urls
if code in codes:
raise UnexpectedAPIResponse(
"DUPLICATE_RENDITION_CODE", player_config_url, code
)
codes.add(code)
yield (
Rendition(
s["versions"][0]["eStat"]["ml5"],
s["versions"][0]["label"],
),
s["protocol"],
s["url"],
)
except (KeyError, IndexError, ValueError) as e:
raise UnexpectedAPIResponse("SCHEMA", player_config_url) from e
if not codes:
raise UnexpectedAPIResponse("NO_RENDITIONS", player_config_url)

52
src/delarte/download.py Normal file
View File

@ -0,0 +1,52 @@
# License: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of `delarte` (https://git.afpy.org/fcode/delarte.git)
"""Provide download utilities."""
import os
from . import subtitles
_CHUNK = 64 * 1024
def download_mp4_media(url, file_name, http_session, on_progress):
"""Download a MP4 (video or audio) to given file."""
on_progress(file_name, 0, 0)
if os.path.isfile(file_name):
on_progress(file_name, 1, 1)
return
temp_file = f"{file_name}.tmp"
with open(temp_file, "w+b") as f:
r = http_session.get(url, timeout=5, stream=True)
r.raise_for_status()
total = int(r.headers["content-length"])
for content in r.iter_content(_CHUNK):
f.write(content)
on_progress(file_name, f.tell(), total)
os.rename(temp_file, file_name)
def download_vtt_media(url, file_name, http_session, on_progress):
"""Download a VTT and SRT-convert it to to given file."""
on_progress(file_name, 0, 0)
if os.path.isfile(file_name):
on_progress(file_name, 1, 1)
return
temp_file = f"{file_name}.tmp"
with open(temp_file, "w", encoding="utf-8") as f:
r = http_session.get(url, timeout=5)
r.raise_for_status()
r.encoding = "utf-8"
subtitles.convert(r.text, f)
on_progress(file_name, f.tell(), f.tell())
os.rename(temp_file, file_name)

View File

@ -40,12 +40,15 @@ class InvalidPage(UnexpectedError):
#
# Others
# api
#
class UnexpectedAPIResponse(UnexpectedError):
"""Unexpected response from ArteTV."""
#
# hls
#
class UnexpectedHLSResponse(UnexpectedError):
"""Unexpected response from ArteTV."""
@ -54,5 +57,8 @@ class UnsupportedHLSProtocol(ModuleError):
"""Program type not supported."""
#
# subtitles
#
class WebVTTError(UnexpectedError):
"""Unexpected WebVTT data."""

View File

@ -4,23 +4,10 @@
"""Provide HLS protocol utilities."""
import contextlib
import os
from tempfile import NamedTemporaryFile
import m3u8
from . import subtitles
from .error import UnexpectedHLSResponse
from .model import (
AudioMeta,
AudioTrack,
SubtitlesMeta,
SubtitlesTrack,
Target,
VideoMeta,
VideoTrack,
)
from .error import UnexpectedHLSResponse, UnsupportedHLSProtocol
from .model import AudioTrack, SubtitlesTrack, Variant, VideoTrack
#
# WARNING !
@ -40,7 +27,7 @@ from .model import (
MIME_TYPE = "application/x-mpegURL"
def _fetch_index(http_session, url):
def _fetch_index(url, http_session):
# Fetch a M3U8 playlist
r = http_session.get(url)
r.raise_for_status()
@ -53,9 +40,12 @@ def _fetch_index(http_session, url):
return m3u8.loads(r.text, url)
def fetch_program_tracks(http_session, program_index_url):
"""Fetch video, audio and subtitles tracks for the given program index."""
program_index = _fetch_index(http_session, program_index_url)
def iter_variants(protocol, program_index_url, http_session):
"""Iterate over variants for the given rendition."""
if protocol != "HLS_NG":
raise UnsupportedHLSProtocol(protocol, program_index_url)
program_index = _fetch_index(program_index_url, http_session)
audio_media = None
subtitles_media = None
@ -78,8 +68,9 @@ def fetch_program_tracks(http_session, program_index_url):
if not audio_media:
raise UnexpectedHLSResponse("NO_AUDIO_MEDIA", program_index_url)
audio_track = AudioTrack(
AudioMeta(
audio = (
AudioTrack(
audio_media.name,
audio_media.language,
audio_media.name.startswith("VO"),
(
@ -90,9 +81,10 @@ def fetch_program_tracks(http_session, program_index_url):
audio_media.absolute_uri,
)
subtitles_track = (
SubtitlesTrack(
SubtitlesMeta(
subtitles = (
(
SubtitlesTrack(
subtitles_media.name,
subtitles_media.language,
(
subtitles_media.characteristics is not None
@ -105,7 +97,7 @@ def fetch_program_tracks(http_session, program_index_url):
else None
)
video_tracks = set()
codes = set()
for video_media in program_index.playlists:
stream_info = video_media.stream_info
@ -117,33 +109,39 @@ def fetch_program_tracks(http_session, program_index_url):
if subtitles_media:
if stream_info.subtitles != subtitles_media.group_id:
raise UnexpectedHLSResponse(
"INVALID_SUBTITLES_MEDIA",
program_index_url,
stream_info.subtitles,
"INVALID_SUBTITLES_MEDIA", program_index_url, stream_info.subtitles
)
elif stream_info.subtitles:
raise UnexpectedHLSResponse(
"INVALID_SUBTITLES_MEDIA",
program_index_url,
stream_info.subtitles,
"INVALID_SUBTITLES_MEDIA", program_index_url, stream_info.subtitles
)
video_track = VideoTrack(
VideoMeta(
stream_info.resolution[0],
stream_info.resolution[1],
stream_info.frame_rate,
code = f"{stream_info.resolution[1]}p"
if code in codes:
raise UnexpectedHLSResponse(
"DUPLICATE_STREAM_CODE", program_index_url, code
)
codes.add(code)
yield (
Variant(
code,
stream_info.average_bandwidth,
),
video_media.absolute_uri,
(
VideoTrack(
stream_info.resolution[0],
stream_info.resolution[1],
stream_info.frame_rate,
),
video_media.absolute_uri,
),
audio,
subtitles,
)
if video_track in video_tracks:
raise UnexpectedHLSResponse(
"DUPLICATE_VIDEO_TRACK", program_index_url, video_track
)
video_tracks.add(video_track)
return video_tracks, audio_track, subtitles_track
if not codes:
raise UnexpectedHLSResponse("NO_VARIANTS", program_index_url)
def _convert_byterange(obj):
@ -154,18 +152,16 @@ def _convert_byterange(obj):
return offset, offset + count - 1
def _fetch_av_index(http_session, track_index_url):
# Fetch an audio or video track index.
# Return a tuple:
# - the media file url
# - the media file's ranges
track_index = _fetch_index(http_session, track_index_url)
def fetch_mp4_media(track_index_url, http_session):
"""Fetch an audio or video media."""
track_index = _fetch_index(track_index_url, http_session)
file_name = track_index.segment_map[0].uri
start, end = _convert_byterange(track_index.segment_map[0])
if start != 0:
raise UnexpectedHLSResponse("INVALID_AV_INDEX_FRAGMENT_START", track_index_url)
ranges = [(start, end)]
# ranges = [(start, end)]
next_start = end + 1
for segment in track_index.segments:
@ -178,16 +174,15 @@ def _fetch_av_index(http_session, track_index_url):
"DISCONTINUOUS_AV_INDEX_FRAGMENT", track_index_url
)
ranges.append((start, end))
# ranges.append((start, end))
next_start = end + 1
return track_index.segment_map[0].absolute_uri, ranges
return track_index.segment_map[0].absolute_uri
def _fetch_s_index(http_session, track_index_url):
# Fetch subtitles index.
# Return the subtitle file url.
track_index = _fetch_index(http_session, track_index_url)
def fetch_vtt_media(track_index_url, http_session):
"""Fetch an audio or video media."""
track_index = _fetch_index(track_index_url, http_session)
urls = [s.absolute_uri for s in track_index.segments]
if not urls:
@ -197,112 +192,3 @@ def _fetch_s_index(http_session, track_index_url):
raise UnexpectedHLSResponse("MULTIPLE_S_INDEX_FILES", track_index_url)
return urls[0]
def _download_av_track(http_session, track_index_url, progress):
# Download an audio or video data to temporary file.
# Return the temporary file path.
url, ranges = _fetch_av_index(http_session, track_index_url)
total = ranges[-1][1]
with (
NamedTemporaryFile(
mode="w+b", delete=False, prefix="delarte.", suffix=".mp4"
) as f
):
for range_start, range_end in ranges:
r = http_session.get(
url,
headers={
"Range": f"bytes={range_start}-{range_end}",
},
timeout=5,
)
r.raise_for_status()
if r.status_code != 206:
raise UnexpectedHLSResponse(
"UNEXPECTED_AV_TRACK_HTTP_STATUS",
track_index_url,
r.request.headers,
r.status,
)
if len(r.content) != range_end - range_start + 1:
raise UnexpectedHLSResponse(
"INVALID_AV_TRACK_FRAGMENT_LENGTH", track_index_url
)
f.write(r.content)
progress(range_end, total)
return f.name
def _download_s_track(http_session, track_index_url, progress):
# Download a subtitle file (converted from VTT to SRT format) into a temporary file.
# Return the temporary file path.
url = _fetch_s_index(http_session, track_index_url)
progress(0, 2)
r = http_session.get(url)
r.raise_for_status()
r.encoding = "utf-8"
progress(1, 2)
with NamedTemporaryFile(
"w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
) as f:
subtitles.convert(r.text, f)
progress(2, 2)
return f.name
@contextlib.contextmanager
def download_target_tracks(http_session, target, progress):
"""Download target tracks to temporary files.
Returns a context manager that will delete the temporary files on exit.
The context expression is a local version of the given target.
"""
v_path, (v_meta, v_url) = None, target.video_track
a_path, (a_meta, a_url) = None, target.audio_track
s_path, (s_meta, s_url) = None, target.subtitles_track or (None, None)
try:
s_path = (
_download_s_track(
http_session,
s_url,
lambda i, n: progress("subtitles", i, n),
)
if s_meta
else None
)
a_path = _download_av_track(
http_session, a_url, lambda i, n: progress("audio", i, n)
)
v_path = _download_av_track(
http_session, v_url, lambda i, n: progress("video", i, n)
)
yield Target(
target.program,
VideoTrack(v_meta, v_path),
AudioTrack(a_meta, a_path),
SubtitlesTrack(s_meta, s_path) if s_meta else None,
target.file_name,
)
finally:
if v_path and os.path.isfile(v_path):
os.unlink(v_path)
if a_path and os.path.isfile(a_path):
os.unlink(a_path)
if s_path and os.path.isfile(s_path):
os.unlink(s_path)

View File

@ -7,106 +7,131 @@
from typing import NamedTuple, Optional
class ProgramMeta(NamedTuple):
#
# Metadata objects
#
class Program(NamedTuple):
"""A program metadata."""
site: str
"""The site where it is hosted (fr, de, etc...)."""
id: str
"""The ID."""
language: str
title: str
"""The title."""
subtitle: str
"""The subtitle or secondary title."""
class VideoMeta(NamedTuple):
"""A video track metadata."""
class Rendition(NamedTuple):
"""A program rendition metadata."""
width: int
"""Horizontal part of the resolution."""
height: int
"""Vertical part of the resolution."""
frame_rate: float
"""Frame rate per seconds."""
code: str
label: str
class SubtitlesMeta(NamedTuple):
"""A subtitles track metadata."""
class Variant(NamedTuple):
"""A program variant metadata."""
language: str
"""ISO 639-1 two-letter language codes."""
is_descriptive: bool
"""Whether provides a textual description (closed captions)."""
class AudioMeta(NamedTuple):
"""A audio track metadata."""
language: str
"""ISO 639-1 two-letter language codes, or "mul" for multiple languages."""
is_original: bool
"""Whether audio track is original (no audio description or dubbing)."""
is_descriptive: bool
"""Whether provides an audio description."""
code: str
average_bandwidth: int
#
# Track objects
#
class VideoTrack(NamedTuple):
"""A video track."""
meta: VideoMeta
url: str
width: int
height: int
frame_rate: float
class AudioTrack(NamedTuple):
"""An audio track."""
name: str
language: str
original: bool
visual_impaired: bool
class SubtitlesTrack(NamedTuple):
"""A subtitles track."""
meta: SubtitlesMeta
url: str
name: str
language: str
hearing_impaired: bool
class AudioTrack(NamedTuple):
"""A audio track."""
#
# Source objects
#
class ProgramSource(NamedTuple):
"""A program source item."""
meta: AudioMeta
url: str
program: Program
player_config_url: str
class Variant(NamedTuple):
"""A program variant."""
class RenditionSource(NamedTuple):
"""A rendition source item."""
key: VideoMeta
source: str
program: Program
rendition: Rendition
protocol: str
program_index_url: Program
class Rendition(NamedTuple):
"""A program rendition."""
class VariantSource(NamedTuple):
"""A variant source item."""
key: tuple[AudioMeta, Optional[SubtitlesMeta]]
source: tuple[str, Optional[str]]
class VideoMedia(NamedTuple):
"""A video media."""
track: VideoTrack
track_index_url: str
class Sources(NamedTuple):
"""A program's sources."""
class AudioMedia(NamedTuple):
"""An audio media."""
program: ProgramMeta
variants: list[Variant]
renditions: list[Rendition]
track: AudioTrack
track_index_url: str
class SubtitlesMedia(NamedTuple):
"""A subtitles media."""
track: SubtitlesTrack
track_index_url: str
program: Program
rendition: Rendition
variant: Variant
video_media: VideoMedia
audio_media: AudioMedia
subtitles_media: Optional[SubtitlesMedia]
class Target(NamedTuple):
"""A download target."""
"""A download target item."""
program: ProgramMeta
video_track: VideoTrack
audio_track: AudioTrack
subtitles_track: Optional[SubtitlesTrack]
file_name: str
class VideoInput(NamedTuple):
"""A video input."""
track: VideoTrack
url: str
class AudioInput(NamedTuple):
"""An audio input."""
track: AudioTrack
url: str
class SubtitlesInput(NamedTuple):
"""A subtitles input."""
track: SubtitlesTrack
url: str
video_input: VideoInput
audio_input: AudioInput
subtitles_input: Optional[SubtitlesInput]
title: str | tuple[str, str]
output: str

View File

@ -1,33 +1,74 @@
# License: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of `delarte` (https://git.afpy.org/fcode/delarte.git)
"""Provide tracks muxing utilities."""
"""Provide target muxing utilities."""
import subprocess
def mux_target(target, _progress):
"""Multiplexes tracks into a single file."""
"""Multiplexes target into a single file."""
cmd = ["ffmpeg", "-hide_banner"]
cmd.extend(["-i", target.video_track.url])
cmd.extend(["-i", target.audio_track.url])
if target.subtitles_track:
cmd.extend(["-i", target.subtitles_track.url])
# inputs
cmd.extend(["-i", target.video_input.url])
cmd.extend(["-i", target.audio_input.url])
if target.subtitles_input:
cmd.extend(["-i", target.subtitles_input.url])
# codecs
cmd.extend(["-c:v", "copy"])
cmd.extend(["-c:a", "copy"])
if target.subtitles_track:
if target.subtitles_input:
cmd.extend(["-c:s", "copy"])
cmd.extend(["-bsf:a", "aac_adtstoasc"])
cmd.extend(["-metadata:s:a:0", f"language={target.audio_track.meta.language}"])
if target.subtitles_track:
# stream metadata & disposition
# cmd.extend(["-metadata:s:v:0", f"name={target.video.name!r}"])
# cmd.extend(["-metadata:s:v:0", f"language={target.video.language!r}"])
cmd.extend(["-metadata:s:a:0", f"name={target.audio_input.track.name}"])
cmd.extend(["-metadata:s:a:0", f"language={target.audio_input.track.language}"])
a_disposition = "default"
if target.audio_input.track.original:
a_disposition += "+original"
else:
a_disposition += "-original"
if target.audio_input.track.visual_impaired:
a_disposition += "+visual_impaired"
else:
a_disposition += "-visual_impaired"
cmd.extend(["-disposition:a:0", a_disposition])
if target.subtitles_input:
cmd.extend(["-metadata:s:s:0", f"name={target.subtitles_input.track.name}"])
cmd.extend(
["-metadata:s:s:0", f"language={target.subtitles_track.meta.language}"]
["-metadata:s:s:0", f"language={target.subtitles_input.track.language}"]
)
cmd.extend(["-disposition:s:0", "default"])
cmd.append(f"{target.file_name}.mkv")
s_disposition = "default"
if target.subtitles_input.track.hearing_impaired:
s_disposition += "+hearing_impaired+descriptions"
else:
s_disposition += "-hearing_impaired-descriptions"
cmd.extend(["-disposition:s:0", s_disposition])
# file metadata
if isinstance(target.title, tuple):
cmd.extend(["-metadata", f"title={target.title[0]}"])
cmd.extend(["-metadata", f"subtitle={target.title[1]}"])
else:
cmd.extend(["-metadata", f"title={target.title}"])
# output
cmd.append(f"{target.output}.mkv")
print(cmd)
subprocess.run(cmd)

View File

@ -3,23 +3,18 @@
"""Provide contextualized based file naming utility."""
import re
from typing import Optional
from .model import AudioMeta, SubtitlesMeta, VideoMeta
def file_name_builder(
v_meta: VideoMeta,
a_meta: AudioMeta,
s_meta: Optional[SubtitlesMeta],
*,
use_id=False,
sep=" - ",
seq_pfx=" - ",
seq_no_pad=False,
add_resolution=False,
add_rendition=False,
add_variant=False
):
"""Create a file namer from context."""
"""Create a file namer."""
def sub_sequence_counter(match):
index = match[1]
@ -31,17 +26,20 @@ def file_name_builder(
def replace_sequence_counter(s: str) -> str:
return re.sub(r"\s+\((\d+)/(\d+)\)", sub_sequence_counter, s)
def build_file_name(p_meta) -> str:
"""Create a file name for given program."""
def build_file_name(program, rendition, variant):
"""Create a file name."""
if use_id:
return p_meta.id
return program.id
fields = [replace_sequence_counter(p_meta.title)]
if p_meta.subtitle:
fields.append(replace_sequence_counter(p_meta.subtitle))
fields = [replace_sequence_counter(program.title)]
if program.subtitle:
fields.append(replace_sequence_counter(program.subtitle))
if add_resolution:
fields.append(f"{v_meta.height}p")
if add_rendition:
fields.append(rendition.code)
if add_variant:
fields.append(variant.code)
name = sep.join(fields)
name = re.sub(r'[/:<>"\\|?*]', "", name)

View File

@ -3,84 +3,79 @@
"""Provide ArteTV website utilities."""
import contextlib
import json
from .error import InvalidPage, PageNotFound, PageNotSupported
from .model import ProgramMeta
from .model import Program
_DATA_MARK = '<script id="__NEXT_DATA__" type="application/json">'
@contextlib.contextmanager
def _schema_guard(*context):
try:
yield
except (KeyError, IndexError, ValueError) as e:
raise InvalidPage("SCHEMA", *context) from e
def _process_programs_page(page_value):
with _schema_guard():
site = page_value["language"]
language = page_value["language"]
content_zones = [
zone
for zone in page_value["zones"]
if zone["code"].startswith("program_content_")
]
for zone in page_value["zones"]:
if not zone["code"].startswith("program_content_"):
continue
programs = [
ProgramMeta(
site, data_item["programId"], data_item["title"], data_item["subtitle"]
for data_item in zone["content"]["data"]:
if (_ := data_item["type"]) != "program":
raise InvalidPage("PROGRAMS_INVALID_CONTENT_DATA_ITEM", _)
yield (
Program(
data_item["programId"],
language,
data_item["title"],
data_item["subtitle"],
),
data_item["player"]["config"],
)
for zone in content_zones
for data_item in zone["content"]["data"]
if data_item["type"] == "program"
]
if len(content_zones) != 1:
break
else:
raise InvalidPage("PROGRAMS_PROGRAMS_COUNT")
break
else:
raise InvalidPage("PROGRAMS_CONTENT_ZONES_COUNT")
if len(programs) != 1:
raise InvalidPage("PROGRAMS_PROGRAMS_COUNT")
return programs[0]
def fetch_program(http_session, url):
"""Load the ArteTV page at given URL and return list of programs on it."""
r = http_session.get(url)
def iter_programs(page_url, http_session):
"""Iterate over programs listed on given ArteTV page."""
r = http_session.get(page_url)
# special handling of 404
if r.status_code == 404:
raise PageNotFound(url)
# other network errors
raise PageNotFound(page_url)
r.raise_for_status()
# no HTML parsing required, whe just find the mark
html = r.text
start = html.find(_DATA_MARK)
if start < 0:
raise InvalidPage("DATA_MARK_NOT_FOUND", url)
raise InvalidPage("DATA_MARK_NOT_FOUND", page_url)
start += len(_DATA_MARK)
end = html.index("</script>", start)
try:
next_js_data = json.loads(html[start:end].strip())
except json.JSONDecodeError:
raise InvalidPage("INVALID_JSON_DATA", url)
raise InvalidPage("INVALID_JSON_DATA", page_url)
with _schema_guard(url):
try:
initial_page_value = next_js_data["props"]["pageProps"]["initialPage"]["value"]
initial_type = next_js_data["props"]["pageProps"]["initialType"]
try:
match initial_type:
case "programs":
return _process_programs_page(initial_page_value)
yield from _process_programs_page(initial_page_value)
case _:
raise PageNotSupported("TYPE_NOT_SUPPORTED", url, initial_type)
raise PageNotSupported(page_url, initial_type)
except (KeyError, IndexError, ValueError) as e:
raise InvalidPage("SCHEMA", page_url) from e
except InvalidPage as e:
raise InvalidPage(e.args[0], url, *e.args[1:])
raise InvalidPage(e.args[0], page_url) from e