Refactor code to use the model types

- Rename variables and functions to reflect model names.
- Convert infrastructure data (JSON, M3U8) to model types.
- Change algorithms to produce/consume `Source` model, in particular
  using generator functions to build a list of `Source`s rather than the
  opaque `rendition => variant => urls` mapping (this will make #7 very
  straightforward).
- Download all master playlists after API call before selecting
  rendition/variants.

Motivation for the last point:

We used to offer the rendition choice right after the API call, before
downloading the appropriate master playlist to figure out the available
variants.

The problem with that is that ArteTV's codes for the renditions (given
by the API) do not necessarily include complete language information
(when the language is neither French nor German). For instance, an
original audio track in Portuguese would show as `VOEU-` (as in
"EUropean"). The actual mention of Portuguese only shows up in the
master playlist.

So, the new implementation actually downloads all master playlists
straight after the API call. This is a bit wasteful, but I figured it
was necessary to provide quality interaction with the user.

Bonus? Now, when we first prompt the user for a rendition choice, we
already know the available variants; maybe we can make use of that fact
in the future...
This commit is contained in:
Barbagus 2022-12-27 08:21:30 +01:00
parent 4fa5e1953e
commit db0a954497
6 changed files with 302 additions and 201 deletions

View File

@ -17,14 +17,93 @@ import requests
from . import api, cli, common, hls, muxing, naming, www
def _print_available_renditions(config):
for code, label in api.iter_renditions(config):
print(f"\t{code} - {label}")
# Raised by `_select_rendition` / `_select_variant` after they have printed
# the list of valid choices for a code that matched no source.
class Abort(common.Error):
"""Aborted."""
def _print_available_variants(version_index):
for code, label in hls.iter_variants(version_index):
print(f"\t{code} - {label}")
# Raised by `_select_rendition` / `_select_variant` when the sources matching
# a single code turn out to belong to more than one rendition / variant.
class Fail(common.UnexpectedError):
"""Unexpected error."""
def _lookup_language_name(code):
# TODO: actually implement this
return f"[{code}]"
def _build_rendition_label(rendition):
    """Return a human readable description of `rendition`.

    The label starts with the audio part ("original"/"dubbed" qualifier,
    language name, optional " audio description" suffix) and is followed,
    when the rendition has subtitles, by a " with ..." part. The subtitles
    language is only spelled out when it differs from the audio language.
    """
    audio = rendition.audio
    subtitles = rendition.subtitles

    pieces = []

    # Audio qualifier: an accessible (audio description) track that is not
    # the original one gets no "dubbed" prefix.
    if audio.is_original:
        pieces.append("original ")
    elif not audio.is_accessible:
        pieces.append("dubbed ")

    pieces.append(_lookup_language_name(audio.lang))

    if audio.is_accessible:
        pieces.append(" audio description")

    if subtitles:
        other_lang = subtitles.lang != audio.lang
        if subtitles.is_accessible:
            if other_lang:
                pieces.append(
                    f" with {_lookup_language_name(subtitles.lang)} text description"
                )
            else:
                pieces.append(" with text description")
        elif other_lang:
            # Plain subtitles in the audio language are not mentioned at all.
            pieces.append(
                f" with {_lookup_language_name(subtitles.lang)} subtitles"
            )

    return "".join(pieces)
