Refactor code to use the model types

- Rename variables and functions to reflect model names.
- Convert infrastructure data (JSON, M3U8) to model types.
- Change algorithms to produce/consume the `Source` model, in particular
  using generator functions to build a list of `Source`s rather than the
  opaque `rendition => variant => urls` mapping (this will make #7 very
  straightforward).

This was an extensive rewrite of the code base.
This commit is contained in:
Barbagus 2022-12-27 08:21:30 +01:00
parent 4fa5e1953e
commit ca0a792617
6 changed files with 302 additions and 201 deletions

View File

@ -17,14 +17,93 @@ import requests
from . import api, cli, common, hls, muxing, naming, www
def _print_available_renditions(config):
for code, label in api.iter_renditions(config):
print(f"\t{code} - {label}")
class Abort(common.Error):
    """Aborted."""

    # Raised without arguments by the selection helpers of this module, right
    # after they have printed the list of accepted codes for an unknown code.
    # NOTE(review): the docstring is left untouched on purpose; presumably the
    # base class may use it as the error message - confirm in `common`.
def _print_available_variants(version_index):
for code, label in hls.iter_variants(version_index):
print(f"\t{code} - {label}")
class Fail(common.UnexpectedError):
    """Unexpected error."""

    # Raised by the selection helpers of this module with a short error tag
    # followed by the offending values, e.g.
    # ("DUPLICATE_RENDITION_CODE", <set of renditions>).
    # NOTE(review): the docstring is left untouched on purpose; presumably the
    # base class may use it as the error message - confirm in `common`.
def _lookup_language_name(code):
# TODO: actually implement this
return f"[{code}]"
def _build_rendition_label(rendition):
    # Describe a rendition in plain words: the audio part first, then the
    # subtitles part when the rendition has subtitles.
    audio = rendition.audio
    subtitles = rendition.subtitles

    # Audio qualifier: an accessible (audio description) track that is not
    # the original gets no "dubbed" prefix.
    if audio.is_original:
        qualifier = "original "
    elif audio.is_accessible:
        qualifier = ""
    else:
        qualifier = "dubbed "

    pieces = [qualifier, _lookup_language_name(audio.lang)]

    if audio.is_accessible:
        pieces.append(" audio description")

    if subtitles:
        # The subtitles language is only spelled out when it differs from
        # the audio language.
        other_lang = subtitles.lang != audio.lang
        if subtitles.is_accessible:
            if other_lang:
                pieces.append(
                    f" with {_lookup_language_name(subtitles.lang)} text description"
                )
            else:
                pieces.append(" with text description")
        elif other_lang:
            pieces.append(f" with {_lookup_language_name(subtitles.lang)} subtitles")

    return "".join(pieces)
def _print_renditions(sources):
items = [
(r.code, _build_rendition_label(r)) for r in set(s.rendition for s in sources)
]
items.sort(key=lambda t: t[1])
for code, label in items:
print(f"\t{code:>6} - {label}")
def _print_variants(sources):
items = [(v.code, v.height) for v in set(s.variant for s in sources)]
items.sort(key=lambda t: t[1], reverse=True)
for code, _ in items:
print(f"\t{code}")
def _select_rendition(sources, rendition_code):
filtered = [s for s in sources if s.rendition.code == rendition_code]
if not filtered:
print(f"{rendition_code!r} is not a valid version, possible values are:")
_print_renditions(sources)
raise Abort()
if len(_ := set(s.rendition for s in filtered)) > 1:
raise Fail("DUPLICATE_RENDITION_CODE", _)
return filtered
def _select_variant(sources, variant_code):
filtered = [s for s in sources if s.variant.code == variant_code]
if not filtered:
print(f"{variant_code!r} is not a valid resolution, possible values are:")
_print_variants(sources)
raise Abort()
if len(_ := set(s.variant for s in filtered)) > 1:
raise Fail("DUPLICATE_VARIANT_CODE", _)
return filtered
def create_progress():
@ -63,44 +142,44 @@ def main():
print(__doc__)
return 0
http_session = requests.sessions.Session()
try:
www_lang, program_id = www.parse_url(args.pop(0))
config = api.load_config(http_session, www_lang, program_id)
target_id, www_lang = www.parse_url(args.pop(0))
http_session = requests.sessions.Session()
sources = [
source
for metadata, master_playlist_url in api.fetch_program_info(
http_session, target_id, www_lang
)
for source in hls.fetch_program_sources(
http_session, metadata, master_playlist_url
)
]
if not args:
print(f"Available versions:")
_print_available_renditions(config)
_print_renditions(sources)
return 0
rendition_code = args.pop(0)
master_playlist_url = api.select_rendition(config, rendition_code)
if master_playlist_url is None:
print(f"{rendition_code!r} is not a valid version, accepted values are:")
_print_available_renditions(config)
return 1
master_playlist = hls.load_master_playlist(http_session, master_playlist_url)
sources = _select_rendition(sources, args.pop(0))
if not args:
print(f"Available resolutions:")
_print_available_variants(master_playlist)
_print_variants(sources)
return 0
variant_code = args.pop(0)
remote_inputs = hls.select_variant(master_playlist, variant_code)
if remote_inputs is None:
print(f"{variant_code!r} is not a valid resolution, accepted values are:")
_print_available_variants(master_playlist)
return 0
sources = _select_variant(sources, args.pop(0))
file_base_name = naming.build_file_base_name(config)
file_names = [
naming.build_file_name(s, i, len(sources)) for i, s in enumerate(sources, 1)
]
progress = create_progress()
with hls.download_inputs(http_session, remote_inputs, progress) as temp_inputs:
muxing.mux(temp_inputs, file_base_name, progress)
for source, file_name in zip(sources, file_names):
with hls.download_source(http_session, source, progress) as local_source:
muxing.mux_source(local_source, file_name, progress)
except common.UnexpectedError as e:
print(str(e))

