Compare commits

...

4 Commits

Author SHA1 Message Date
Barbagus c8aab4c5a3 Refactor `hls` module and `Variant*` model
Split `hls` functionalities in smaller parts
- fetch M3U8 object `fetch_index()`
- read the indexes `read_*_index_object()`
Move that "pipeline" in `__init__::load_variant_sources()`

Remove the `code` field from `Variant` and replace it with a video quality
descriptor (resolution and frame rate).
2023-02-20 07:10:54 +01:00
Barbagus 58b0ba30a3 Refactor `api` module and `Rendition*` model
Split `api` functionalities in smaller parts
- fetch API JSON object `fetch_api_object()`
- read the config object `read_config_player_object()`
Move that "pipeline" in `__init__.py`

Remove the `code` field from `Rendition` and replace it with some track rendition
models that are built from parsing the `code` from ArteTV. Also move the
`protocol` from the `RenditionSource` to the `Rendition` model itself...
who knows how we might handle it in the future.
2023-02-16 20:11:45 +01:00
Barbagus 4ffc32eb61 Fix invalid doc strings 2023-02-16 20:07:08 +01:00
Barbagus bdc8b7b246 Refactor `www` module
Split functionalities in smaller parts
- fetch the html code `fetch_page_content()`
- extract JSON data from html code `extract_page_data()`
- read the program info from page data `read_page_data()`
Move that "pipeline" in `__init__.py`
2023-02-16 08:18:02 +01:00
8 changed files with 476 additions and 358 deletions

View File