def _print_renditions(sources):
items = [
(r.code, _build_rendition_label(r)) for r in set(s.rendition for s in sources)
]
items.sort(key=lambda t: t[1])
for code, label in items:
print(f"\t{code:>6} - {label}")
def _print_variants(sources):
items = [(v.code, v.height) for v in set(s.variant for s in sources)]
items.sort(key=lambda t: t[1], reverse=True)
for code, _ in items:
print(f"\t{code}")
def _select_rendition(sources, rendition_code):
filtered = [s for s in sources if s.rendition.code == rendition_code]
if not filtered:
print(f"{rendition_code!r} is not a valid version, possible values are:")
_print_renditions(sources)
raise Abort()
if len(_ := set(s.rendition for s in filtered)) > 1:
raise Fail("DUPLICATE_RENDITION_CODE", _)
return filtered
def _select_variant(sources, variant_code):
filtered = [s for s in sources if s.variant.code == variant_code]
if not filtered:
print(f"{variant_code!r} is not a valid resolution, possible values are:")
_print_variants(sources)
raise Abort()
if len(_ := set(s.variant for s in filtered)) > 1:
raise Fail("DUPLICATE_VARIANT_CODE", _)
return filtered
def create_progress():
@ -63,44 +142,44 @@ def main():
print(__doc__)
return 0
http_session = requests.sessions.Session()
try:
www_lang, program_id = www.parse_url(args.pop(0))
config = api.load_config(http_session, www_lang, program_id)
target_id, www_lang = www.parse_url(args.pop(0))
http_session = requests.sessions.Session()
sources = [
source
for metadata, master_playlist_url in api.fetch_program_info(
http_session, target_id, www_lang
)
for source in hls.fetch_program_sources(
http_session, metadata, master_playlist_url
)
]
if not args:
print(f"Available versions:")
_print_available_renditions(config)
_print_renditions(sources)
return 0
rendition_code = args.pop(0)
master_playlist_url = api.select_rendition(config, rendition_code)
if master_playlist_url is None:
print(f"{rendition_code!r} is not a valid version, accepted values are:")
_print_available_renditions(config)
return 1
master_playlist = hls.load_master_playlist(http_session, master_playlist_url)
sources = _select_rendition(sources, args.pop(0))
if not args:
print(f"Available resolutions:")
_print_available_variants(master_playlist)
_print_variants(sources)
return 0
variant_code = args.pop(0)
remote_inputs = hls.select_variant(master_playlist, variant_code)
if remote_inputs is None:
print(f"{variant_code!r} is not a valid resolution, accepted values are:")
_print_available_variants(master_playlist)
return 0
sources = _select_variant(sources, args.pop(0))
file_base_name = naming.build_file_base_name(config)
file_names = [
naming.build_file_name(s, i, len(sources)) for i, s in enumerate(sources, 1)
]
progress = create_progress()
with hls.download_inputs(http_session, remote_inputs, progress) as temp_inputs:
muxing.mux(temp_inputs, file_base_name, progress)
for source, file_name in zip(sources, file_names):
with hls.download_source(http_session, source, progress) as local_source:
muxing.mux_source(local_source, file_name, progress)
except common.UnexpectedError as e:
print(str(e))

View File

