Refactor hls module and Variant* model

Split the `hls` functionality into smaller parts:
- fetch the M3U8 object: `fetch_index()`
- read the indexes: `read_*_index()`
Move that "pipeline" into `__init__::load_variant_sources()`.

Remove the `code` field from `Variant` and replace it with a video quality
descriptor (resolution and frame rate).
This commit is contained in:
Barbagus 2023-02-20 07:10:54 +01:00
parent 58b0ba30a3
commit c8aab4c5a3
5 changed files with 211 additions and 174 deletions

View File

@ -115,45 +115,75 @@ def load_rendition_sources(http, program_source):
]
def fetch_variant_sources(renditions_sources, http):
"""Fetch variants for given renditions."""
from itertools import groupby
def _check_variant_renditions(rendition, audio, subtitles):
    """Check a rendition against the tracks found in the program index.

    The audio/subtitles renditions figured out from the ArteTV rendition code
    must match the ones read from the program index, language aside.

    Args:
        rendition: rendition deduced from the ArteTV rendition code.
        audio: audio rendition read from the program index.
        subtitles: subtitles rendition read from the program index, or
            `None` when the index declares no subtitles.

    Returns:
        A copy of `rendition` whose `audio` and `subtitles` are the ones
        read from the program index.

    Raises:
        UnexpectedError: if the audio or the subtitles renditions differ.
    """
    # Check whether the renditions match once the language is taken from the
    # program index (in particular, this gets rid of "und").
    updated_audio = rendition.audio._replace(language=audio.language)
    if updated_audio != audio:
        raise UnexpectedError("AUDIO_RENDITION_MISMATCH")

    # The rendition announces subtitles but the index has none: report it as
    # a mismatch rather than failing on `subtitles.language` below.
    if rendition.subtitles and subtitles is None:
        raise UnexpectedError("SUBTITLES_RENDITION_MISMATCH")

    updated_subtitles = (
        rendition.subtitles._replace(language=subtitles.language)
        if rendition.subtitles
        else None
    )
    if updated_subtitles != subtitles:
        raise UnexpectedError("SUBTITLES_RENDITION_MISMATCH")

    return rendition._replace(audio=audio, subtitles=subtitles)
def load_variant_sources(http, rendition_source):
    """Build the variant sources of a single rendition.

    The program index of the rendition is fetched and read, the rendition is
    checked against what the index declares, and one `VariantSource` is
    returned per variant found.

    Raises:
        UnsupportedHLSProtocol: if the rendition protocol is not "HLS_NG".
        UnexpectedError: if the program index holds no variant.
    """
    from .hls import fetch_index, read_ng_program_index

    program, rendition, program_index_url = rendition_source
    if rendition.protocol != "HLS_NG":
        raise UnsupportedHLSProtocol()

    variants, audio_entry, subtitles_entry = read_ng_program_index(
        fetch_index(http, program_index_url)
    )
    audio, audio_index_url = audio_entry
    subtitles, subtitles_index_url = subtitles_entry

    if not variants:
        raise UnexpectedError("NO_VARIANTS")

    # From here on, the rendition carries the tracks read from the index.
    checked_rendition = _check_variant_renditions(rendition, audio, subtitles)

    variant_sources = []
    for variant, video_track_index in variants:
        variant_sources.append(
            VariantSource(
                program,
                checked_rendition,
                variant,
                video_track_index,
                audio_index_url,
                subtitles_index_url,
            )
        )
    return variant_sources
descriptors = list(
{(s.variant.code, s.video_media.track.frame_rate) for s in sources}
)
descriptors.sort()
for code, group in groupby(descriptors, key=lambda t: t[0]):
frame_rates_for_code = [t[1] for t in group]
if len(frame_rates_for_code) != 1:
raise UnexpectedError(
"MULTIPLE_RENDITION_FRAME_RATES", code, frame_rates_for_code
)
def _load_mp4_input(http, track_index_url):
    """Fetch a track index and return the first item `read_mp4_index` yields."""
    from .hls import fetch_index, read_mp4_index

    mp4_index = read_mp4_index(fetch_index(http, track_index_url))
    return mp4_index[0]
def fetch_targets(variant_sources, http, **naming_options):
def _load_vtt_input(http, track_index_url):
    """Fetch a track index and return what `read_vtt_index` reads from it."""
    from .hls import fetch_index, read_vtt_index

    return read_vtt_index(fetch_index(http, track_index_url))