@ -9,90 +9,181 @@ from .error import *
from .model import *
def fetch_program_sources(url, http):
"""Fetch program sources listed on given ArteTV page."""
from .www import iter_programs
def load_program_sources(http, page_url):
"""Load programs sources listed on given ArteTV page."""
from .www import read_page_data, fetch_page_content, extract_page_data
page_content = fetch_page_content(http, page_url)
page_data = extract_page_data(page_content)
programs = read_page_data(page_data)
if not programs:
raise UnexpectedError("NO_PROGRAMS")
return [
ProgramSource(
program,
player_config_url,
Program(id, language, title, subtitle),
f"https://api.arte.tv/api/player/v2/config/{language}/{id}",
)
for program, player_config_url in iter_programs(url, http)
for id, language, title, subtitle in programs
]
def fetch_rendition_sources(program_sources, http):
"""Fetch renditions for given programs."""
from itertools import groupby
# "und" language code officially stands for "undefined"
# Lookup table from the audio part of a rendition code to its audio rendition.
_AUDIO_RENDITIONS = {
    # Entries flagged as original audio.
    "VO": Rendition.Audio(language="und", original=True, visual_impaired=False),
    "VOF": Rendition.Audio(language="fra", original=True, visual_impaired=False),
    "VOA": Rendition.Audio(language="deu", original=True, visual_impaired=False),
    "VOEU": Rendition.Audio(language="und", original=True, visual_impaired=False),
    # Entries that are neither original nor flagged for the visually impaired.
    "VF": Rendition.Audio(language="fra", original=False, visual_impaired=False),
    "VA": Rendition.Audio(language="deu", original=False, visual_impaired=False),
    "VE": Rendition.Audio(language="und", original=False, visual_impaired=False),
    # Entries flagged for the visually impaired.
    "VFAUD": Rendition.Audio(language="fra", original=False, visual_impaired=True),
    "VAAUD": Rendition.Audio(language="deu", original=False, visual_impaired=True),
}
from .api import iter_renditions
# Lookup table from the subtitles part of a rendition code to its subtitles
# rendition. Languages are ISO 639-2 codes, like the audio table ("fra",
# "deu", ...); Spanish is therefore "spa" ("esp" is not an ISO 639 code).
_SUBTITLES_RENDITIONS = {
    "STF": Rendition.Subtitles(language="fra", hearing_impaired=False),
    "STA": Rendition.Subtitles(language="deu", hearing_impaired=False),
    "STMF": Rendition.Subtitles(language="fra", hearing_impaired=True),
    "STMA": Rendition.Subtitles(language="deu", hearing_impaired=True),
    "STE[ANG]": Rendition.Subtitles(language="eng", hearing_impaired=False),
    "STE[ESP]": Rendition.Subtitles(language="spa", hearing_impaired=False),
    "STE[POL]": Rendition.Subtitles(language="pol", hearing_impaired=False),
    "STE[ITA]": Rendition.Subtitles(language="ita", hearing_impaired=False),
}
sources = [
def _parse_rendition_code(code):
    """Translate a rendition code into its audio and subtitles renditions.

    The code is an audio code, optionally followed by ``-`` and a subtitles
    code.

    Returns:
        A ``(audio_rendition, subtitles_rendition)`` pair; the subtitles
        rendition is ``None`` when the code has no subtitles part.

    Raises:
        UnexpectedError: the audio part ("AUDIO_RENDITION_CODE") or the
            subtitles part ("SUBTITLES_RENDITION_CODE") is not a known code.
    """
    # partition() splits on the first hyphen only, so a code with several
    # hyphens ends up as an unknown subtitles code (UnexpectedError) instead
    # of blowing up with a ValueError on tuple unpacking.
    audio_code, _, subtitles_code = code.partition("-")

    try:
        audio_rendition = _AUDIO_RENDITIONS[audio_code]
    except KeyError as e:
        raise UnexpectedError("AUDIO_RENDITION_CODE") from e

    if not subtitles_code:
        return audio_rendition, None

    try:
        subtitles_rendition = _SUBTITLES_RENDITIONS[subtitles_code]
    except KeyError as e:
        raise UnexpectedError("SUBTITLES_RENDITION_CODE") from e

    return audio_rendition, subtitles_rendition
def load_rendition_sources(http, program_source):
"""Fetch rendition sources for a given program."""
from .api import fetch_api_object, read_config_player_object
program, config_player_url = program_source
config_player_object = fetch_api_object(http, config_player_url)
program_id, streams = read_config_player_object(config_player_object)
if program_id != program.id:
raise UnexpectedError("PROGRAM_ID_MISMATCH")
if not streams:
raise UnexpectedError("NO_RENDITIONS")
track_renditions = [_parse_rendition_code(code) for code, _, _, _ in streams]
# sometimes the original audio renditions are a mix of "fra" or "deu" and "und";
# in that case, we replace the "und" accordingly
originals = {audio.language for audio, _ in track_renditions if audio.original}
if len(originals) == 2 and "und" in originals:
originals.remove("und")
original = originals.pop()
track_renditions = [
(
audio._replace(language=original) if audio.original else audio,
subtitles,
)
for audio, subtitles in track_renditions
]
return [
RenditionSource(
program,
rendition,
protocol,
Rendition(label, audio, subtitles, protocol),
program_index_url,
)
for program, player_config_url in program_sources
for rendition, protocol, program_index_url in iter_renditions(
program.id,
player_config_url,
http,
for (audio, subtitles), (_, label, protocol, program_index_url) in zip(
track_renditions, streams
)
]
descriptors = list({(s.rendition.code, s.rendition.label) for s in sources})
descriptors.sort()
for code, group in groupby(descriptors, key=lambda t: t[0]):
labels_for_code = [t[1] for t in group]
if len(labels_for_code) != 1:
raise UnexpectedError("MULTIPLE_RENDITION_LABELS", code, labels_for_code)
def _check_variant_renditions(rendition, audio, subtitles):
# Check whether the audio/subtitles rendition we figured
# from ArteTV rendition code do match the one found in the
# program index.
# Update track languages (in particular, will get rid of "und")
updated_audio = rendition.audio._replace(language=audio.language)
if updated_audio != audio:
raise UnexpectedError("AUDIO_RENDITION_MISMATCH")
return sources
updated_subtitles = (
rendition.subtitles._replace(language=subtitles.language)
if rendition.subtitles
else None
)
if updated_subtitles != subtitles:
raise UnexpectedError("SUBTITLES_RENDITION_MISMATCH")
return rendition._replace(audio=audio, subtitles=subtitles)
def fetch_variant_sources(renditions_sources, http):
"""Fetch variants for given renditions."""
from itertools import groupby
def load_variant_sources(http, rendition_source):
"""Load variant sources for a given rendition."""
from .hls import fetch_index, read_ng_program_index
from .hls import iter_variants
program, rendition, program_index_url = rendition_source
if rendition.protocol != "HLS_NG":
raise UnsupportedHLSProtocol()
sources = [
program_index = fetch_index(http, program_index_url)
(
variants,
(audio, audio_index_url),
(subtitles, subtitles_index_url),
) = read_ng_program_index(program_index)
if not variants:
raise UnexpectedError("NO_VARIANTS")
rendition = _check_variant_renditions(rendition, audio, subtitles)
return [
VariantSource(
program,
rendition,
variant,
VariantSource.VideoMedia(*video),
VariantSource.AudioMedia(*audio),
VariantSource.SubtitlesMedia(*subtitles) if subtitles else None,
)
for program, rendition, protocol, program_index_url in renditions_sources
for variant, video, audio, subtitles in iter_variants(
protocol, program_index_url, http
video_track_index,
audio_index_url,
subtitles_index_url,
)
for variant, video_track_index in variants
]
descriptors = list(
{(s.variant.code, s.video_media.track.frame_rate) for s in sources}
)
descriptors.sort()
for code, group in groupby(descriptors, key=lambda t: t[0]):
frame_rates_for_code = [t[1] for t in group]
if len(frame_rates_for_code) != 1:
raise UnexpectedError(
"MULTIPLE_RENDITION_FRAME_RATES", code, frame_rates_for_code
)
def _load_mp4_input(http, track_index_url):
from .hls import fetch_index, read_mp4_index
return sources
track_index = fetch_index(http, track_index_url)
return read_mp4_index(track_index)[0]
def fetch_targets(variant_sources, http, **naming_options):
def _load_vtt_input(http, track_index_url):
    """Fetch the track index at the given URL and read it as a VTT index."""
    from .hls import fetch_index, read_vtt_index

    return read_vtt_index(fetch_index(http, track_index_url))
def fetch_targets(variant_sources: list[VariantSource], http, **naming_options):
"""Compile download targets for given variants."""
from .hls import fetch_mp4_media, fetch_vtt_media
from .naming import file_name_builder
build_file_name = file_name_builder(**naming_options)
@ -100,25 +191,38 @@ def fetch_targets(variant_sources, http, **naming_options):
targets = [
Target(
Target.VideoInput(
video_media.track,
fetch_mp4_media(video_media.track_index_url, http),
VideoTrack(
variant.video.resolution[0],
variant.video.resolution[1],
variant.video.frame_rate,
),
_load_mp4_input(http, video_index_url),
),
Target.AudioInput(
audio_media.track,
fetch_mp4_media(audio_media.track_index_url, http),
AudioTrack(
f"Audio[{rendition.audio.language}]",
rendition.audio.language,
rendition.audio.original,
rendition.audio.visual_impaired,
),
_load_mp4_input(http, audio_index_url),
),
(
Target.SubtitlesInput(
subtitles_media.track,
fetch_vtt_media(subtitles_media.track_index_url, http),
SubtitlesTrack(
f"Audio[{rendition.subtitles.language}]",
rendition.subtitles.language,
rendition.subtitles.hearing_impaired,
),
_load_vtt_input(http, subtitles_index_url),
)
if subtitles_media
if rendition.subtitles and subtitles_index_url
else None
),
(program.title, program.subtitle) if program.subtitle else program.title,
build_file_name(program, rendition, variant),
)
for program, rendition, variant, video_media, audio_media, subtitles_media in variant_sources
for program, rendition, variant, video_index_url, audio_index_url, subtitles_index_url in variant_sources
]
return targets

View File

@ -38,15 +38,15 @@ import docopt
import urllib3
from . import (
HTTPError,
ModuleError,
UnexpectedError,
HTTPError,
__version__,
download_targets,
fetch_program_sources,
fetch_rendition_sources,
fetch_targets,
fetch_variant_sources,
load_program_sources,
load_rendition_sources,
load_variant_sources,
)
@ -97,9 +97,33 @@ def _create_progress():
return on_progress
def _make_rendition_code(rendition):
    """Build the selection code for a rendition.

    The code is ``[og:|ad:]<audio language>`` followed, when the rendition has
    subtitles, by ``-cc`` (hearing-impaired subtitles) or ``-st:<language>``
    (subtitles in a language other than the audio one).
    """
    audio = rendition.audio

    if audio.original:
        prefix = "og:"
    elif audio.visual_impaired:
        prefix = "ad:"
    else:
        prefix = ""

    suffix = ""
    subtitles = rendition.subtitles
    if subtitles:
        if subtitles.hearing_impaired:
            suffix = "-cc"
        elif subtitles.language != audio.language:
            suffix = "-st:" + subtitles.language

    return prefix + audio.language + suffix
def _select_rendition_sources(rendition_code, rendition_sources):
rendition_sources_with_code = [
(rendition_source, _make_rendition_code(rendition_source.rendition))
for rendition_source in rendition_sources
]
if rendition_code:
filtered = [s for s in rendition_sources if s.rendition.code == rendition_code]
filtered = [s for s, c in rendition_sources_with_code if c == rendition_code]
if filtered:
return filtered
print(
@ -108,26 +132,49 @@ def _select_rendition_sources(rendition_code, rendition_sources):
else:
print("Available renditions:")
key = lambda s: (s.rendition.label, s.rendition.code)
descriptors = list({(c, s.rendition.label) for s, c in rendition_sources_with_code})
rendition_sources.sort(key=key)
for (label, code), _ in itertools.groupby(rendition_sources, key=key):
print(f"{code:>12} : {label}")
descriptors.sort()
for code, group in itertools.groupby(descriptors, key=lambda t: t[0]):
if len([t[1] for t in group]) != 1:
raise UnexpectedError("MULTIPLE_RENDITION_LABELS")
descriptors.sort(key=lambda t: t[1])
for code, label in descriptors:
print(f"{code:>13} : {label}")
raise Abort()
def _make_variant_code(variant):
    """Build the selection code for a variant.

    The code is the second component of the video resolution (presumably the
    height) followed by ``p``.
    """
    return "{}p".format(variant.video.resolution[1])
def _select_variant_sources(variant_code, variant_sources):
variant_sources_with_code = [
(variant_source, _make_variant_code(variant_source.variant))
for variant_source in variant_sources
]
if variant_code:
filtered = [s for s in variant_sources if s.variant.code == variant_code]
filtered = [s for s, c in variant_sources_with_code if c == variant_code]
if filtered:
return filtered
print(f"{variant_code!r} is not a valid variant code. Available values are:")
else:
print("Available variants:")
variant_sources.sort(key=lambda s: s.video_media.track.height, reverse=True)
for code, _ in itertools.groupby(variant_sources, key=lambda s: s.variant.code):
descriptors = list(
{(c, s.variant.video.frame_rate) for s, c in variant_sources_with_code}
)
descriptors.sort()
for code, group in itertools.groupby(descriptors, key=lambda t: t[0]):
if len([t[1] for t in group]) != 1:
raise UnexpectedError("MULTIPLE_VARIANT_FRAME_RATES")
descriptors.sort(key=lambda t: int(t[0][:-1]), reverse=True)
for code, _ in descriptors:
print(f"{code:>12}")
raise Abort()
@ -140,16 +187,24 @@ def main():
http = urllib3.PoolManager(timeout=5)
try:
program_sources = fetch_program_sources(args["URL"], http)
program_sources = load_program_sources(http, args["URL"])
rendition_sources = _select_rendition_sources(
args["RENDITION"],
fetch_rendition_sources(program_sources, http),
[
rendition_source
for program_source in program_sources
for rendition_source in load_rendition_sources(http, program_source)
],
)
variant_sources = _select_variant_sources(
args["VARIANT"],
fetch_variant_sources(rendition_sources, http),
[
variant_source
for rendition_source in rendition_sources
for variant_source in load_variant_sources(http, rendition_source)
],
)
targets = fetch_targets(

View File

@ -5,15 +5,13 @@
import json
from .error import UnexpectedAPIResponse, HTTPError
from .model import Rendition
from .error import HTTPError, UnexpectedAPIResponse
MIME_TYPE = "application/vnd.api+json; charset=utf-8"
def _fetch_api_object(http, url, object_type):
# Fetch an API object.
def fetch_api_object(http, url):
"""Fetch an API object."""
r = http.request("GET", url)
HTTPError.raise_for_status(r)
@ -21,51 +19,30 @@ def _fetch_api_object(http, url, object_type):
if mime_type != MIME_TYPE:
raise UnexpectedAPIResponse("MIME_TYPE", url, MIME_TYPE, mime_type)
obj = json.loads(r.data.decode("utf-8"))
return json.loads(r.data.decode("utf-8"))
def read_config_player_object(obj):
"""Return program ID and streams information from config player object."""
try:
data_type = obj["data"]["type"]
if data_type != object_type:
raise UnexpectedAPIResponse("OBJECT_TYPE", url, object_type, data_type)
if obj["data"]["type"] != "ConfigPlayer":
raise UnexpectedAPIResponse("OBJECT_TYPE")
return obj["data"]["attributes"]
attributes = obj["data"]["attributes"]
except (KeyError, IndexError, ValueError) as e:
raise UnexpectedAPIResponse("SCHEMA", url) from e
program_id = attributes["metadata"]["providerId"]
def iter_renditions(program_id, player_config_url, http):
"""Iterate over renditions for the given program."""
obj = _fetch_api_object(http, player_config_url, "ConfigPlayer")
codes = set()
try:
provider_id = obj["metadata"]["providerId"]
if provider_id != program_id:
raise UnexpectedAPIResponse(
"PROVIDER_ID_MISMATCH", player_config_url, provider_id
)
for s in obj["streams"]:
code = s["versions"][0]["eStat"]["ml5"]
if code in codes:
raise UnexpectedAPIResponse(
"DUPLICATE_RENDITION_CODE", player_config_url, code
)
codes.add(code)
yield (
Rendition(
s["versions"][0]["eStat"]["ml5"],
s["versions"][0]["label"],
),
streams = [
(
s["versions"][0]["eStat"]["ml5"],
s["versions"][0]["label"],
s["protocol"],
s["url"],
)
for s in attributes["streams"]
]
return program_id, streams
except (KeyError, IndexError, ValueError) as e:
raise UnexpectedAPIResponse("SCHEMA", player_config_url) from e
if not codes:
raise UnexpectedAPIResponse("NO_RENDITIONS", player_config_url)
raise UnexpectedAPIResponse("SCHEMA") from e

View File

@ -48,6 +48,10 @@ class InvalidPage(UnexpectedError):
"""Invalid ArteTV page."""
class InvalidPageData(UnexpectedError):
"""Invalid ArteTV page data."""
#
# api
#

View File

@ -3,11 +3,10 @@
"""Provide HLS protocol utilities."""
import m3u8
from .error import UnexpectedHLSResponse, UnsupportedHLSProtocol, HTTPError
from .model import AudioTrack, SubtitlesTrack, Variant, VideoTrack
from .error import HTTPError, UnexpectedHLSResponse
from .model import Rendition, Variant
#
# WARNING !
@ -27,8 +26,8 @@ from .model import AudioTrack, SubtitlesTrack, Variant, VideoTrack
MIME_TYPE = "application/x-mpegURL"
def _fetch_index(http, url):
# Fetch a M3U8 playlist
def fetch_index(http, url):
"""Fetch a M3U8 playlist."""
r = http.request("GET", url)
HTTPError.raise_for_status(r)
@ -38,108 +37,89 @@ def _fetch_index(http, url):
return m3u8.loads(r.data.decode("utf-8"), url)
def iter_variants(protocol, program_index_url, http):
"""Iterate over variants for the given rendition."""
if protocol != "HLS_NG":
raise UnsupportedHLSProtocol(protocol, program_index_url)
program_index = _fetch_index(http, program_index_url)
audio_media = None
subtitles_media = None
for media in program_index.media:
match media.type:
case "AUDIO":
if audio_media:
raise UnexpectedHLSResponse(
"MULTIPLE_AUDIO_MEDIA", program_index_url
)
audio_media = media
case "SUBTITLES":
if subtitles_media:
raise UnexpectedHLSResponse(
"MULTIPLE_SUBTITLES_MEDIA", program_index_url
)
subtitles_media = media
if not audio_media:
raise UnexpectedHLSResponse("NO_AUDIO_MEDIA", program_index_url)
audio = (
AudioTrack(
audio_media.name,
audio_media.language,
audio_media.name.startswith("VO"),
(
audio_media.characteristics is not None
and ("public.accessibility" in audio_media.characteristics)
),
),
audio_media.absolute_uri,
)
subtitles = (
def _select_program_audio(program_index):
all = [
(
SubtitlesTrack(
subtitles_media.name,
subtitles_media.language,
media.group_id,
Rendition.Audio(
media.language,
media.name.startswith("VO"),
(
subtitles_media.characteristics is not None
and ("public.accessibility" in subtitles_media.characteristics)
media.characteristics is not None
and ("public.accessibility" in media.characteristics)
),
),
subtitles_media.absolute_uri,
media.absolute_uri,
)
if subtitles_media
else None
)
for media in program_index.media
if media.type == "AUDIO"
]
codes = set()
if not all:
raise UnexpectedHLSResponse("NO_PROGRAM_AUDIO_MEDIA")
if len(all) > 1:
raise UnexpectedHLSResponse("MULTIPLE_PROGRAM_AUDIO_MEDIA")
return all[0]
def _select_program_subtitles(program_index):
    """Pick the subtitles media of a program index.

    Returns:
        A ``(group_id, subtitles_rendition, url)`` triple for the only
        "SUBTITLES" media of the index, or ``(None, None, None)`` when the
        index has none.

    Raises:
        UnexpectedHLSResponse: the index lists more than one subtitles media.
    """
    candidates = []
    for media in program_index.media:
        if media.type != "SUBTITLES":
            continue

        # Subtitles are flagged hearing-impaired when their characteristics
        # mention "public.accessibility".
        hearing_impaired = (
            media.characteristics is not None
            and "public.accessibility" in media.characteristics
        )
        candidates.append(
            (
                media.group_id,
                Rendition.Subtitles(media.language, hearing_impaired),
                media.absolute_uri,
            )
        )

    if len(candidates) > 1:
        raise UnexpectedHLSResponse("MULTIPLE_PROGRAM_SUBTITLES_MEDIA")

    if candidates:
        return candidates[0]

    return None, None, None
def read_ng_program_index(program_index):
"""Return variants, audio and subtitles from program index."""
audio_id, audio, audio_url = _select_program_audio(program_index)
subtitles_id, subtitles, subtitles_url = _select_program_subtitles(program_index)
variants = []
for video_media in program_index.playlists:
stream_info = video_media.stream_info
if stream_info.audio != audio_media.group_id:
raise UnexpectedHLSResponse(
"INVALID_AUDIO_MEDIA", program_index_url, stream_info.audio
)
if stream_info.audio != audio_id:
raise UnexpectedHLSResponse("PROGRAM_AUDIO_MEDIA_ID")
if subtitles_media:
if stream_info.subtitles != subtitles_media.group_id:
raise UnexpectedHLSResponse(
"INVALID_SUBTITLES_MEDIA", program_index_url, stream_info.subtitles
)
elif stream_info.subtitles:
raise UnexpectedHLSResponse(
"INVALID_SUBTITLES_MEDIA", program_index_url, stream_info.subtitles
)
if stream_info.subtitles != subtitles_id:
raise UnexpectedHLSResponse("PROGRAM_SUBTITLES_MEDIA_ID")
code = f"{stream_info.resolution[1]}p"
if code in codes:
raise UnexpectedHLSResponse(
"DUPLICATE_STREAM_CODE", program_index_url, code
)
codes.add(code)
yield (
Variant(
code,
stream_info.average_bandwidth,
),
variants.append(
(
VideoTrack(
stream_info.resolution[0],
stream_info.resolution[1],
stream_info.frame_rate,
Variant(
stream_info.average_bandwidth,
Variant.Video(
stream_info.resolution,
stream_info.frame_rate,
),
),
video_media.absolute_uri,
),
audio,
subtitles,
)
)
if not codes:
raise UnexpectedHLSResponse("NO_VARIANTS", program_index_url)
return variants, (audio, audio_url), (subtitles, subtitles_url)
def _convert_byterange(obj):
@ -150,43 +130,39 @@ def _convert_byterange(obj):
return offset, offset + count - 1
def fetch_mp4_media(track_index_url, http):
"""Fetch an audio or video media."""
track_index = _fetch_index(http, track_index_url)
def read_mp4_index(track_index):
"""Return MP4 file url and ranges."""
file_name = track_index.segment_map[0].uri
start, end = _convert_byterange(track_index.segment_map[0])
if start != 0:
raise UnexpectedHLSResponse("INVALID_AV_INDEX_FRAGMENT_START", track_index_url)
ranges = [(start, end)]
if start != 0:
raise UnexpectedHLSResponse("MP4_INDEX_FRAGMENT_START")
# ranges = [(start, end)]
next_start = end + 1
for segment in track_index.segments:
if segment.uri != file_name:
raise UnexpectedHLSResponse("MULTIPLE_AV_INDEX_FILES", track_index_url)
raise UnexpectedHLSResponse("MULTIPLE_MP4_INDEX_FILES")
start, end = _convert_byterange(segment)
if start != next_start:
raise UnexpectedHLSResponse(
"DISCONTINUOUS_AV_INDEX_FRAGMENT", track_index_url
)
raise UnexpectedHLSResponse("DISCONTINUOUS_MP4_INDEX_FRAGMENT")
# ranges.append((start, end))
ranges.append((start, end))
next_start = end + 1
return track_index.segment_map[0].absolute_uri
return track_index.segment_map[0].absolute_uri, ranges
def fetch_vtt_media(track_index_url, http):
"""Fetch an audio or video media."""
track_index = _fetch_index(http, track_index_url)
def read_vtt_index(track_index):
"""Return VTT file url."""
urls = [s.absolute_uri for s in track_index.segments]
if not urls:
raise UnexpectedHLSResponse("NO_S_INDEX_FILES", track_index_url)
raise UnexpectedHLSResponse("NO_VTT_INDEX_FILES")
if len(urls) > 1:
raise UnexpectedHLSResponse("MULTIPLE_S_INDEX_FILES", track_index_url)
raise UnexpectedHLSResponse("MULTIPLE_VTT_INDEX_FILES")
return urls[0]

View File

@ -22,15 +22,36 @@ class Program(NamedTuple):
class Rendition(NamedTuple):
"""A program rendition metadata."""
code: str
class Audio(NamedTuple):
"""An audio rendition."""
language: str
original: bool
visual_impaired: bool
class Subtitles(NamedTuple):
"""A subtitles rendition."""
language: str
hearing_impaired: bool
label: str
audio: Audio
subtitles: Optional[Subtitles]
protocol: str
class Variant(NamedTuple):
"""A program variant metadata."""
code: str
class Video(NamedTuple):
"""A video rendition."""
resolution: tuple[int, int]
frame_rate: float
average_bandwidth: int
video: Video
#
@ -65,48 +86,29 @@ class SubtitlesTrack(NamedTuple):
# Source objects
#
class ProgramSource(NamedTuple):
"""A program source item."""
"""A program source."""
program: Program
player_config_url: str
class RenditionSource(NamedTuple):
"""A rendition source item."""
"""A rendition source."""
program: Program
rendition: Rendition
protocol: str
program_index_url: Program
program_index_url: str
class VariantSource(NamedTuple):
"""A variant source item."""
class VideoMedia(NamedTuple):
"""A video media."""
track: VideoTrack
track_index_url: str
class AudioMedia(NamedTuple):
"""An audio media."""
track: AudioTrack
track_index_url: str
class SubtitlesMedia(NamedTuple):
"""A subtitles media."""
track: SubtitlesTrack
track_index_url: str
"""A variant source."""
program: Program
rendition: Rendition
variant: Variant
video_media: VideoMedia
audio_media: AudioMedia
subtitles_media: Optional[SubtitlesMedia]
video_index_url: str
audio_index_url: str
subtitles_index_url: Optional[str]
class Target(NamedTuple):

View File

@ -5,6 +5,14 @@
import re
def _make_rendition_code(rendition):
    """Build the rendition field used in file names.

    The code is ``[og:|ad:]<audio language>`` followed, when the rendition has
    subtitles, by ``-cc`` (hearing-impaired subtitles) or ``-st:<language>``
    (subtitles in a language other than the audio one). This is the same
    scheme the command line uses to select renditions.

    NOTE(review): `file_name_builder` strips ``:`` from the final name, so the
    colons do not reach the file system — confirm this is the wanted naming.
    """
    audio = rendition.audio
    subtitles = rendition.subtitles

    if audio.original:
        code = "og:"
    elif audio.visual_impaired:
        code = "ad:"
    else:
        code = ""

    code += audio.language

    if subtitles:
        if subtitles.hearing_impaired:
            code += "-cc"
        elif subtitles.language != audio.language:
            code += "-st:" + subtitles.language

    return code
def _make_variant_code(variant):
    """Build the variant field used in file names.

    The code is the second component of the video resolution (presumably the
    height) followed by ``p``, e.g. ``1080p``.
    """
    return f"{variant.video.resolution[1]}p"
def file_name_builder(
*,
use_id=False,
@ -36,10 +44,10 @@ def file_name_builder(
fields.append(replace_sequence_counter(program.subtitle))
if add_rendition:
fields.append(rendition.code)
fields.append(_make_rendition_code(rendition))
if add_variant:
fields.append(variant.code)
fields.append(_make_variant_code(variant))
name = sep.join(fields)
name = re.sub(r'[/:<>"\\|?*]', "", name)

View File

@ -5,130 +5,122 @@
import json
from .error import InvalidPage, PageNotFound, PageNotSupported, HTTPError
from .model import Program
from .error import (
HTTPError,
InvalidPage,
InvalidPageData,
PageNotFound,
PageNotSupported,
)
_DATA_MARK = '<script id="__NEXT_DATA__" type="application/json">'
def _process_programs_page(page_value):
language = page_value["language"]
zone_found = False
program_found = False
for zone in page_value["zones"]:
if zone["code"].startswith("program_content_"):
if zone_found:
raise InvalidPage("PROGRAMS_CONTENT_ZONES_COUNT")
zone_found = True
else:
continue
for data_item in zone["content"]["data"]:
if data_item["type"] == "program":
if program_found:
raise InvalidPage("PROGRAMS_CONTENT_PROGRAM_COUNT")
program_found = True
else:
raise InvalidPage("PROGRAMS_CONTENT_PROGRAM_TYPE")
yield (
Program(
data_item["programId"],
language,
data_item["title"],
data_item["subtitle"],
),
data_item["player"]["config"],
)
if not zone_found:
raise InvalidPage("PROGRAMS_CONTENT_ZONES_COUNT")
if not program_found:
raise InvalidPage("PROGRAMS_CONTENT_PROGRAM_COUNT")
def _process_collections_page(page_value):
language = page_value["language"]
main_zone_found = False
sub_zone_found = False
program_found = False
for zone in page_value["zones"]:
if zone["code"].startswith("collection_videos_"):
if main_zone_found:
raise InvalidPage("COLLECTIONS_MAIN_ZONE_COUNT")
if program_found:
raise InvalidPage("COLLECTIONS_MIXED_ZONES")
main_zone_found = True
elif zone["code"].startswith("collection_subcollection_"):
if program_found and not sub_zone_found:
raise InvalidPage("COLLECTIONS_MIXED_ZONES")
sub_zone_found = True
else:
continue
for data_item in zone["content"]["data"]:
if (_ := data_item["type"]) == "teaser":
program_found = True
else:
raise InvalidPage("COLLECTIONS_INVALID_CONTENT_DATA_ITEM", _)
yield (
Program(
data_item["programId"],
language,
data_item["title"],
data_item["subtitle"],
),
f"https://api.arte.tv/api/player/v2/config/{language}/{data_item['programId']}",
)
if not main_zone_found:
raise InvalidPage("COLLECTIONS_MAIN_ZONE_COUNT")
if not program_found:
raise InvalidPage("COLLECTIONS_PROGRAMS_COUNT")
def iter_programs(page_url, http):
"""Iterate over programs listed on given ArteTV page."""
def fetch_page_content(http, page_url):
"""Fetch html content at given URL."""
r = http.request("GET", page_url)
# special handling of 404
if r.status == 404:
raise PageNotFound(page_url)
raise PageNotFound()
HTTPError.raise_for_status(r)
# no HTML parsing required, whe just find the mark
html = r.data.decode("utf-8")
start = html.find(_DATA_MARK)
return r.data.decode("utf-8")
def extract_page_data(html_content):
"""Extract JSON page data from html content."""
start = html_content.find(_DATA_MARK)
if start < 0:
raise InvalidPage("DATA_MARK_NOT_FOUND", page_url)
raise InvalidPage("DATA_MARK_NOT_FOUND")
start += len(_DATA_MARK)
end = html.index("</script>", start)
end = html_content.index("</script>", start)
try:
next_js_data = json.loads(html[start:end].strip())
return json.loads(html_content[start:end].strip())
except json.JSONDecodeError:
raise InvalidPage("INVALID_JSON_DATA", page_url)
raise InvalidPage("INVALID_JSON_DATA")
def _find_zones(page_value, code_prefix):
    """Return the zones of the page whose code starts with the given prefix."""
    matching = []
    for candidate in page_value["zones"]:
        if candidate["code"].startswith(code_prefix):
            matching.append(candidate)
    return matching
def _find_unique_zone(page_value, code_prefix):
    """Return the one zone whose code starts with the given prefix.

    Raises:
        InvalidPageData: there is not exactly one such zone ("ZONE_COUNT").
    """
    matching = _find_zones(page_value, code_prefix)
    if len(matching) == 1:
        return matching[0]
    raise InvalidPageData("ZONE_COUNT")
def _find_items(zone, item_type):
    """Return the content data items of the zone that have the given type."""
    matching = []
    for candidate in zone["content"]["data"]:
        if candidate["type"] == item_type:
            matching.append(candidate)
    return matching
def _find_unique_item(zone, item_type):
    """Return the one content data item of the zone that has the given type.

    Raises:
        InvalidPageData: there is not exactly one such item ("ITEM_COUNT").
    """
    matching = _find_items(zone, item_type)
    if len(matching) == 1:
        return matching[0]
    raise InvalidPageData("ITEM_COUNT")
def _read_program_page(page_value):
    """Read the single program described by a "program" page.

    Returns:
        A ``(program_id, language, title, subtitle)`` tuple taken from the
        only "program" item of the only ``program_content_*`` zone.
    """
    language = page_value["language"]

    program_item = _find_unique_item(
        _find_unique_zone(page_value, "program_content_"),
        "program",
    )

    program_id = program_item["programId"]
    title = program_item["title"]
    subtitle = program_item["subtitle"]
    return program_id, language, title, subtitle
def _read_collection_page(page_value):
    """Read the programs listed by a "collection" page.

    Teaser items come either from the unique ``collection_videos_*`` zone or
    from the ``collection_subcollection_*`` zones, never from both.

    Returns:
        A list of ``(program_id, language, title, subtitle)`` tuples, empty
        when no teaser item is found.

    Raises:
        InvalidPageData: both the main zone and sub-collection zones are
            populated ("MIXED_ZONES").
    """
    language = page_value["language"]

    main_zone = _find_unique_zone(page_value, "collection_videos_")
    main_items = _find_items(main_zone, "teaser")
    sub_zones = _find_zones(page_value, "collection_subcollection_")

    if sub_zones:
        if main_items:
            raise InvalidPageData("MIXED_ZONES")
        teasers = []
        for sub_zone in sub_zones:
            teasers.extend(_find_items(sub_zone, "teaser"))
    else:
        # No sub-collection: the main zone items (possibly none) are all we have.
        teasers = main_items

    programs = []
    for teaser in teasers:
        programs.append(
            (
                teaser["programId"],
                language,
                teaser["title"],
                teaser["subtitle"],
            )
        )
    return programs
def read_page_data(page_data):
"""Return programs listed on given JSON page data."""
try:
page_value = next_js_data["props"]["pageProps"]["props"]["page"]["value"]
page_value = page_data["props"]["pageProps"]["props"]["page"]["value"]
match page_value["type"]:
case "program":
yield from _process_programs_page(page_value)
return [_read_program_page(page_value)]
case "collection":
yield from _process_collections_page(page_value)
return _read_collection_page(page_value)
case _:
raise PageNotSupported(page_url, page_value)
raise PageNotSupported()
except (KeyError, IndexError, ValueError) as e:
raise InvalidPage("SCHEMA", page_url) from e
except InvalidPage as e:
raise InvalidPage(e.args[0], page_url) from e
raise InvalidPage("SCHEMA") from e