Refactor models and API

Change, add and rename the model's data structures in order to provide a more
useful API (#20), introducing new structures:
- `Sources`: summarizing program, renditions and variants found
  at a given ArteTV page URL
- `Target`: summarizing all required data for a download

And new functions:
- `fetch_sources()` to build the `Sources` from a URL
- `iter_[renditions|variants]()` to describe the available options for the
  `Sources`
- `select_[rendition|variant]()` to narrow down the desired options
  for the `Sources`
- `compile_sources()` to compute a `Target` from `Sources`
- `download_target()` to download such a `Target`

Finally, this should make the playlist handling (#7) easier (I know, I've
said that before).
This commit is contained in:
Barbagus 2023-01-09 19:30:46 +01:00
parent b13d4186b0
commit 4667dbfca1
7 changed files with 395 additions and 257 deletions

View File

@ -10,26 +10,152 @@ from .model import *
def fetch_sources(http_session, url):
"""Fetch a url's sources."""
"""Fetch sources at a given ArteTV page URL."""
from .api import fetch_program_info
from .hls import fetch_program_sources
from .hls import fetch_program_tracks
from .www import parse_url
site, target_id = parse_url(url)
return [
source
for metadata, program_index_url in fetch_program_info(
http_session, site, target_id
variants = dict()
renditions = dict()
program_meta, program_index_urls = fetch_program_info(http_session, site, target_id)
for program_index_url in program_index_urls:
v_tracks, a_track, s_track = fetch_program_tracks(
http_session, program_index_url
)
for source in fetch_program_sources(http_session, metadata, program_index_url)
]
for v_meta, v_url in v_tracks:
if v_meta not in variants:
variants[v_meta] = v_url
elif variants[v_meta] != v_url:
raise ValueError
a_meta, a_url = a_track
s_meta, s_url = s_track or (None, None)
if (a_meta, s_meta) not in renditions:
renditions[(a_meta, s_meta)] = (a_url, s_url)
elif renditions[(a_meta, s_meta)] != (a_url, s_url):
raise ValueError
return Sources(
program_meta,
[Variant(key, source) for key, source in variants.items()],
[Rendition(key, source) for key, source in renditions.items()],
)
def download_source(http_session, source, file_name, progress):
"""Download the given source into given file."""
from .hls import download_source
from .muxing import mux_source
def iter_renditions(sources):
"""Iterate over renditions (code, key) of the given sources."""
keys = [r.key for r in sources.renditions]
with download_source(http_session, source, progress) as local_source:
mux_source(local_source, file_name, progress)
keys.sort(
key=lambda k: (
not k[0].is_original,
k[0].language,
k[0].is_descriptive,
k[1].language if k[1] else "",
k[1].is_descriptive if k[1] else False,
)
)
for (a_meta, s_meta) in keys:
code = a_meta.language
if a_meta.is_descriptive:
code += "[AD]"
if s_meta:
if s_meta.is_descriptive:
code += f"-{s_meta.language}[CC]"
elif s_meta.language != a_meta.language:
code += f"-{s_meta.language}"
yield code, (a_meta, s_meta)
def select_rendition(sources, key):
    """Reject all other renditions from the given sources.

    The `sources.renditions` list is updated in place so that it only
    contains the rendition whose `key` equals the given key.

    Raises `ValueError` if no rendition, or more than one rendition, has
    that key; the list is left untouched in that case.
    """
    matching = [r for r in sources.renditions if r.key == key]

    if not matching:
        raise ValueError("rendition not found")
    if len(matching) > 1:
        raise ValueError("non unique rendition")

    # Slice assignment: mutate the existing list object rather than rebind it.
    sources.renditions[:] = matching
def iter_variants(sources):
    """Iterate over variants (code, key) of the given sources.

    Variants are yielded by decreasing height, then decreasing frame rate.
    The code is `<height>p` when a single variant has that height, and
    `<height>p@<frame_rate>` when several variants share the same height.
    """
    import itertools

    keys = sorted(
        (v.key for v in sources.variants),
        key=lambda k: (k.height, k.frame_rate),
        reverse=True,
    )

    # `groupby` only groups adjacent items; the sort above guarantees that
    # keys with the same height are contiguous.
    for height, group in itertools.groupby(keys, key=lambda k: k.height):
        same_height = list(group)
        if len(same_height) == 1:
            yield f"{height}p", same_height[0]
        else:
            for meta in same_height:
                yield f"{height}p@{meta.frame_rate}", meta
def select_variant(sources, key):
    """Reject all other variants from the given sources.

    The `sources.variants` list is updated in place so that it only
    contains the variant whose `key` equals the given key.

    Raises `ValueError` if no variant, or more than one variant, has that
    key; the list is left untouched in that case.
    """
    matching = [v for v in sources.variants if v.key == key]

    if not matching:
        raise ValueError("variant not found")
    if len(matching) > 1:
        raise ValueError("non unique variant")

    # Slice assignment: mutate the existing list object rather than rebind it.
    sources.variants[:] = matching
def compile_sources(sources, **naming_options):
    """Return target from the given sources.

    The sources must hold exactly one variant and exactly one rendition.
    Keyword arguments are forwarded to the file name builder.

    Raises `ValueError` if the sources hold no or multiple variants, or no
    or multiple renditions (variants are checked first).
    """
    from .naming import file_name_builder

    variant_count = len(sources.variants)
    if variant_count == 0:
        raise ValueError("no variants")
    if variant_count > 1:
        raise ValueError("multiple variants")
    v_meta, v_url = sources.variants[0]

    rendition_count = len(sources.renditions)
    if rendition_count == 0:
        raise ValueError("no renditions")
    if rendition_count > 1:
        raise ValueError("multiple renditions")
    # A rendition is a pair of pairs: (audio, subtitles) metadata and the
    # matching (audio, subtitles) locations; the subtitles part may be None.
    (a_meta, s_meta), (a_url, s_url) = sources.renditions[0]

    build_file_name = file_name_builder(v_meta, a_meta, s_meta, **naming_options)

    return Target(
        sources.program,
        VideoTrack(v_meta, v_url),
        AudioTrack(a_meta, a_url),
        SubtitlesTrack(s_meta, s_url) if s_meta else None,
        build_file_name(sources.program),
    )
def download_target(http_session, target, progress):
    """Download the given target.

    The target's tracks are downloaded first, then muxed; the muxing runs
    inside the download context so that it happens before that context
    exits.
    """
    from .hls import download_target_tracks
    from .muxing import mux_target

    with download_target_tracks(http_session, target, progress) as local_target:
        mux_target(local_target, progress)

View File

@ -30,7 +30,16 @@ import time
import docopt
import requests
from . import __version__, download_source, fetch_sources, naming
from . import (
__version__,
compile_sources,
download_target,
fetch_sources,
iter_renditions,
iter_variants,
select_rendition,
select_variant,
)
from .error import ModuleError, UnexpectedError
@ -42,85 +51,74 @@ class Fail(UnexpectedError):
"""Unexpected error."""
def _lookup_language_name(code):
# TODO: actually implement this
return f"[{code}]"
_LANGUAGES = {
"de": "German",
"en": "English",
"es": "Spanish",
"fr": "French",
"it": "Italian",
"mul": "multiple language",
"no": "Norwegian",
"pt": "Portuguese",
}
def _build_rendition_label(rendition):
# Build a human readable description for a given rendition
label = ""
if rendition.audio.is_original:
label += "original "
elif not rendition.audio.provides_accessibility:
label += "dubbed "
label += _lookup_language_name(rendition.audio.lang)
if rendition.audio.provides_accessibility:
label += " audio description"
if rendition.subtitles:
if rendition.subtitles.provides_accessibility:
if rendition.subtitles.lang != rendition.audio.lang:
label += f" with {_lookup_language_name(rendition.subtitles.lang)} text description"
else:
label += " with text description"
elif rendition.subtitles.lang != rendition.audio.lang:
label += (
f" with {_lookup_language_name(rendition.subtitles.lang)} subtitles"
)
return label
def _language_name_for_code(code):
    # Return the display name for a language code, falling back to the
    # bracketed code itself when the code is not in the lookup table.
    return _LANGUAGES.get(code, f"[{code}]")
def _print_renditions(sources):
items = [
(r.code, _build_rendition_label(r)) for r in set(s.rendition for s in sources)
]
def _language_name(meta):
    # Return the display name for the language of a track metadata object
    # (anything exposing a `language` attribute).
    return _language_name_for_code(meta.language)
items.sort(key=lambda t: t[1])
for code, label in items:
def _print_renditions(renditions):
    # Print one line per rendition: its code followed by a human readable
    # label. `renditions` is an iterable of `(code, (a_meta, s_meta))` items
    # where `s_meta` may be None.

    # Materialize so the input can be scanned twice, even if it is a generator.
    renditions = list(renditions)

    # Non-original tracks are only labelled "dubbed" when an original track
    # exists. Computing this up front makes the labels independent of the
    # order in which the renditions are listed.
    has_original = any(a_meta.is_original for _, (a_meta, _s_meta) in renditions)

    for code, (a_meta, s_meta) in renditions:
        label = _language_name(a_meta)

        if a_meta.is_original:
            label = "original " + label
        elif a_meta.is_descriptive:
            label += " audio description"
        elif has_original:
            label += " dubbed"

        if s_meta:
            if s_meta.is_descriptive:
                label += f" ({_language_name(s_meta)} closed captions)"
            elif s_meta.language != a_meta.language:
                # Same-language, non-descriptive subtitles are not mentioned.
                label += f" ({_language_name(s_meta)} subtitles)"

        print(f"\t{code:>6} - {label}")
def _print_variants(sources):
items = [(v.code, v.height) for v in set(s.variant for s in sources)]
def _validate_rendition(renditions, code):
for code_, rendition in renditions:
if code_ == code:
break
else:
print(f"{code!r} is not a valid rendition code, possible values are:")
_print_renditions(renditions)
raise Abort()
items.sort(key=lambda t: t[1], reverse=True)
return rendition
for code, _ in items:
def _print_variants(variants):
    # Print one tab-indented line per variant code. `variants` is an
    # iterable of `(code, key)` pairs; only the code is displayed.
    for code, _ in variants:
        print(f"\t{code}")
def _select_rendition(sources, rendition_code):
filtered = [s for s in sources if s.rendition.code == rendition_code]
if not filtered:
print(f"{rendition_code!r} is not a valid rendition code, possible values are:")
_print_renditions(sources)
def _validate_variant(variants, code):
for code_, variant in variants:
if code_ == code:
break
else:
print(f"{code!r} is not a valid variant code, possible values are:")
_print_variants(variants)
raise Abort()
if len(_ := set(s.rendition for s in filtered)) > 1:
raise Fail("DUPLICATE_RENDITION_CODE", _)
return filtered
def _select_variant(sources, variant_code):
filtered = [s for s in sources if s.variant.code == variant_code]
if not filtered:
print(f"{variant_code!r} is not a valid variant code, possible values are:")
_print_variants(sources)
raise Abort()
if len(_ := set(s.variant for s in filtered)) > 1:
raise Fail("DUPLICATE_VARIANT_CODE", _)
return filtered
return variant
def create_progress():
@ -160,28 +158,27 @@ def main():
sources = fetch_sources(http_session, args["URL"])
renditions = list(iter_renditions(sources))
if not args["RENDITION"]:
print(f"Available renditions:")
_print_renditions(sources)
_print_renditions(renditions)
return 0
sources = _select_rendition(sources, args["RENDITION"])
select_rendition(sources, _validate_rendition(renditions, args["RENDITION"]))
variants = list(iter_variants(sources))
if not args["VARIANT"]:
print(f"Available variants:")
_print_variants(sources)
_print_variants(variants)
return 0
sources = _select_variant(sources, args["VARIANT"])
select_variant(sources, _validate_variant(variants, args["VARIANT"]))
file_names = [
naming.build_file_name(s, i, len(sources)) for i, s in enumerate(sources, 1)
]
target = compile_sources(sources)
progress = create_progress()
for source, file_name in zip(sources, file_names):
download_source(http_session, source, file_name, progress)
download_target(http_session, target, progress)
except UnexpectedError as e:
print(str(e))

View File

@ -4,7 +4,7 @@
"""Provide ArteTV JSON API utilities."""
from .error import UnexpectedAPIResponse, UnsupportedHLSProtocol
from .model import Metadata
from .model import ProgramMeta
MIME_TYPE = "application/vnd.api+json; charset=utf-8"
@ -27,32 +27,38 @@ def _fetch_api_data(http_session, path, object_type):
return obj["attributes"]
def fetch_program_info(http_session, site, target_id):
"""Fetch the given target's associated program information."""
obj = _fetch_api_data(http_session, f"config/{site}/{target_id}", "ConfigPlayer")
def fetch_program_info(http_session, site, program_id):
"""Fetch the given program metadata and indexes."""
obj = _fetch_api_data(http_session, f"config/{site}/{program_id}", "ConfigPlayer")
metadata = Metadata(
obj["metadata"]["providerId"],
if (_ := obj["metadata"]["providerId"]) != program_id:
raise UnexpectedAPIResponse(
"PROGRAM_ID_MISMATCH",
site,
program_id,
_,
)
program_meta = ProgramMeta(
obj["metadata"]["title"],
obj["metadata"]["subtitle"],
obj["metadata"]["description"],
obj["metadata"]["duration"]["seconds"],
)
cache = set()
program_index_urls = set()
for s in obj["streams"]:
if (_ := s["protocol"]) != "HLS_NG":
raise UnsupportedHLSProtocol(site, target_id, _)
raise UnsupportedHLSProtocol(site, program_id, _)
if (program_index_url := s["url"]) in cache:
if (program_index_url := s["url"]) in program_index_urls:
raise UnexpectedAPIResponse(
"DUPLICATE_PROGRAM_INDEX_URL",
site,
target_id,
program_id,
program_index_url,
)
cache.add(program_index_url)
program_index_urls.add(program_index_url)
yield (metadata, program_index_url)
return program_meta, program_index_urls

View File

@ -12,7 +12,15 @@ import m3u8
from . import subtitles
from .error import UnexpectedHLSResponse
from .model import Rendition, RenditionAudio, RenditionSubtitles, Source, Variant
from .model import (
AudioMeta,
AudioTrack,
SubtitlesMeta,
SubtitlesTrack,
VideoMeta,
VideoTrack,
Target,
)
#
# WARNING !
@ -45,8 +53,8 @@ def _fetch_index(http_session, url):
return m3u8.loads(r.text, url)
def fetch_program_sources(http_session, metadata, program_index_url):
"""Fetch the given index and yield available sources."""
def fetch_program_tracks(http_session, program_index_url):
"""Fetch video, audio and subtitles tracks for the given program index."""
program_index = _fetch_index(http_session, program_index_url)
audio_media = None
@ -70,8 +78,8 @@ def fetch_program_sources(http_session, metadata, program_index_url):
if not audio_media:
raise UnexpectedHLSResponse("NO_AUDIO_MEDIA", program_index_url)
rendition = Rendition(
RenditionAudio(
audio_track = AudioTrack(
AudioMeta(
audio_media.language,
audio_media.name.startswith("VO"),
(
@ -79,59 +87,64 @@ def fetch_program_sources(http_session, metadata, program_index_url):
and ("public.accessibility" in audio_media.characteristics)
),
),
RenditionSubtitles(
subtitles_media.language,
(
subtitles_media.characteristics is not None
and ("public.accessibility" in subtitles_media.characteristics)
),
)
if subtitles_media
else None,
audio_media.absolute_uri,
)
cache = set()
subtitles_track = (
SubtitlesTrack(
SubtitlesMeta(
subtitles_media.language,
(
subtitles_media.characteristics is not None
and ("public.accessibility" in subtitles_media.characteristics)
),
),
subtitles_media.absolute_uri,
)
if subtitles_media
else None
)
video_tracks = set()
for video_media in program_index.playlists:
stream_info = video_media.stream_info
if stream_info.audio != audio_media.group_id:
raise UnexpectedHLSResponse(
"INVALID_VARIANT_AUDIO_MEDIA", program_index_url, stream_info.audio
"INVALID_AUDIO_MEDIA", program_index_url, stream_info.audio
)
if subtitles_media:
if stream_info.subtitles != subtitles_media.group_id:
raise UnexpectedHLSResponse(
"INVALID_VARIANT_SUBTITLES_MEDIA",
"INVALID_SUBTITLES_MEDIA",
program_index_url,
stream_info.subtitles,
)
elif stream_info.subtitles:
raise UnexpectedHLSResponse(
"INVALID_VARIANT_SUBTITLES_MEDIA",
"INVALID_SUBTITLES_MEDIA",
program_index_url,
stream_info.subtitles,
)
variant = Variant(
stream_info.resolution[0],
stream_info.resolution[1],
stream_info.frame_rate,
)
if variant in cache:
raise UnexpectedHLSResponse("DUPLICATE_VARIANT", program_index_url, variant)
cache.add(variant)
yield Source(
metadata,
rendition,
variant,
video_track = VideoTrack(
VideoMeta(
stream_info.resolution[0],
stream_info.resolution[1],
stream_info.frame_rate,
),
video_media.absolute_uri,
audio_media.absolute_uri,
subtitles_media.absolute_uri if subtitles_media else None,
)
if video_track in video_tracks:
raise UnexpectedHLSResponse(
"DUPLICATE_VIDEO_TRACK", program_index_url, video_track
)
video_tracks.add(video_track)
return video_tracks, audio_track, subtitles_track
def _convert_byterange(obj):
# Convert a M3U8 `byterange` (1) to an `http range` (2).
@ -141,8 +154,8 @@ def _convert_byterange(obj):
return offset, offset + count - 1
def _fetch_av_track_index(http_session, track_index_url):
# Fetch an audio or video index.
def _fetch_av_index(http_session, track_index_url):
# Fetch an audio or video track index.
# Return a tuple:
# - the media file url
# - the media file's ranges
@ -171,25 +184,25 @@ def _fetch_av_track_index(http_session, track_index_url):
return track_index.segment_map[0].absolute_uri, ranges
def _fetch_subtitles_track_index(http_session, track_index_url):
def _fetch_s_index(http_session, track_index_url):
# Fetch subtitles index.
# Return the subtitle file url.
track_index = _fetch_index(http_session, track_index_url)
urls = [s.absolute_uri for s in track_index.segments]
if not urls:
raise UnexpectedHLSResponse("SUBTITLES_INDEX_NO_FILES", track_index_url)
raise UnexpectedHLSResponse("NO_S_INDEX_FILES", track_index_url)
if len(urls) > 1:
raise UnexpectedHLSResponse("SUBTITLES_INDEX_MULTIPLE_FILES", track_index_url)
raise UnexpectedHLSResponse("MULTIPLE_S_INDEX_FILES", track_index_url)
return urls[0]
def _download_av_track(http_session, track_index_url, progress):
# Download an audio or video stream to temporary file.
# Return the temporary file name.
url, ranges = _fetch_av_track_index(http_session, track_index_url)
# Download an audio or video data to temporary file.
# Return the temporary file path.
url, ranges = _fetch_av_index(http_session, track_index_url)
total = ranges[-1][1]
with (
@ -227,10 +240,10 @@ def _download_av_track(http_session, track_index_url, progress):
return f.name
def _download_subtitles_track(http_session, track_index_url, progress):
def _download_s_track(http_session, track_index_url, progress):
# Download a subtitle file (converted from VTT to SRT format) into a temporary file.
# Return the temporary file name.
url = _fetch_subtitles_track_index(http_session, track_index_url)
# Return the temporary file path.
url = _fetch_s_index(http_session, track_index_url)
progress(0, 2)
r = http_session.get(url)
@ -247,50 +260,49 @@ def _download_subtitles_track(http_session, track_index_url, progress):
@contextlib.contextmanager
def download_source(http_session, source, progress):
"""Download source inputs into temporary files.
def download_target_tracks(http_session, target, progress):
"""Download target tracks to temporary files.
Returns a context manager that will delete the temporary files on exit.
The context expression is a local version of the given source.
The context expression is a local version of the given target.
"""
video_filename = None
audio_filename = None
subtitles_filename = None
v_path, (v_meta, v_url) = None, target.video_track
a_path, (a_meta, a_url) = None, target.audio_track
s_path, (s_meta, s_url) = None, target.subtitles_track or (None, None)
try:
subtitles_filename = (
_download_subtitles_track(
s_path = (
_download_s_track(
http_session,
source.subtitles,
s_url,
lambda i, n: progress("subtitles", i, n),
)
if source.subtitles
if s_meta
else None
)
video_filename = _download_av_track(
http_session, source.video, lambda i, n: progress("video", i, n)
a_path = _download_av_track(
http_session, a_url, lambda i, n: progress("audio", i, n)
)
audio_filename = _download_av_track(
http_session, source.audio, lambda i, n: progress("audio", i, n)
v_path = _download_av_track(
http_session, v_url, lambda i, n: progress("video", i, n)
)
yield Source(
source.metadata,
source.rendition,
source.variant,
video_filename,
audio_filename,
subtitles_filename,
yield Target(
target.program,
VideoTrack(v_meta, v_path),
AudioTrack(a_meta, a_path),
SubtitlesTrack(s_meta, s_path) if s_meta else None,
target.file_name,
)
finally:
if video_filename and os.path.isfile(video_filename):
os.unlink(video_filename)
if v_path and os.path.isfile(v_path):
os.unlink(v_path)
if audio_filename and os.path.isfile(audio_filename):
os.unlink(audio_filename)
if a_path and os.path.isfile(a_path):
os.unlink(a_path)
if subtitles_filename and os.path.isfile(subtitles_filename):
os.unlink(subtitles_filename)
if s_path and os.path.isfile(s_path):
os.unlink(s_path)

View File

@ -7,12 +7,9 @@
from typing import NamedTuple, Optional
class Metadata(NamedTuple):
class ProgramMeta(NamedTuple):
"""A program metadata."""
id: str
"""The ID string."""
title: str
"""The title."""
@ -22,98 +19,91 @@ class Metadata(NamedTuple):
description: str
"""The description."""
duration: int
"""The duration in seconds."""
class VideoMeta(NamedTuple):
    """A video track metadata."""

    width: int
    """Horizontal part of the resolution."""

    height: int
    """Vertical part of the resolution."""

    frame_rate: float
    """Frame rate, in frames per second."""
class RenditionAudio(NamedTuple):
"""A rendition's audio part."""
class SubtitlesMeta(NamedTuple):
"""A subtitles track metadata."""
lang: str
language: str
"""ISO 639-1 two-letter language codes."""
is_descriptive: bool
"""Whether provides a textual description (closed captions)."""
class AudioMeta(NamedTuple):
"""A audio track metadata."""
language: str
"""ISO 639-1 two-letter language codes, or "mul" for multiple languages."""
is_original: bool
"""Whether audio track is original (no audio description or dubbing)."""
provides_accessibility: bool
is_descriptive: bool
"""Whether provides an audio description."""
class RenditionSubtitles(NamedTuple):
"""A rendition's subtitles part."""
class VideoTrack(NamedTuple):
"""A video track."""
lang: str
"""ISO 639-1 two-letter language codes."""
provides_accessibility: bool
"""Whether provides an readable description."""
meta: VideoMeta
url: str
class Rendition(NamedTuple):
"""A program's content version."""
class SubtitlesTrack(NamedTuple):
"""A subtitles track."""
audio: RenditionAudio
subtitles: Optional[RenditionSubtitles]
meta: SubtitlesMeta
url: str
@property
def code(self):
"""Return a code string representation."""
# This code string MUST be built in a way that every possible rendition
# object has its own code string.
# Also, it should be as concise as possible because it will be typed
# by the user in the command line.
code = self.audio.lang
if self.audio.provides_accessibility:
# audio accessibility (audio description)
code += "-AD"
class AudioTrack(NamedTuple):
"""A audio track."""
if self.subtitles:
if self.subtitles.provides_accessibility:
# visual accessibility (text description)
code += "-TD"
if self.subtitles.lang != self.audio.lang:
# specifies subtitles language only if different from audio language
return code + "-" + self.subtitles.lang
return code
meta: AudioMeta
url: str
class Variant(NamedTuple):
"""A program's quality version."""
"""A program variant."""
width: int
"""Horizontal part of the resolution."""
height: int
"""Vertical part of the resolution."""
frame_rate: int
"""Frame rate per seconds."""
@property
def code(self):
"""Return a code string representation."""
# This code string MUST be built in a way that every possible variant
# object has its own code string.
# Also, it should be as concise as possible because it will be typed
# by the user in the command line.
#
# So far, it seems variants differ on resolution only.
return f"{self.height}p"
key: VideoMeta
source: str
class Source(NamedTuple):
"""A program source."""
class Rendition(NamedTuple):
"""A program rendition."""
metadata: Metadata
rendition: Rendition
variant: Variant
key: tuple[AudioMeta, Optional[SubtitlesMeta]]
source: tuple[str, Optional[str]]
video: str
"""Video track locator."""
audio: str
"""Audio track locator."""
class Sources(NamedTuple):
"""A program's sources."""
subtitles: Optional[str]
"""Subtitles track locator."""
program: ProgramMeta
variants: list[Variant]
renditions: list[Rendition]
class Target(NamedTuple):
    """A download target."""

    # Metadata of the program the tracks belong to.
    program: ProgramMeta
    # The video and audio tracks are always present.
    video_track: VideoTrack
    audio_track: AudioTrack
    # None when the target has no subtitles.
    subtitles_track: Optional[SubtitlesTrack]
    # Output file name, without extension (the muxer appends it).
    file_name: str

View File

@ -6,26 +6,28 @@
import subprocess
def mux_source(source, file_base_name, _progress):
"""Build FFMPEG args."""
def mux_target(target, _progress):
"""Multiplexes tracks into a single file."""
cmd = ["ffmpeg", "-hide_banner"]
cmd.extend(["-i", source.video])
cmd.extend(["-i", source.audio])
if source.subtitles:
cmd.extend(["-i", source.subtitles])
cmd.extend(["-i", target.video_track.url])
cmd.extend(["-i", target.audio_track.url])
if target.subtitles_track:
cmd.extend(["-i", target.subtitles_track.url])
cmd.extend(["-c:v", "copy"])
cmd.extend(["-c:a", "copy"])
if source.subtitles:
if target.subtitles_track:
cmd.extend(["-c:s", "copy"])
cmd.extend(["-bsf:a", "aac_adtstoasc"])
cmd.extend(["-metadata:s:a:0", f"language={source.rendition.audio.lang}"])
cmd.extend(["-metadata:s:a:0", f"language={target.audio_track.meta.language}"])
if source.rendition.subtitles:
cmd.extend(["-metadata:s:s:0", f"language={source.rendition.subtitles.lang}"])
if target.subtitles_track:
cmd.extend(
["-metadata:s:s:0", f"language={target.subtitles_track.meta.language}"]
)
cmd.extend(["-disposition:s:0", "default"])
cmd.append(f"{file_base_name}.mkv")
cmd.append(f"{target.file_name}.mkv")
subprocess.run(cmd)

View File

@ -4,6 +4,11 @@
"""Provide contextualized based file naming utility."""
def build_file_name(source, _index, _total):
"""Create a base file name from a source."""
return source.metadata.title.replace("/", "-")
def file_name_builder(v_meta, a_meta, s_meta, **options):
    """Create a file namer from context.

    Return a function taking a program metadata object and returning the
    file name to use for it. The video/audio/subtitles metadata and the
    options are accepted as naming context but are not used yet.
    """

    def build_file_name(p_meta):
        """Create a file name for given program."""
        # "/" cannot appear in a file name; replace it in the title.
        return p_meta.title.replace("/", "-")

    return build_file_name