def fetch_targets(variant_sources: list[VariantSource], http, **naming_options):
    """Compile download targets for given variants.

    Args:
        variant_sources: the variant sources to build one target each for.
        http: HTTP client handed to the track index loaders.
        **naming_options: keyword options forwarded to `file_name_builder()`.

    Returns:
        A list of `Target`, in the order of `variant_sources`. Each holds a
        video input, an audio input, an optional subtitles input, the
        program title (paired with its subtitle when it has one) and the
        output file name.
    """
    from .naming import file_name_builder

    build_file_name = file_name_builder(**naming_options)

    targets = [
        Target(
            Target.VideoInput(
                VideoTrack(
                    variant.video.resolution[0],
                    variant.video.resolution[1],
                    variant.video.frame_rate,
                ),
                _load_mp4_input(http, video_index_url),
            ),
            Target.AudioInput(
                AudioTrack(
                    f"Audio[{rendition.audio.language}]",
                    rendition.audio.language,
                    rendition.audio.original,
                    rendition.audio.visual_impaired,
                ),
                _load_mp4_input(http, audio_index_url),
            ),
            (
                Target.SubtitlesInput(
                    SubtitlesTrack(
                        # Labelled as subtitles; it used to read "Audio[...]",
                        # copied over from the audio track above.
                        f"Subtitles[{rendition.subtitles.language}]",
                        rendition.subtitles.language,
                        rendition.subtitles.hearing_impaired,
                    ),
                    _load_vtt_input(http, subtitles_index_url),
                )
                # Subtitles need both the rendition and an index to load.
                if rendition.subtitles and subtitles_index_url
                else None
            ),
            (program.title, program.subtitle) if program.subtitle else program.title,
            build_file_name(program, rendition, variant),
        )
        for (
            program,
            rendition,
            variant,
            video_index_url,
            audio_index_url,
            subtitles_index_url,
        ) in variant_sources
    ]

    return targets

View File

@ -38,15 +38,15 @@ import docopt
import urllib3
from . import (
HTTPError,
ModuleError,
UnexpectedError,
HTTPError,
__version__,
download_targets,
fetch_targets,
load_program_sources,
load_rendition_sources,
fetch_targets,
fetch_variant_sources,
load_variant_sources,
)
@ -146,17 +146,35 @@ def _select_rendition_sources(rendition_code, rendition_sources):
raise Abort()
def _make_variant_code(variant):
    """Return the variant code: second resolution component followed by "p"."""
    # presumably the video height, e.g. "1080p" - confirm against the model
    second_component = variant.video.resolution[1]
    return f"{second_component}p"
def _select_variant_sources(variant_code, variant_sources):
    """Return the variant sources whose code is `variant_code`.

    When no code is given, or when none of the sources matches it, the
    available codes are printed and `Abort` is raised.

    Raises:
        UnexpectedError: if one code comes with several frame rates.
        Abort: after the available codes have been printed.
    """
    coded_sources = [
        (source, _make_variant_code(source.variant)) for source in variant_sources
    ]

    if not variant_code:
        print("Available variants:")
    else:
        matching = [source for source, code in coded_sources if code == variant_code]
        if matching:
            return matching
        print(f"{variant_code!r} is not a valid variant code. Available values are:")

    # Distinct (code, frame rate) pairs, sorted so that equal codes are adjacent.
    descriptors = sorted(
        {(code, source.variant.video.frame_rate) for source, code in coded_sources}
    )
    for _, same_code in itertools.groupby(descriptors, key=lambda pair: pair[0]):
        if sum(1 for _ in same_code) != 1:
            raise UnexpectedError("MULTIPLE_VARIANT_FRAME_RATES")

    # Listed by decreasing numeric value of the code (trailing "p" dropped).
    listing = sorted(descriptors, key=lambda pair: int(pair[0][:-1]), reverse=True)
    for code, _ in listing:
        print(f"{code:>12}")

    raise Abort()