@ -3,7 +3,7 @@
"""Provide ArteTV JSON API utilities."""
from . import common
from . import common, model
MIME_TYPE = "application/vnd.api+json; charset=utf-8"
@ -16,6 +16,10 @@ class NotFound(common.Error):
"""Program not found on ArteTV."""
class UnsupportedProtocol(common.Error):
"""Program type not supported."""
def _fetch_api_data(http_session, path, object_type):
# Fetch an API object.
url = "https://api.arte.tv/api/player/v2/" + path
@ -34,34 +38,37 @@ def _fetch_api_data(http_session, path, object_type):
if (_ := obj["type"]) != object_type:
raise UnexpectedResponse("OBJECT_TYPE", path, object_type, _)
return obj
return obj["attributes"]
def load_config(http_session, lang, program_id):
"""Retrieve a program config from API."""
url = f"config/{lang}/{program_id}"
config = _fetch_api_data(
http_session, f"config/{lang}/{program_id}", "ConfigPlayer"
def fetch_program_info(http_session, target_id, www_lang):
"""Fetch the given target's associated program information."""
obj = _fetch_api_data(
http_session, f"config/{www_lang}/{target_id}", "ConfigPlayer"
)
return config
metadata = model.Metadata(
obj["metadata"]["providerId"],
obj["metadata"]["title"],
obj["metadata"]["subtitle"],
obj["metadata"]["description"],
obj["metadata"]["duration"]["seconds"],
)
cache = set()
def iter_renditions(config):
"""Return a rendition (code, label) iterator."""
for stream in config["attributes"]["streams"]:
yield (
# rendition code
stream["versions"][0]["eStat"]["ml5"],
# rendition full name
stream["versions"][0]["label"],
)
for s in obj["streams"]:
if (_ := s["protocol"]) != "HLS_NG":
raise UnsupportedProtocol(target_id, www_lang, _)
if (master_playlist_url := s["url"]) in cache:
raise UnexpectedResponse(
"DUPLICATE_MASTER_PLAYLIST_URL",
target_id,
www_lang,
master_playlist_url,
)
def select_rendition(config, rendition_code):
"""Return the master playlist index url for the given rendition code."""
for stream in config["attributes"]["streams"]:
if stream["versions"][0]["eStat"]["ml5"] == rendition_code:
return stream["url"]
cache.add(master_playlist_url)
return None
yield (metadata, master_playlist_url)

View File

@ -66,7 +66,7 @@ from tempfile import NamedTemporaryFile
import m3u8
import webvtt
from . import common
from . import common, model
#
# WARNING !
@ -79,7 +79,8 @@ from . import common
# - Every variant is of different resolution
# - Every variant has exactly one audio medium
# - Every variant has at most one subtitles medium
# - Audio and video media playlists segments are incremental ranges of the same file
# - Audio and video media playlists segments are incremental ranges of
# the same file
# - Subtitles media playlists have only one segment
@ -87,11 +88,6 @@ class UnexpectedResponse(common.UnexpectedError):
"""Unexpected response from ArteTV."""
def _make_resolution_code(variant):
# resolution code (1080p, 720p, ...)
return f"{variant.stream_info.resolution[1]}p"
def _fetch_playlist(http_session, url):
# Fetch a M3U8 playlist
r = http_session.get(url)
@ -99,120 +95,145 @@ def _fetch_playlist(http_session, url):
return m3u8.loads(r.text, url)
def load_master_playlist(http_session, url):
"""Download and return a master playlist."""
master_playlist = _fetch_playlist(http_session, url)
def fetch_program_sources(http_session, metadata, master_playlist_url):
"""Fetch the given master playlist and yield available sources."""
master_playlist = _fetch_playlist(http_session, master_playlist_url)
if not master_playlist.playlists:
raise UnexpectedResponse("NO_PLAYLISTS", url)
audio_media = None
subtitles_media = None
resolution_codes = set()
for variant in master_playlist.playlists:
resolution_code = _make_resolution_code(variant)
if resolution_code in resolution_codes:
raise UnexpectedResponse("DUPLICATE_RESOLUTION_CODE", url, resolution_code)
resolution_codes.add(resolution_code)
audio_media = False
subtitles_media = False
for m in variant.media:
if m.type == "AUDIO":
for media in master_playlist.media:
match media.type:
case "AUDIO":
if audio_media:
raise UnexpectedResponse("MULTIPLE_AUDIO_MEDIA", url)
audio_media = True
elif m.type == "SUBTITLES":
raise UnexpectedResponse(
"MULTIPLE_AUDIO_MEDIA", master_playlist_url
)
audio_media = media
case "SUBTITLES":
if subtitles_media:
raise UnexpectedResponse("MULTIPLE_SUBTITLES_MEDIA", url)
subtitles_media = True
raise UnexpectedResponse(
"MULTIPLE_SUBTITLES_MEDIA", master_playlist_url
)
subtitles_media = media
if not audio_media:
raise UnexpectedResponse("NO_AUDIO_MEDIA", url)
if not audio_media:
raise UnexpectedResponse("NO_AUDIO_MEDIA", master_playlist_url)
return master_playlist
rendition = model.Rendition(
model.RenditionAudio(
audio_media.language,
audio_media.name.startswith("VO"),
audio_media.characteristics
and ("public.accessibility" in audio_media.characteristics),
),
model.RenditionSubtitles(
subtitles_media.language,
subtitles_media.characteristics
and ("public.accessibility" in subtitles_media.characteristics),
)
if subtitles_media
else None,
)
cache = set()
def iter_variants(master_playlist):
"""Iterate over variants."""
for variant in sorted(
master_playlist.playlists,
key=lambda v: v.stream_info.resolution[1],
reverse=True,
):
yield (
_make_resolution_code(variant),
f"{variant.stream_info.resolution[0]} x {variant.stream_info.resolution[1]}",
for video_media in master_playlist.playlists:
stream_info = video_media.stream_info
if stream_info.audio != audio_media.group_id:
raise UnexpectedResponse(
"INVALID_VARIANT_AUDIO_MEDIA", master_playlist_url, stream_info.audio
)
if subtitles_media:
if stream_info.subtitles != subtitles_media.group_id:
raise UnexpectedResponse(
"INVALID_VARIANT_SUBTITLES_MEDIA",
master_playlist_url,
stream_info.subtitles,
)
elif stream_info.subtitles:
raise UnexpectedResponse(
"INVALID_VARIANT_SUBTITLES_MEDIA",
master_playlist_url,
stream_info.subtitles,
)
variant = model.Variant(
stream_info.resolution[0],
stream_info.resolution[1],
stream_info.frame_rate,
)
if variant in cache:
raise UnexpectedResponse("DUPLICATE_VARIANT", master_playlist_url, variant)
cache.add(variant)
yield model.Source(
metadata,
rendition,
variant,
video_media.absolute_uri,
audio_media.absolute_uri,
subtitles_media.absolute_uri if subtitles_media else None,
)
def select_variant(master_playlist, resolution_code):
"""Return the stream information for a given resolution code."""
for variant in master_playlist.playlists:
code = _make_resolution_code(variant)
if code != resolution_code:
continue
audio_track = None
for m in variant.media:
if m.type == "AUDIO":
audio_track = (m.language, variant.base_uri + m.uri)
break
subtitles_track = None
for m in variant.media:
if m.type == "SUBTITLES":
subtitles_track = (m.language, variant.base_uri + m.uri)
break
return (
variant.base_uri + variant.uri,
audio_track,
subtitles_track,
)
return None
def _parse_byterange(obj):
# Parse a M3U8 `byterange` (count@offset) into http range (range_start, rang_end)
def _convert_byterange(obj):
# Convert a M3U8 `byterange` (1) to an `http range` (2).
# 1. "count@offset"
# 2. (start, end)
count, offset = [int(v) for v in obj.byterange.split("@")]
return offset, offset + count - 1
def _load_av_segments(http_session, media_playlist_url):
media_playlist = _fetch_playlist(http_session, media_playlist_url)
def _fetch_av_media_playlist(http_session, url):
# Fetch an audio or video media playlist.
# Return a tuple:
# - the media file url
# - the media file's ranges
media_playlist = _fetch_playlist(http_session, url)
file_name = media_playlist.segment_map[0].uri
range_start, range_end = _parse_byterange(media_playlist.segment_map[0])
if range_start != 0:
raise UnexpectedResponse(
"INVALID_STREAM_MEDIA_FRAGMENT_START", media_playlist_url
)
chunks = [(range_start, range_end)]
total = range_end + 1
start, end = _convert_byterange(media_playlist.segment_map[0])
if start != 0:
raise UnexpectedResponse("INVALID_AV_MEDIA_FRAGMENT_START", url)
ranges = [(start, end)]
next_start = end + 1
for segment in media_playlist.segments:
if segment.uri != file_name:
raise UnexpectedResponse("MULTIPLE_STREAM_MEDIA_FILES", media_playlist_url)
raise UnexpectedResponse("MULTIPLE_AV_MEDIA_FILES", url)
range_start, range_end = _parse_byterange(segment)
if range_start != total:
raise UnexpectedResponse(
"DISCONTINUOUS_STREAM_MEDIA_FRAGMENT", media_playlist_url
)
start, end = _convert_byterange(segment)
if start != next_start:
raise UnexpectedResponse("DISCONTINUOUS_AV_MEDIA_FRAGMENT", url)
chunks.append((range_start, range_end))
total = range_end + 1
ranges.append((start, end))
next_start = end + 1
return media_playlist.segment_map[0].absolute_uri, chunks
return media_playlist.segment_map[0].absolute_uri, ranges
def _download_av_stream(http_session, media_playlist_url, progress):
# Download an audio or video stream to temporary directory
url, ranges = _load_av_segments(http_session, media_playlist_url)
def _fetch_subtitles_media_playlist(http_session, url):
    """Fetch a subtitles media playlist and return the subtitles file url.

    The playlist must list exactly one segment; `UnexpectedResponse` is
    raised when it lists none or more than one.
    """
    playlist = _fetch_playlist(http_session, url)
    segment_urls = [segment.absolute_uri for segment in playlist.segments]

    if len(segment_urls) == 1:
        return segment_urls[0]

    reason = (
        "SUBTITLES_MEDIA_MULTIPLE_FILES"
        if segment_urls
        else "SUBTITLES_MEDIA_NO_FILES"
    )
    raise UnexpectedResponse(reason, url)
def _download_av_media(http_session, media_playlist_url, progress):
# Download an audio or video stream to temporary file.
# Return the temporary file name.
url, ranges = _fetch_av_media_playlist(http_session, media_playlist_url)
total = ranges[-1][1]
with (
@ -226,13 +247,14 @@ def _download_av_stream(http_session, media_playlist_url, progress):
headers={
"Range": f"bytes={range_start}-{range_end}",
},
timeout=5,
)
r.raise_for_status()
if r.status_code != 206:
raise UnexpectedResponse(
"STREAM_MEDIA_HTTP_STATUS",
"UNEXPECTED_AV_MEDIA_HTTP_STATUS",
media_playlist_url,
r.request.headers,
r.status,
@ -240,7 +262,7 @@ def _download_av_stream(http_session, media_playlist_url, progress):
if len(r.content) != range_end - range_start + 1:
raise UnexpectedResponse(
"INVALID_STREAM_MEDIA_FRAGMENT_LENGTH", media_playlist_url
"INVALID_AV_MEDIA_FRAGMENT_LENGTH", media_playlist_url
)
f.write(r.content)
@ -249,19 +271,13 @@ def _download_av_stream(http_session, media_playlist_url, progress):
return f.name
def _download_subtitles_input(http_session, index_url, progress):
# Return a temporary file name where VTT subtitle has been downloaded/converted to SRT
subtitles_index = _fetch_playlist(http_session, index_url)
urls = [s.absolute_uri for s in subtitles_index.segments]
if not urls:
raise UnexpectedResponse("SUBTITLES_MEDIA_NO_FILES", index_url)
if len(urls) > 1:
raise UnexpectedResponse("SUBTITLES_MEDIA_MULTIPLE_FILES", index_url)
def _download_subtitles_media(http_session, media_playlist_url, progress):
# Download a subtitle file (converted from VTT to SRT format) into a temporary file.
# Return the temporary file name.
url = _fetch_subtitles_media_playlist(http_session, media_playlist_url)
progress(0, 2)
r = http_session.get(urls[0])
r = http_session.get(url)
r.raise_for_status()
buffer = io.StringIO(r.text)
@ -286,45 +302,50 @@ def _download_subtitles_input(http_session, index_url, progress):
@contextlib.contextmanager
def download_inputs(http_session, remote_inputs, progress):
"""Download inputs in temporary files."""
# It is implemented as a context manager that will delete temporary files on exit.
video_index_url, audio_track, subtitles_track = remote_inputs
def download_source(http_session, source, progress):
"""Download source inputs into temporary files.
Returns a context manager that will delete the temporary files on exit.
The context expression is a local version of the given source.
"""
video_filename = None
audio_filename = None
subtitles_filename = None
try:
video_filename = _download_av_stream(
http_session, video_index_url, lambda i, n: progress("video", i, n)
video_filename = _download_av_media(
http_session, source.video, lambda i, n: progress("video", i, n)
)
(audio_lang, audio_index_url) = audio_track
audio_filename = _download_av_stream(
http_session, audio_index_url, lambda i, n: progress("audio", i, n)
audio_filename = _download_av_media(
http_session, source.audio, lambda i, n: progress("audio", i, n)
)
if subtitles_track:
(subtitles_lang, subtitles_index_url) = subtitles_track
subtitles_filename = _download_subtitles_input(
subtitles_filename = (
_download_subtitles_media(
http_session,
subtitles_index_url,
source.subtitles,
lambda i, n: progress("subtitles", i, n),
)
if source.subtitles
else None
)
yield model.Source(
source.metadata,
source.rendition,
source.variant,
video_filename,
audio_filename,
subtitles_filename,
)
yield (
video_filename,
(audio_lang, audio_filename),
(subtitles_lang, subtitles_filename),
)
else:
yield (video_filename, (audio_lang, audio_filename), None)
finally:
if video_filename and os.path.isfile(video_filename):
os.unlink(video_filename)
if audio_filename and os.path.isfile(audio_filename):
os.unlink(audio_filename)
if subtitles_filename and os.path.isfile(subtitles_filename):
os.unlink(subtitles_filename)

View File

@ -6,30 +6,24 @@
import subprocess
def mux(inputs, file_base_name, progress):
def mux_source(source, file_base_name, _progress):
"""Build FFMPEG args."""
video_input, audio_track, subtitles_track = inputs
audio_lang, audio_input = audio_track
if subtitles_track:
subtitles_lang, subtitles_input = subtitles_track
cmd = ["ffmpeg", "-hide_banner"]
cmd.extend(["-i", video_input])
cmd.extend(["-i", audio_input])
if subtitles_track:
cmd.extend(["-i", subtitles_input])
cmd.extend(["-i", source.video])
cmd.extend(["-i", source.audio])
if source.subtitles:
cmd.extend(["-i", source.subtitles])
cmd.extend(["-c:v", "copy"])
cmd.extend(["-c:a", "copy"])
if subtitles_track:
if source.subtitles:
cmd.extend(["-c:s", "copy"])
cmd.extend(["-bsf:a", "aac_adtstoasc"])
cmd.extend(["-metadata:s:a:0", f"language={audio_lang}"])
cmd.extend(["-metadata:s:a:0", f"language={source.rendition.audio.lang}"])
if subtitles_track:
cmd.extend(["-metadata:s:s:0", f"language={subtitles_lang}"])
if source.rendition.subtitles:
cmd.extend(["-metadata:s:s:0", f"language={source.rendition.subtitles.lang}"])
cmd.extend(["-disposition:s:0", "default"])
cmd.append(f"{file_base_name}.mkv")

View File

@ -4,6 +4,6 @@
"""Provide contextualized based file naming utility."""
def build_file_base_name(config):
"""Create a base file name from config metadata."""
return config["attributes"]["metadata"]["title"].replace("/", "-")
def build_file_name(source, _index, _total):
    """Build a base file name from the title of `source`.

    Every "/" in the title is replaced by "-". `_index` and `_total` are
    accepted but not used.
    """
    title = source.metadata.title
    return title.replace("/", "-")

View File

@ -30,4 +30,4 @@ def parse_url(url):
target_id = path.pop(0)
return www_lang, target_id
return target_id, www_lang