View File

@ -3,7 +3,7 @@
"""Provide ArteTV JSON API utilities."""
from . import common
from . import common, model
MIME_TYPE = "application/vnd.api+json; charset=utf-8"
@ -16,6 +16,10 @@ class NotFound(common.Error):
"""Program not found on ArteTV."""
class UnsupportedProtocol(common.Error):
"""Program type not supported."""
def _fetch_api_data(http_session, path, object_type):
# Fetch an API object.
url = "https://api.arte.tv/api/player/v2/" + path
@ -34,34 +38,37 @@ def _fetch_api_data(http_session, path, object_type):
if (_ := obj["type"]) != object_type:
raise UnexpectedResponse("OBJECT_TYPE", path, object_type, _)
return obj
return obj["attributes"]
def load_config(http_session, lang, program_id):
"""Retrieve a program config from API."""
url = f"config/{lang}/{program_id}"
config = _fetch_api_data(
http_session, f"config/{lang}/{program_id}", "ConfigPlayer"
def fetch_program_info(http_session, target_id, www_lang):
"""Fetch the given target's associated program information."""
obj = _fetch_api_data(
http_session, f"config/{www_lang}/{target_id}", "ConfigPlayer"
)
return config
metadata = model.Metadata(
obj["metadata"]["providerId"],
obj["metadata"]["title"],
obj["metadata"]["subtitle"],
obj["metadata"]["description"],
obj["metadata"]["duration"]["seconds"],
)
cache = set()
def iter_renditions(config):
"""Return a rendition (code, label) iterator."""
for stream in config["attributes"]["streams"]:
yield (
# rendition code
stream["versions"][0]["eStat"]["ml5"],
# rendition full name
stream["versions"][0]["label"],
)
for s in obj["streams"]:
if (_ := s["protocol"]) != "HLS_NG":
raise UnsupportedProtocol(target_id, www_lang, _)
if (master_playlist_url := s["url"]) in cache:
raise UnexpectedResponse(
"DUPLICATE_MASTER_PLAYLIST_URL",
target_id,
www_lang,
master_playlist_url,
)
def select_rendition(config, rendition_code):
"""Return the master playlist index url for the given rendition code."""
for stream in config["attributes"]["streams"]:
if stream["versions"][0]["eStat"]["ml5"] == rendition_code:
return stream["url"]
cache.add(master_playlist_url)
return None
yield (metadata, master_playlist_url)

View File

@ -66,7 +66,7 @@ from tempfile import NamedTemporaryFile
import m3u8
import webvtt
from . import common
from . import common, model
#
# WARNING !
@ -79,7 +79,8 @@ from . import common
# - Every variant is of different resolution
# - Every variant has exactly one audio medium
# - Every variant has at most one subtitles medium
# - Audio and video media playlists segments are incremental ranges of the same file
# - Audio and video media playlists segments are incremental ranges of
# the same file
# - Subtitles media playlists have only one segment
@ -87,11 +88,6 @@ class UnexpectedResponse(common.UnexpectedError):
"""Unexpected response from ArteTV."""
def _make_resolution_code(variant):
# resolution code (1080p, 720p, ...)
return f"{variant.stream_info.resolution[1]}p"
def _fetch_playlist(http_session, url):
# Fetch a M3U8 playlist
r = http_session.get(url)
@ -99,120 +95,145 @@ def _fetch_playlist(http_session, url):
return m3u8.loads(r.text, url)
def load_master_playlist(http_session, url):
"""Download and return a master playlist."""
master_playlist = _fetch_playlist(http_session, url)
def fetch_program_sources(http_session, metadata, master_playlist_url):
"""Fetch the given master playlist and yield available sources."""
master_playlist = _fetch_playlist(http_session, master_playlist_url)
if not master_playlist.playlists:
raise UnexpectedResponse("NO_PLAYLISTS", url)
audio_media = None
subtitles_media = None
resolution_codes = set()
for variant in master_playlist.playlists:
resolution_code = _make_resolution_code(variant)
if resolution_code in resolution_codes:
raise UnexpectedResponse("DUPLICATE_RESOLUTION_CODE", url, resolution_code)
resolution_codes.add(resolution_code)
audio_media = False
subtitles_media = False
for m in variant.media:
if m.type == "AUDIO":
for media in master_playlist.media:
match media.type:
case "AUDIO":
if audio_media:
raise UnexpectedResponse("MULTIPLE_AUDIO_MEDIA", url)
audio_media = True
elif m.type == "SUBTITLES":
raise UnexpectedResponse(
"MULTIPLE_AUDIO_MEDIA", master_playlist_url
)
audio_media = media
case "SUBTITLES":
if subtitles_media:
raise UnexpectedResponse("MULTIPLE_SUBTITLES_MEDIA", url)
subtitles_media = True
raise UnexpectedResponse(
"MULTIPLE_SUBTITLES_MEDIA", master_playlist_url
)
subtitles_media = media
if not audio_media:
raise UnexpectedResponse("NO_AUDIO_MEDIA", url)
if not audio_media:
raise UnexpectedResponse("NO_AUDIO_MEDIA", master_playlist_url)
return master_playlist
rendition = model.Rendition(
model.RenditionAudio(
audio_media.language,
audio_media.name.startswith("VO"),
audio_media.characteristics
and ("public.accessibility" in audio_media.characteristics),
),
model.RenditionSubtitles(
subtitles_media.language,
subtitles_media.characteristics
and ("public.accessibility" in subtitles_media.characteristics),
)
if subtitles_media
else None,
)
cache = set()
def iter_variants(master_playlist):
"""Iterate over variants."""
for variant in sorted(
master_playlist.playlists,
key=lambda v: v.stream_info.resolution[1],
reverse=True,
):
yield (
_make_resolution_code(variant),
f"{variant.stream_info.resolution[0]} x {variant.stream_info.resolution[1]}",
for video_media in master_playlist.playlists:
stream_info = video_media.stream_info
if stream_info.audio != audio_media.group_id:
raise UnexpectedResponse(
"INVALID_VARIANT_AUDIO_MEDIA", master_playlist_url, stream_info.audio
)
if subtitles_media:
if stream_info.subtitles != subtitles_media.group_id:
raise UnexpectedResponse(
"INVALID_VARIANT_SUBTITLES_MEDIA",
master_playlist_url,
stream_info.subtitles,
)
elif stream_info.subtitles:
raise UnexpectedResponse(
"INVALID_VARIANT_SUBTITLES_MEDIA",
master_playlist_url,
stream_info.subtitles,
)
variant = model.Variant(
stream_info.resolution[0],
stream_info.resolution[1],
stream_info.frame_rate,
)
if variant in cache:
raise UnexpectedResponse("DUPLICATE_VARIANT", master_playlist_url, variant)
cache.add(variant)
yield model.Source(
metadata,
rendition,
variant,
video_media.absolute_uri,
audio_media.absolute_uri,
subtitles_media.absolute_uri if subtitles_media else None,
)
def select_variant(master_playlist, resolution_code):
"""Return the stream information for a given resolution code."""
for variant in master_playlist.playlists:
code = _make_resolution_code(variant)
if code != resolution_code:
continue
audio_track = None
for m in variant.media:
if m.type == "AUDIO":
audio_track = (m.language, variant.base_uri + m.uri)
break
subtitles_track = None
for m in variant.media:
if m.type == "SUBTITLES":
subtitles_track = (m.language, variant.base_uri + m.uri)
break
return (
variant.base_uri + variant.uri,
audio_track,
subtitles_track,
)
return None
def _parse_byterange(obj):
# Parse a M3U8 `byterange` (count@offset) into http range (range_start, rang_end)
def _convert_byterange(obj):
# Convert a M3U8 `byterange` (1) to an `http range` (2).
# 1. "count@offset"
# 2. (start, end)
count, offset = [int(v) for v in obj.byterange.split("@")]
return offset, offset + count - 1
def _load_av_segments(http_session, media_playlist_url):
media_playlist = _fetch_playlist(http_session, media_playlist_url)
def _fetch_av_media_playlist(http_session, url):
# Fetch an audio or video media playlist.
# Return a tuple:
# - the media file url
# - the media file's ranges
media_playlist = _fetch_playlist(http_session, url)
file_name = media_playlist.segment_map[0].uri
range_start, range_end = _parse_byterange(media_playlist.segment_map[0])
if range_start != 0:
raise UnexpectedResponse(
"INVALID_STREAM_MEDIA_FRAGMENT_START", media_playlist_url
)
chunks = [(range_start, range_end)]
total = range_end + 1
start, end = _convert_byterange(media_playlist.segment_map[0])
if start != 0:
raise UnexpectedResponse("INVALID_AV_MEDIA_FRAGMENT_START", url)
ranges = [(start, end)]
next_start = end + 1
for segment in media_playlist.segments:
if segment.uri != file_name:
raise UnexpectedResponse("MULTIPLE_STREAM_MEDIA_FILES", media_playlist_url)
raise UnexpectedResponse("MULTIPLE_AV_MEDIA_FILES", url)
range_start, range_end = _parse_byterange(segment)
if range_start != total:
raise UnexpectedResponse(
"DISCONTINUOUS_STREAM_MEDIA_FRAGMENT", media_playlist_url
)
start, end = _convert_byterange(segment)
if start != next_start:
raise UnexpectedResponse("DISCONTINUOUS_AV_MEDIA_FRAGMENT", url)
chunks.append((range_start, range_end))
total = range_end + 1
ranges.append((start, end))
next_start = end + 1
return media_playlist.segment_map[0].absolute_uri, chunks
return media_playlist.segment_map[0].absolute_uri, ranges
def _download_av_stream(http_session, media_playlist_url, progress):
# Download an audio or video stream to temporary directory
url, ranges = _load_av_segments(http_session, media_playlist_url)
def _fetch_subtitles_media_playlist(http_session, url):
    # Fetch a subtitles media playlist.
    # Return the url of its single segment; a playlist with no segment or
    # with more than one is reported as an unexpected response.
    playlist = _fetch_playlist(http_session, url)
    segment_urls = [segment.absolute_uri for segment in playlist.segments]

    found = len(segment_urls)
    if found == 0:
        raise UnexpectedResponse("SUBTITLES_MEDIA_NO_FILES", url)
    if found > 1:
        raise UnexpectedResponse("SUBTITLES_MEDIA_MULTIPLE_FILES", url)

    (file_url,) = segment_urls
    return file_url
def _download_av_media(http_session, media_playlist_url, progress):
# Download an audio or video stream to temporary file.
# Return the temporary file name.
url, ranges = _fetch_av_media_playlist(http_session, media_playlist_url)
total = ranges[-1][1]
with (
@ -226,13 +247,14 @@ def _download_av_stream(http_session, media_playlist_url, progress):
headers={
"Range": f"bytes={range_start}-{range_end}",
},
timeout=5,
)
r.raise_for_status()
if r.status_code != 206:
raise UnexpectedResponse(
"STREAM_MEDIA_HTTP_STATUS",
"UNEXPECTED_AV_MEDIA_HTTP_STATUS",
media_playlist_url,
r.request.headers,
r.status,
@ -240,7 +262,7 @@ def _download_av_stream(http_session, media_playlist_url, progress):
if len(r.content) != range_end - range_start + 1:
raise UnexpectedResponse(
"INVALID_STREAM_MEDIA_FRAGMENT_LENGTH", media_playlist_url
"INVALID_AV_MEDIA_FRAGMENT_LENGTH", media_playlist_url
)
f.write(r.content)
@ -249,19 +271,13 @@ def _download_av_stream(http_session, media_playlist_url, progress):
return f.name
def _download_subtitles_input(http_session, index_url, progress):
# Return a temporary file name where VTT subtitle has been downloaded/converted to SRT
subtitles_index = _fetch_playlist(http_session, index_url)
urls = [s.absolute_uri for s in subtitles_index.segments]
if not urls:
raise UnexpectedResponse("SUBTITLES_MEDIA_NO_FILES", index_url)
if len(urls) > 1:
raise UnexpectedResponse("SUBTITLES_MEDIA_MULTIPLE_FILES", index_url)
def _download_subtitles_media(http_session, media_playlist_url, progress):
# Download a subtitle file (converted from VTT to SRT format) into a temporary file.
# Return the temporary file name.
url = _fetch_subtitles_media_playlist(http_session, media_playlist_url)
progress(0, 2)
r = http_session.get(urls[0])
r = http_session.get(url)
r.raise_for_status()
buffer = io.StringIO(r.text)
@ -286,45 +302,50 @@ def _download_subtitles_input(http_session, index_url, progress):
@contextlib.contextmanager
def download_inputs(http_session, remote_inputs, progress):
"""Download inputs in temporary files."""
# It is implemented as a context manager that will delete temporary files on exit.
video_index_url, audio_track, subtitles_track = remote_inputs
def download_source(http_session, source, progress):
"""Download source inputs into temporary files.
Returns a context manager that will delete the temporary files on exit.
The context expression is a local version of the given source.
"""
video_filename = None
audio_filename = None
subtitles_filename = None
try:
video_filename = _download_av_stream(
http_session, video_index_url, lambda i, n: progress("video", i, n)
video_filename = _download_av_media(
http_session, source.video, lambda i, n: progress("video", i, n)
)
(audio_lang, audio_index_url) = audio_track
audio_filename = _download_av_stream(
http_session, audio_index_url, lambda i, n: progress("audio", i, n)
audio_filename = _download_av_media(
http_session, source.audio, lambda i, n: progress("audio", i, n)
)
if subtitles_track:
(subtitles_lang, subtitles_index_url) = subtitles_track
subtitles_filename = _download_subtitles_input(
subtitles_filename = (
_download_subtitles_media(
http_session,
subtitles_index_url,
source.subtitles,
lambda i, n: progress("subtitles", i, n),
)
if source.subtitles
else None
)
yield model.Source(
source.metadata,
source.rendition,
source.variant,
video_filename,
audio_filename,
subtitles_filename,
)
yield (
video_filename,
(audio_lang, audio_filename),
(subtitles_lang, subtitles_filename),
)
else:
yield (video_filename, (audio_lang, audio_filename), None)
finally:
if video_filename and os.path.isfile(video_filename):
os.unlink(video_filename)
if audio_filename and os.path.isfile(audio_filename):
os.unlink(audio_filename)
if subtitles_filename and os.path.isfile(subtitles_filename):
os.unlink(subtitles_filename)

View File

@ -6,30 +6,24 @@
import subprocess
def mux(inputs, file_base_name, progress):
def mux_source(source, file_base_name, _progress):
"""Build FFMPEG args."""
video_input, audio_track, subtitles_track = inputs
audio_lang, audio_input = audio_track
if subtitles_track:
subtitles_lang, subtitles_input = subtitles_track
cmd = ["ffmpeg", "-hide_banner"]
cmd.extend(["-i", video_input])
cmd.extend(["-i", audio_input])
if subtitles_track:
cmd.extend(["-i", subtitles_input])
cmd.extend(["-i", source.video])
cmd.extend(["-i", source.audio])
if source.subtitles:
cmd.extend(["-i", source.subtitles])
cmd.extend(["-c:v", "copy"])
cmd.extend(["-c:a", "copy"])
if subtitles_track:
if source.subtitles:
cmd.extend(["-c:s", "copy"])
cmd.extend(["-bsf:a", "aac_adtstoasc"])
cmd.extend(["-metadata:s:a:0", f"language={audio_lang}"])
cmd.extend(["-metadata:s:a:0", f"language={source.rendition.audio.lang}"])
if subtitles_track:
cmd.extend(["-metadata:s:s:0", f"language={subtitles_lang}"])
if source.rendition.subtitles:
cmd.extend(["-metadata:s:s:0", f"language={source.rendition.subtitles.lang}"])
cmd.extend(["-disposition:s:0", "default"])
cmd.append(f"{file_base_name}.mkv")

View File

@ -4,6 +4,6 @@
"""Provide contextualized based file naming utility."""
def build_file_base_name(config):
"""Create a base file name from config metadata."""
return config["attributes"]["metadata"]["title"].replace("/", "-")
def build_file_name(source, _index, _total):
    """Return the source's title with every "/" replaced by "-".

    `_index` and `_total` are accepted but not used.
    """
    title = source.metadata.title
    return "-".join(title.split("/"))

View File

@ -30,4 +30,4 @@ def parse_url(url):
target_id = path.pop(0)
return www_lang, target_id
return target_id, www_lang