@ -182,7 +200,11 @@ def main():
variant_sources = _select_variant_sources(
args["VARIANT"],
fetch_variant_sources(rendition_sources, http),
[
variant_source
for rendition_source in rendition_sources
for variant_source in load_variant_sources(http, rendition_source)
],
)
targets = fetch_targets(

View File

@ -3,11 +3,10 @@
"""Provide HLS protocol utilities."""
import m3u8
from .error import UnexpectedHLSResponse, UnsupportedHLSProtocol, HTTPError
from .model import AudioTrack, SubtitlesTrack, Variant, VideoTrack
from .error import HTTPError, UnexpectedHLSResponse
from .model import Rendition, Variant
#
# WARNING !
@ -27,8 +26,8 @@ from .model import AudioTrack, SubtitlesTrack, Variant, VideoTrack
MIME_TYPE = "application/x-mpegURL"
def _fetch_index(http, url):
# Fetch a M3U8 playlist
def fetch_index(http, url):
"""Fetch a M3U8 playlist."""
r = http.request("GET", url)
HTTPError.raise_for_status(r)
@ -38,108 +37,89 @@ def _fetch_index(http, url):
return m3u8.loads(r.data.decode("utf-8"), url)
def iter_variants(protocol, program_index_url, http):
"""Iterate over variants for the given rendition."""
if protocol != "HLS_NG":
raise UnsupportedHLSProtocol(protocol, program_index_url)
program_index = _fetch_index(http, program_index_url)
audio_media = None
subtitles_media = None
for media in program_index.media:
match media.type:
case "AUDIO":
if audio_media:
raise UnexpectedHLSResponse(
"MULTIPLE_AUDIO_MEDIA", program_index_url
)
audio_media = media
case "SUBTITLES":
if subtitles_media:
raise UnexpectedHLSResponse(
"MULTIPLE_SUBTITLES_MEDIA", program_index_url
)
subtitles_media = media
if not audio_media:
raise UnexpectedHLSResponse("NO_AUDIO_MEDIA", program_index_url)
audio = (
AudioTrack(
audio_media.name,
audio_media.language,
audio_media.name.startswith("VO"),
(
audio_media.characteristics is not None
and ("public.accessibility" in audio_media.characteristics)
),
),
audio_media.absolute_uri,
)
subtitles = (
def _select_program_audio(program_index):
    """Return `(group_id, Rendition.Audio, absolute_uri)` for the audio media.

    Raises `UnexpectedHLSResponse` when the program index declares no
    "AUDIO" media, or more than one.
    """
    candidates = []
    for media in program_index.media:
        if media.type != "AUDIO":
            continue
        audio = Rendition.Audio(
            media.language,
            # presumably flags the original version - confirm against the model
            media.name.startswith("VO"),
            media.characteristics is not None
            and "public.accessibility" in media.characteristics,
        )
        candidates.append((media.group_id, audio, media.absolute_uri))

    if not candidates:
        raise UnexpectedHLSResponse("NO_PROGRAM_AUDIO_MEDIA")
    if len(candidates) > 1:
        raise UnexpectedHLSResponse("MULTIPLE_PROGRAM_AUDIO_MEDIA")

    (selected,) = candidates
    return selected
def _select_program_subtitles(program_index):
    """Return `(group_id, Rendition.Subtitles, absolute_uri)` for the subtitles.

    Returns `(None, None, None)` when the program index declares no
    "SUBTITLES" media; raises `UnexpectedHLSResponse` when it declares more
    than one.
    """
    candidates = []
    for media in program_index.media:
        if media.type != "SUBTITLES":
            continue
        subtitles = Rendition.Subtitles(
            media.language,
            media.characteristics is not None
            and "public.accessibility" in media.characteristics,
        )
        candidates.append((media.group_id, subtitles, media.absolute_uri))

    if not candidates:
        return None, None, None
    if len(candidates) > 1:
        raise UnexpectedHLSResponse("MULTIPLE_PROGRAM_SUBTITLES_MEDIA")

    (selected,) = candidates
    return selected
def read_ng_program_index(program_index):
    """Return variants, audio and subtitles from program index.

    Returns:
        A 3-tuple `(variants, (audio, audio_url), (subtitles, subtitles_url))`
        where `variants` is a list of `(Variant, absolute_uri)` pairs, one
        per playlist of the index.

    Raises:
        UnexpectedHLSResponse: if a playlist refers to an audio or subtitles
            group other than the one selected for the program.
    """
    audio_id, audio, audio_url = _select_program_audio(program_index)
    subtitles_id, subtitles, subtitles_url = _select_program_subtitles(program_index)

    variants = []
    for playlist in program_index.playlists:
        info = playlist.stream_info

        if info.audio != audio_id:
            raise UnexpectedHLSResponse("PROGRAM_AUDIO_MEDIA_ID")
        # `subtitles_id` is None when the index declares no subtitles.
        if info.subtitles != subtitles_id:
            raise UnexpectedHLSResponse("PROGRAM_SUBTITLES_MEDIA_ID")

        bandwidth = info.average_bandwidth
        video = Variant.Video(info.resolution, info.frame_rate)
        variants.append((Variant(bandwidth, video), playlist.absolute_uri))

    return variants, (audio, audio_url), (subtitles, subtitles_url)
def _convert_byterange(obj):
@ -150,43 +130,39 @@ def _convert_byterange(obj):
return offset, offset + count - 1
def fetch_mp4_media(track_index_url, http):
"""Fetch an audio or video media."""
track_index = _fetch_index(http, track_index_url)
def read_mp4_index(track_index):
    """Return MP4 file url and ranges.

    The first entry of `segment_map` and every segment must refer to the
    same file, the first range must start at 0 and each following range must
    start right after the previous one ends.

    Raises:
        UnexpectedHLSResponse: if any of the above does not hold.
    """
    file_name = track_index.segment_map[0].uri

    first, last = _convert_byterange(track_index.segment_map[0])
    ranges = [(first, last)]
    if first != 0:
        raise UnexpectedHLSResponse("MP4_INDEX_FRAGMENT_START")

    expected_start = last + 1
    for segment in track_index.segments:
        if segment.uri != file_name:
            raise UnexpectedHLSResponse("MULTIPLE_MP4_INDEX_FILES")

        first, last = _convert_byterange(segment)
        if first != expected_start:
            raise UnexpectedHLSResponse("DISCONTINUOUS_MP4_INDEX_FRAGMENT")

        ranges.append((first, last))
        expected_start = last + 1

    return track_index.segment_map[0].absolute_uri, ranges
def fetch_vtt_media(track_index_url, http):
"""Fetch an audio or video media."""
track_index = _fetch_index(http, track_index_url)
def read_vtt_index(track_index):
    """Return VTT file url.

    The index must hold exactly one segment, otherwise
    `UnexpectedHLSResponse` is raised.
    """
    urls = [segment.absolute_uri for segment in track_index.segments]

    if len(urls) == 1:
        return urls[0]

    raise UnexpectedHLSResponse(
        "MULTIPLE_VTT_INDEX_FILES" if urls else "NO_VTT_INDEX_FILES"
    )

View File

@ -44,8 +44,14 @@ class Rendition(NamedTuple):
class Variant(NamedTuple):
    """Metadata of a program variant."""

    class Video(NamedTuple):
        """Video quality descriptor of a variant."""

        # pair of integers; index 1 is what variant codes are built from
        resolution: tuple[int, int]
        frame_rate: float

    # taken from the stream info of the program index
    average_bandwidth: int
    video: Video
#
@ -95,32 +101,14 @@ class RenditionSource(NamedTuple):
class VariantSource(NamedTuple):
    """Source of a variant: its metadata and the URLs of its track indexes."""

    program: Program
    rendition: Rendition
    variant: Variant
    # URLs of the per-track indexes
    video_index_url: str
    audio_index_url: str
    # None when there are no subtitles
    subtitles_index_url: Optional[str]
class Target(NamedTuple):

View File

@ -5,6 +5,14 @@
import re
def _make_rendition_code(rendition):
    """Placeholder: building a rendition code is not implemented."""
    raise NotImplementedError()
def _make_variant_code(variant):
    """Return the variant code used in file names, e.g. "1080p".

    Built from the second component of the video resolution followed by
    "p", which is what the removed `Variant.code` field used to hold.
    """
    return f"{variant.video.resolution[1]}p"
def file_name_builder(
*,
use_id=False,
@ -36,10 +44,10 @@ def file_name_builder(
fields.append(replace_sequence_counter(program.subtitle))
if add_rendition:
fields.append(rendition.code)
fields.append(_make_rendition_code(rendition))
if add_variant:
fields.append(variant.code)
fields.append(_make_variant_code(variant))
name = sep.join(fields)
name = re.sub(r'[/:<>"\\|?*]', "", name)