Compare commits
4 Commits
23e2183c93
...
c8aab4c5a3
Author | SHA1 | Date |
---|---|---|
Barbagus | c8aab4c5a3 | |
Barbagus | 58b0ba30a3 | |
Barbagus | 4ffc32eb61 | |
Barbagus | bdc8b7b246 |
|
@ -9,90 +9,181 @@ from .error import *
|
|||
from .model import *
|
||||
|
||||
|
||||
def fetch_program_sources(url, http):
|
||||
"""Fetch program sources listed on given ArteTV page."""
|
||||
from .www import iter_programs
|
||||
def load_program_sources(http, page_url):
|
||||
"""Load programs sources listed on given ArteTV page."""
|
||||
from .www import read_page_data, fetch_page_content, extract_page_data
|
||||
|
||||
page_content = fetch_page_content(http, page_url)
|
||||
page_data = extract_page_data(page_content)
|
||||
programs = read_page_data(page_data)
|
||||
|
||||
if not programs:
|
||||
raise UnexpectedError("NO_PROGRAMS")
|
||||
|
||||
return [
|
||||
ProgramSource(
|
||||
program,
|
||||
player_config_url,
|
||||
Program(id, language, title, subtitle),
|
||||
f"https://api.arte.tv/api/player/v2/config/{language}/{id}",
|
||||
)
|
||||
for program, player_config_url in iter_programs(url, http)
|
||||
for id, language, title, subtitle in programs
|
||||
]
|
||||
|
||||
|
||||
def fetch_rendition_sources(program_sources, http):
|
||||
"""Fetch renditions for given programs."""
|
||||
from itertools import groupby
|
||||
# "und" language code officially stands for "undefined"
|
||||
_AUDIO_RENDITIONS = {
|
||||
"VO": Rendition.Audio("und", True, False),
|
||||
"VOF": Rendition.Audio("fra", True, False),
|
||||
"VOA": Rendition.Audio("deu", True, False),
|
||||
"VOEU": Rendition.Audio("und", True, False),
|
||||
"VF": Rendition.Audio("fra", False, False),
|
||||
"VA": Rendition.Audio("deu", False, False),
|
||||
"VE": Rendition.Audio("und", False, False),
|
||||
"VFAUD": Rendition.Audio("fra", False, True),
|
||||
"VAAUD": Rendition.Audio("deu", False, True),
|
||||
}
|
||||
|
||||
from .api import iter_renditions
|
||||
_SUBTITLES_RENDITIONS = {
|
||||
"STF": Rendition.Subtitles("fra", False),
|
||||
"STA": Rendition.Subtitles("deu", False),
|
||||
"STMF": Rendition.Subtitles("fra", True),
|
||||
"STMA": Rendition.Subtitles("deu", True),
|
||||
"STE[ANG]": Rendition.Subtitles("eng", False),
|
||||
"STE[ESP]": Rendition.Subtitles("esp", False),
|
||||
"STE[POL]": Rendition.Subtitles("pol", False),
|
||||
"STE[ITA]": Rendition.Subtitles("ita", False),
|
||||
}
|
||||
|
||||
sources = [
|
||||
|
||||
def _parse_rendition_code(code):
|
||||
audio_code, subtitles_code = code.split("-") if "-" in code else (code, None)
|
||||
|
||||
try:
|
||||
audio_rendition = _AUDIO_RENDITIONS[audio_code]
|
||||
except KeyError:
|
||||
raise UnexpectedError("AUDIO_RENDITION_CODE")
|
||||
|
||||
if subtitles_code:
|
||||
try:
|
||||
subtitles_rendition = _SUBTITLES_RENDITIONS[subtitles_code]
|
||||
except KeyError:
|
||||
raise UnexpectedError("SUBTITLES_RENDITION_CODE")
|
||||
else:
|
||||
subtitles_rendition = None
|
||||
|
||||
return audio_rendition, subtitles_rendition
|
||||
|
||||
|
||||
def load_rendition_sources(http, program_source):
|
||||
"""Fetch rendition sources for a given program."""
|
||||
from .api import fetch_api_object, read_config_player_object
|
||||
|
||||
program, config_player_url = program_source
|
||||
config_player_object = fetch_api_object(http, config_player_url)
|
||||
program_id, streams = read_config_player_object(config_player_object)
|
||||
|
||||
if program_id != program.id:
|
||||
raise UnexpectedError("PROGRAM_ID_MISMATCH")
|
||||
|
||||
if not streams:
|
||||
raise UnexpectedError("NO_RENDITIONS")
|
||||
|
||||
track_renditions = [_parse_rendition_code(code) for code, _, _, _ in streams]
|
||||
|
||||
# sometimes the original audio renditions are a mix of "fra" or "deu" and "und",
|
||||
# so in that case, we replace the "und" accordingly
|
||||
originals = {audio.language for audio, _ in track_renditions if audio.original}
|
||||
if len(originals) == 2 and "und" in originals:
|
||||
originals.remove("und")
|
||||
original = originals.pop()
|
||||
track_renditions = [
|
||||
(
|
||||
audio._replace(language=original) if audio.original else audio,
|
||||
subtitles,
|
||||
)
|
||||
for audio, subtitles in track_renditions
|
||||
]
|
||||
|
||||
return [
|
||||
RenditionSource(
|
||||
program,
|
||||
rendition,
|
||||
protocol,
|
||||
Rendition(label, audio, subtitles, protocol),
|
||||
program_index_url,
|
||||
)
|
||||
for program, player_config_url in program_sources
|
||||
for rendition, protocol, program_index_url in iter_renditions(
|
||||
program.id,
|
||||
player_config_url,
|
||||
http,
|
||||
for (audio, subtitles), (_, label, protocol, program_index_url) in zip(
|
||||
track_renditions, streams
|
||||
)
|
||||
]
|
||||
|
||||
descriptors = list({(s.rendition.code, s.rendition.label) for s in sources})
|
||||
|
||||
descriptors.sort()
|
||||
for code, group in groupby(descriptors, key=lambda t: t[0]):
|
||||
labels_for_code = [t[1] for t in group]
|
||||
if len(labels_for_code) != 1:
|
||||
raise UnexpectedError("MULTIPLE_RENDITION_LABELS", code, labels_for_code)
|
||||
def _check_variant_renditions(rendition, audio, subtitles):
|
||||
# Check whether the audio/subtitles rendition we figured
|
||||
# from the ArteTV rendition code matches the one found in the
|
||||
# program index.
|
||||
# Update track languages (in particular, will get rid of "und")
|
||||
updated_audio = rendition.audio._replace(language=audio.language)
|
||||
if updated_audio != audio:
|
||||
raise UnexpectedError("AUDIO_RENDITION_MISMATCH")
|
||||
|
||||
return sources
|
||||
updated_subtitles = (
|
||||
rendition.subtitles._replace(language=subtitles.language)
|
||||
if rendition.subtitles
|
||||
else None
|
||||
)
|
||||
if updated_subtitles != subtitles:
|
||||
raise UnexpectedError("SUBTITLES_RENDITION_MISMATCH")
|
||||
|
||||
return rendition._replace(audio=audio, subtitles=subtitles)
|
||||
|
||||
|
||||
def fetch_variant_sources(renditions_sources, http):
|
||||
"""Fetch variants for given renditions."""
|
||||
from itertools import groupby
|
||||
def load_variant_sources(http, rendition_source):
|
||||
"""Load variant sources for a given rendition."""
|
||||
from .hls import fetch_index, read_ng_program_index
|
||||
|
||||
from .hls import iter_variants
|
||||
program, rendition, program_index_url = rendition_source
|
||||
if rendition.protocol != "HLS_NG":
|
||||
raise UnsupportedHLSProtocol()
|
||||
|
||||
sources = [
|
||||
program_index = fetch_index(http, program_index_url)
|
||||
(
|
||||
variants,
|
||||
(audio, audio_index_url),
|
||||
(subtitles, subtitles_index_url),
|
||||
) = read_ng_program_index(program_index)
|
||||
|
||||
if not variants:
|
||||
raise UnexpectedError("NO_VARIANTS")
|
||||
|
||||
rendition = _check_variant_renditions(rendition, audio, subtitles)
|
||||
|
||||
return [
|
||||
VariantSource(
|
||||
program,
|
||||
rendition,
|
||||
variant,
|
||||
VariantSource.VideoMedia(*video),
|
||||
VariantSource.AudioMedia(*audio),
|
||||
VariantSource.SubtitlesMedia(*subtitles) if subtitles else None,
|
||||
)
|
||||
for program, rendition, protocol, program_index_url in renditions_sources
|
||||
for variant, video, audio, subtitles in iter_variants(
|
||||
protocol, program_index_url, http
|
||||
video_track_index,
|
||||
audio_index_url,
|
||||
subtitles_index_url,
|
||||
)
|
||||
for variant, video_track_index in variants
|
||||
]
|
||||
|
||||
descriptors = list(
|
||||
{(s.variant.code, s.video_media.track.frame_rate) for s in sources}
|
||||
)
|
||||
|
||||
descriptors.sort()
|
||||
for code, group in groupby(descriptors, key=lambda t: t[0]):
|
||||
frame_rates_for_code = [t[1] for t in group]
|
||||
if len(frame_rates_for_code) != 1:
|
||||
raise UnexpectedError(
|
||||
"MULTIPLE_RENDITION_FRAME_RATES", code, frame_rates_for_code
|
||||
)
|
||||
def _load_mp4_input(http, track_index_url):
|
||||
from .hls import fetch_index, read_mp4_index
|
||||
|
||||
return sources
|
||||
track_index = fetch_index(http, track_index_url)
|
||||
return read_mp4_index(track_index)[0]
|
||||
|
||||
|
||||
def fetch_targets(variant_sources, http, **naming_options):
|
||||
def _load_vtt_input(http, track_index_url):
|
||||
from .hls import fetch_index, read_vtt_index
|
||||
|
||||
track_index = fetch_index(http, track_index_url)
|
||||
return read_vtt_index(track_index)
|
||||
|
||||
|
||||
def fetch_targets(variant_sources: list[VariantSource], http, **naming_options):
|
||||
"""Compile download targets for given variants."""
|
||||
from .hls import fetch_mp4_media, fetch_vtt_media
|
||||
from .naming import file_name_builder
|
||||
|
||||
build_file_name = file_name_builder(**naming_options)
|
||||
|
@ -100,25 +191,38 @@ def fetch_targets(variant_sources, http, **naming_options):
|
|||
targets = [
|
||||
Target(
|
||||
Target.VideoInput(
|
||||
video_media.track,
|
||||
fetch_mp4_media(video_media.track_index_url, http),
|
||||
VideoTrack(
|
||||
variant.video.resolution[0],
|
||||
variant.video.resolution[1],
|
||||
variant.video.frame_rate,
|
||||
),
|
||||
_load_mp4_input(http, video_index_url),
|
||||
),
|
||||
Target.AudioInput(
|
||||
audio_media.track,
|
||||
fetch_mp4_media(audio_media.track_index_url, http),
|
||||
AudioTrack(
|
||||
f"Audio[{rendition.audio.language}]",
|
||||
rendition.audio.language,
|
||||
rendition.audio.original,
|
||||
rendition.audio.visual_impaired,
|
||||
),
|
||||
_load_mp4_input(http, audio_index_url),
|
||||
),
|
||||
(
|
||||
Target.SubtitlesInput(
|
||||
subtitles_media.track,
|
||||
fetch_vtt_media(subtitles_media.track_index_url, http),
|
||||
SubtitlesTrack(
|
||||
f"Audio[{rendition.subtitles.language}]",
|
||||
rendition.subtitles.language,
|
||||
rendition.subtitles.hearing_impaired,
|
||||
),
|
||||
_load_vtt_input(http, subtitles_index_url),
|
||||
)
|
||||
if subtitles_media
|
||||
if rendition.subtitles and subtitles_index_url
|
||||
else None
|
||||
),
|
||||
(program.title, program.subtitle) if program.subtitle else program.title,
|
||||
build_file_name(program, rendition, variant),
|
||||
)
|
||||
for program, rendition, variant, video_media, audio_media, subtitles_media in variant_sources
|
||||
for program, rendition, variant, video_index_url, audio_index_url, subtitles_index_url in variant_sources
|
||||
]
|
||||
|
||||
return targets
|
||||
|
|
|
@ -38,15 +38,15 @@ import docopt
|
|||
import urllib3
|
||||
|
||||
from . import (
|
||||
HTTPError,
|
||||
ModuleError,
|
||||
UnexpectedError,
|
||||
HTTPError,
|
||||
__version__,
|
||||
download_targets,
|
||||
fetch_program_sources,
|
||||
fetch_rendition_sources,
|
||||
fetch_targets,
|
||||
fetch_variant_sources,
|
||||
load_program_sources,
|
||||
load_rendition_sources,
|
||||
load_variant_sources,
|
||||
)
|
||||
|
||||
|
||||
|
@ -97,9 +97,33 @@ def _create_progress():
|
|||
return on_progress
|
||||
|
||||
|
||||
def _make_rendition_code(rendition):
|
||||
if rendition.audio.original:
|
||||
code = "og:"
|
||||
elif rendition.audio.visual_impaired:
|
||||
code = "ad:"
|
||||
else:
|
||||
code = ""
|
||||
|
||||
code += rendition.audio.language
|
||||
|
||||
if rendition.subtitles:
|
||||
if rendition.subtitles.hearing_impaired:
|
||||
code += "-cc"
|
||||
elif rendition.subtitles.language != rendition.audio.language:
|
||||
code += "-st:" + rendition.subtitles.language
|
||||
|
||||
return code
|
||||
|
||||
|
||||
def _select_rendition_sources(rendition_code, rendition_sources):
|
||||
rendition_sources_with_code = [
|
||||
(rendition_source, _make_rendition_code(rendition_source.rendition))
|
||||
for rendition_source in rendition_sources
|
||||
]
|
||||
|
||||
if rendition_code:
|
||||
filtered = [s for s in rendition_sources if s.rendition.code == rendition_code]
|
||||
filtered = [s for s, c in rendition_sources_with_code if c == rendition_code]
|
||||
if filtered:
|
||||
return filtered
|
||||
print(
|
||||
|
@ -108,26 +132,49 @@ def _select_rendition_sources(rendition_code, rendition_sources):
|
|||
else:
|
||||
print("Available renditions:")
|
||||
|
||||
key = lambda s: (s.rendition.label, s.rendition.code)
|
||||
descriptors = list({(c, s.rendition.label) for s, c in rendition_sources_with_code})
|
||||
|
||||
rendition_sources.sort(key=key)
|
||||
for (label, code), _ in itertools.groupby(rendition_sources, key=key):
|
||||
print(f"{code:>12} : {label}")
|
||||
descriptors.sort()
|
||||
for code, group in itertools.groupby(descriptors, key=lambda t: t[0]):
|
||||
if len([t[1] for t in group]) != 1:
|
||||
raise UnexpectedError("MULTIPLE_RENDITION_LABELS")
|
||||
|
||||
descriptors.sort(key=lambda t: t[1])
|
||||
for code, label in descriptors:
|
||||
print(f"{code:>13} : {label}")
|
||||
|
||||
raise Abort()
|
||||
|
||||
|
||||
def _make_variant_code(variant):
|
||||
return f"{variant.video.resolution[1]}p"
|
||||
|
||||
|
||||
def _select_variant_sources(variant_code, variant_sources):
|
||||
variant_sources_with_code = [
|
||||
(variant_source, _make_variant_code(variant_source.variant))
|
||||
for variant_source in variant_sources
|
||||
]
|
||||
|
||||
if variant_code:
|
||||
filtered = [s for s in variant_sources if s.variant.code == variant_code]
|
||||
filtered = [s for s, c in variant_sources_with_code if c == variant_code]
|
||||
if filtered:
|
||||
return filtered
|
||||
print(f"{variant_code!r} is not a valid variant code. Available values are:")
|
||||
else:
|
||||
print("Available variants:")
|
||||
|
||||
variant_sources.sort(key=lambda s: s.video_media.track.height, reverse=True)
|
||||
for code, _ in itertools.groupby(variant_sources, key=lambda s: s.variant.code):
|
||||
descriptors = list(
|
||||
{(c, s.variant.video.frame_rate) for s, c in variant_sources_with_code}
|
||||
)
|
||||
|
||||
descriptors.sort()
|
||||
for code, group in itertools.groupby(descriptors, key=lambda t: t[0]):
|
||||
if len([t[1] for t in group]) != 1:
|
||||
raise UnexpectedError("MULTIPLE_VARIANT_FRAME_RATES")
|
||||
|
||||
descriptors.sort(key=lambda t: int(t[0][:-1]), reverse=True)
|
||||
for code, _ in descriptors:
|
||||
print(f"{code:>12}")
|
||||
|
||||
raise Abort()
|
||||
|
@ -140,16 +187,24 @@ def main():
|
|||
http = urllib3.PoolManager(timeout=5)
|
||||
|
||||
try:
|
||||
program_sources = fetch_program_sources(args["URL"], http)
|
||||
program_sources = load_program_sources(http, args["URL"])
|
||||
|
||||
rendition_sources = _select_rendition_sources(
|
||||
args["RENDITION"],
|
||||
fetch_rendition_sources(program_sources, http),
|
||||
[
|
||||
rendition_source
|
||||
for program_source in program_sources
|
||||
for rendition_source in load_rendition_sources(http, program_source)
|
||||
],
|
||||
)
|
||||
|
||||
variant_sources = _select_variant_sources(
|
||||
args["VARIANT"],
|
||||
fetch_variant_sources(rendition_sources, http),
|
||||
[
|
||||
variant_source
|
||||
for rendition_source in rendition_sources
|
||||
for variant_source in load_variant_sources(http, rendition_source)
|
||||
],
|
||||
)
|
||||
|
||||
targets = fetch_targets(
|
||||
|
|
|
@ -5,15 +5,13 @@
|
|||
|
||||
import json
|
||||
|
||||
from .error import UnexpectedAPIResponse, HTTPError
|
||||
from .model import Rendition
|
||||
from .error import HTTPError, UnexpectedAPIResponse
|
||||
|
||||
MIME_TYPE = "application/vnd.api+json; charset=utf-8"
|
||||
|
||||
|
||||
def _fetch_api_object(http, url, object_type):
|
||||
# Fetch an API object.
|
||||
|
||||
def fetch_api_object(http, url):
|
||||
"""Fetch an API object."""
|
||||
r = http.request("GET", url)
|
||||
HTTPError.raise_for_status(r)
|
||||
|
||||
|
@ -21,51 +19,30 @@ def _fetch_api_object(http, url, object_type):
|
|||
if mime_type != MIME_TYPE:
|
||||
raise UnexpectedAPIResponse("MIME_TYPE", url, MIME_TYPE, mime_type)
|
||||
|
||||
obj = json.loads(r.data.decode("utf-8"))
|
||||
return json.loads(r.data.decode("utf-8"))
|
||||
|
||||
|
||||
def read_config_player_object(obj):
|
||||
"""Return program ID and streams information from config player object."""
|
||||
try:
|
||||
data_type = obj["data"]["type"]
|
||||
if data_type != object_type:
|
||||
raise UnexpectedAPIResponse("OBJECT_TYPE", url, object_type, data_type)
|
||||
if obj["data"]["type"] != "ConfigPlayer":
|
||||
raise UnexpectedAPIResponse("OBJECT_TYPE")
|
||||
|
||||
return obj["data"]["attributes"]
|
||||
attributes = obj["data"]["attributes"]
|
||||
|
||||
except (KeyError, IndexError, ValueError) as e:
|
||||
raise UnexpectedAPIResponse("SCHEMA", url) from e
|
||||
program_id = attributes["metadata"]["providerId"]
|
||||
|
||||
|
||||
def iter_renditions(program_id, player_config_url, http):
|
||||
"""Iterate over renditions for the given program."""
|
||||
obj = _fetch_api_object(http, player_config_url, "ConfigPlayer")
|
||||
|
||||
codes = set()
|
||||
try:
|
||||
provider_id = obj["metadata"]["providerId"]
|
||||
if provider_id != program_id:
|
||||
raise UnexpectedAPIResponse(
|
||||
"PROVIDER_ID_MISMATCH", player_config_url, provider_id
|
||||
)
|
||||
|
||||
for s in obj["streams"]:
|
||||
code = s["versions"][0]["eStat"]["ml5"]
|
||||
|
||||
if code in codes:
|
||||
raise UnexpectedAPIResponse(
|
||||
"DUPLICATE_RENDITION_CODE", player_config_url, code
|
||||
)
|
||||
codes.add(code)
|
||||
|
||||
yield (
|
||||
Rendition(
|
||||
s["versions"][0]["eStat"]["ml5"],
|
||||
s["versions"][0]["label"],
|
||||
),
|
||||
streams = [
|
||||
(
|
||||
s["versions"][0]["eStat"]["ml5"],
|
||||
s["versions"][0]["label"],
|
||||
s["protocol"],
|
||||
s["url"],
|
||||
)
|
||||
for s in attributes["streams"]
|
||||
]
|
||||
|
||||
return program_id, streams
|
||||
|
||||
except (KeyError, IndexError, ValueError) as e:
|
||||
raise UnexpectedAPIResponse("SCHEMA", player_config_url) from e
|
||||
|
||||
if not codes:
|
||||
raise UnexpectedAPIResponse("NO_RENDITIONS", player_config_url)
|
||||
raise UnexpectedAPIResponse("SCHEMA") from e
|
||||
|
|
|
@ -48,6 +48,10 @@ class InvalidPage(UnexpectedError):
|
|||
"""Invalid ArteTV page."""
|
||||
|
||||
|
||||
class InvalidPageData(UnexpectedError):
|
||||
"""Invalid ArteTV page data."""
|
||||
|
||||
|
||||
#
|
||||
# api
|
||||
#
|
||||
|
|
|
@ -3,11 +3,10 @@
|
|||
|
||||
"""Provide HLS protocol utilities."""
|
||||
|
||||
|
||||
import m3u8
|
||||
|
||||
from .error import UnexpectedHLSResponse, UnsupportedHLSProtocol, HTTPError
|
||||
from .model import AudioTrack, SubtitlesTrack, Variant, VideoTrack
|
||||
from .error import HTTPError, UnexpectedHLSResponse
|
||||
from .model import Rendition, Variant
|
||||
|
||||
#
|
||||
# WARNING !
|
||||
|
@ -27,8 +26,8 @@ from .model import AudioTrack, SubtitlesTrack, Variant, VideoTrack
|
|||
MIME_TYPE = "application/x-mpegURL"
|
||||
|
||||
|
||||
def _fetch_index(http, url):
|
||||
# Fetch a M3U8 playlist
|
||||
def fetch_index(http, url):
|
||||
"""Fetch a M3U8 playlist."""
|
||||
r = http.request("GET", url)
|
||||
HTTPError.raise_for_status(r)
|
||||
|
||||
|
@ -38,108 +37,89 @@ def _fetch_index(http, url):
|
|||
return m3u8.loads(r.data.decode("utf-8"), url)
|
||||
|
||||
|
||||
def iter_variants(protocol, program_index_url, http):
|
||||
"""Iterate over variants for the given rendition."""
|
||||
if protocol != "HLS_NG":
|
||||
raise UnsupportedHLSProtocol(protocol, program_index_url)
|
||||
|
||||
program_index = _fetch_index(http, program_index_url)
|
||||
|
||||
audio_media = None
|
||||
subtitles_media = None
|
||||
|
||||
for media in program_index.media:
|
||||
match media.type:
|
||||
case "AUDIO":
|
||||
if audio_media:
|
||||
raise UnexpectedHLSResponse(
|
||||
"MULTIPLE_AUDIO_MEDIA", program_index_url
|
||||
)
|
||||
audio_media = media
|
||||
case "SUBTITLES":
|
||||
if subtitles_media:
|
||||
raise UnexpectedHLSResponse(
|
||||
"MULTIPLE_SUBTITLES_MEDIA", program_index_url
|
||||
)
|
||||
subtitles_media = media
|
||||
|
||||
if not audio_media:
|
||||
raise UnexpectedHLSResponse("NO_AUDIO_MEDIA", program_index_url)
|
||||
|
||||
audio = (
|
||||
AudioTrack(
|
||||
audio_media.name,
|
||||
audio_media.language,
|
||||
audio_media.name.startswith("VO"),
|
||||
(
|
||||
audio_media.characteristics is not None
|
||||
and ("public.accessibility" in audio_media.characteristics)
|
||||
),
|
||||
),
|
||||
audio_media.absolute_uri,
|
||||
)
|
||||
|
||||
subtitles = (
|
||||
def _select_program_audio(program_index):
|
||||
all = [
|
||||
(
|
||||
SubtitlesTrack(
|
||||
subtitles_media.name,
|
||||
subtitles_media.language,
|
||||
media.group_id,
|
||||
Rendition.Audio(
|
||||
media.language,
|
||||
media.name.startswith("VO"),
|
||||
(
|
||||
subtitles_media.characteristics is not None
|
||||
and ("public.accessibility" in subtitles_media.characteristics)
|
||||
media.characteristics is not None
|
||||
and ("public.accessibility" in media.characteristics)
|
||||
),
|
||||
),
|
||||
subtitles_media.absolute_uri,
|
||||
media.absolute_uri,
|
||||
)
|
||||
if subtitles_media
|
||||
else None
|
||||
)
|
||||
for media in program_index.media
|
||||
if media.type == "AUDIO"
|
||||
]
|
||||
|
||||
codes = set()
|
||||
if not all:
|
||||
raise UnexpectedHLSResponse("NO_PROGRAM_AUDIO_MEDIA")
|
||||
|
||||
if len(all) > 1:
|
||||
raise UnexpectedHLSResponse("MULTIPLE_PROGRAM_AUDIO_MEDIA")
|
||||
|
||||
return all[0]
|
||||
|
||||
|
||||
def _select_program_subtitles(program_index):
|
||||
all = [
|
||||
(
|
||||
media.group_id,
|
||||
Rendition.Subtitles(
|
||||
media.language,
|
||||
(
|
||||
media.characteristics is not None
|
||||
and ("public.accessibility" in media.characteristics)
|
||||
),
|
||||
),
|
||||
media.absolute_uri,
|
||||
)
|
||||
for media in program_index.media
|
||||
if media.type == "SUBTITLES"
|
||||
]
|
||||
|
||||
if not all:
|
||||
return None, None, None
|
||||
|
||||
if len(all) > 1:
|
||||
raise UnexpectedHLSResponse("MULTIPLE_PROGRAM_SUBTITLES_MEDIA")
|
||||
|
||||
return all[0]
|
||||
|
||||
|
||||
def read_ng_program_index(program_index):
|
||||
"""Return variants, audio and subtitles from program index."""
|
||||
|
||||
audio_id, audio, audio_url = _select_program_audio(program_index)
|
||||
subtitles_id, subtitles, subtitles_url = _select_program_subtitles(program_index)
|
||||
|
||||
variants = []
|
||||
|
||||
for video_media in program_index.playlists:
|
||||
stream_info = video_media.stream_info
|
||||
if stream_info.audio != audio_media.group_id:
|
||||
raise UnexpectedHLSResponse(
|
||||
"INVALID_AUDIO_MEDIA", program_index_url, stream_info.audio
|
||||
)
|
||||
if stream_info.audio != audio_id:
|
||||
raise UnexpectedHLSResponse("PROGRAM_AUDIO_MEDIA_ID")
|
||||
|
||||
if subtitles_media:
|
||||
if stream_info.subtitles != subtitles_media.group_id:
|
||||
raise UnexpectedHLSResponse(
|
||||
"INVALID_SUBTITLES_MEDIA", program_index_url, stream_info.subtitles
|
||||
)
|
||||
elif stream_info.subtitles:
|
||||
raise UnexpectedHLSResponse(
|
||||
"INVALID_SUBTITLES_MEDIA", program_index_url, stream_info.subtitles
|
||||
)
|
||||
if stream_info.subtitles != subtitles_id:
|
||||
raise UnexpectedHLSResponse("PROGRAM_SUBTITLES_MEDIA_ID")
|
||||
|
||||
code = f"{stream_info.resolution[1]}p"
|
||||
if code in codes:
|
||||
raise UnexpectedHLSResponse(
|
||||
"DUPLICATE_STREAM_CODE", program_index_url, code
|
||||
)
|
||||
codes.add(code)
|
||||
|
||||
yield (
|
||||
Variant(
|
||||
code,
|
||||
stream_info.average_bandwidth,
|
||||
),
|
||||
variants.append(
|
||||
(
|
||||
VideoTrack(
|
||||
stream_info.resolution[0],
|
||||
stream_info.resolution[1],
|
||||
stream_info.frame_rate,
|
||||
Variant(
|
||||
stream_info.average_bandwidth,
|
||||
Variant.Video(
|
||||
stream_info.resolution,
|
||||
stream_info.frame_rate,
|
||||
),
|
||||
),
|
||||
video_media.absolute_uri,
|
||||
),
|
||||
audio,
|
||||
subtitles,
|
||||
)
|
||||
)
|
||||
|
||||
if not codes:
|
||||
raise UnexpectedHLSResponse("NO_VARIANTS", program_index_url)
|
||||
return variants, (audio, audio_url), (subtitles, subtitles_url)
|
||||
|
||||
|
||||
def _convert_byterange(obj):
|
||||
|
@ -150,43 +130,39 @@ def _convert_byterange(obj):
|
|||
return offset, offset + count - 1
|
||||
|
||||
|
||||
def fetch_mp4_media(track_index_url, http):
|
||||
"""Fetch an audio or video media."""
|
||||
track_index = _fetch_index(http, track_index_url)
|
||||
|
||||
def read_mp4_index(track_index):
|
||||
"""Return MP4 file url and ranges."""
|
||||
file_name = track_index.segment_map[0].uri
|
||||
start, end = _convert_byterange(track_index.segment_map[0])
|
||||
if start != 0:
|
||||
raise UnexpectedHLSResponse("INVALID_AV_INDEX_FRAGMENT_START", track_index_url)
|
||||
ranges = [(start, end)]
|
||||
|
||||
if start != 0:
|
||||
raise UnexpectedHLSResponse("MP4_INDEX_FRAGMENT_START")
|
||||
|
||||
# ranges = [(start, end)]
|
||||
next_start = end + 1
|
||||
|
||||
for segment in track_index.segments:
|
||||
if segment.uri != file_name:
|
||||
raise UnexpectedHLSResponse("MULTIPLE_AV_INDEX_FILES", track_index_url)
|
||||
raise UnexpectedHLSResponse("MULTIPLE_MP4_INDEX_FILES")
|
||||
|
||||
start, end = _convert_byterange(segment)
|
||||
if start != next_start:
|
||||
raise UnexpectedHLSResponse(
|
||||
"DISCONTINUOUS_AV_INDEX_FRAGMENT", track_index_url
|
||||
)
|
||||
raise UnexpectedHLSResponse("DISCONTINUOUS_MP4_INDEX_FRAGMENT")
|
||||
|
||||
# ranges.append((start, end))
|
||||
ranges.append((start, end))
|
||||
next_start = end + 1
|
||||
|
||||
return track_index.segment_map[0].absolute_uri
|
||||
return track_index.segment_map[0].absolute_uri, ranges
|
||||
|
||||
|
||||
def fetch_vtt_media(track_index_url, http):
|
||||
"""Fetch an audio or video media."""
|
||||
track_index = _fetch_index(http, track_index_url)
|
||||
def read_vtt_index(track_index):
|
||||
"""Return VTT file url."""
|
||||
urls = [s.absolute_uri for s in track_index.segments]
|
||||
|
||||
if not urls:
|
||||
raise UnexpectedHLSResponse("NO_S_INDEX_FILES", track_index_url)
|
||||
raise UnexpectedHLSResponse("NO_VTT_INDEX_FILES")
|
||||
|
||||
if len(urls) > 1:
|
||||
raise UnexpectedHLSResponse("MULTIPLE_S_INDEX_FILES", track_index_url)
|
||||
raise UnexpectedHLSResponse("MULTIPLE_VTT_INDEX_FILES")
|
||||
|
||||
return urls[0]
|
||||
|
|
|
@ -22,15 +22,36 @@ class Program(NamedTuple):
|
|||
class Rendition(NamedTuple):
|
||||
"""A program rendition metadata."""
|
||||
|
||||
code: str
|
||||
class Audio(NamedTuple):
|
||||
"""An audio rendition."""
|
||||
|
||||
language: str
|
||||
original: bool
|
||||
visual_impaired: bool
|
||||
|
||||
class Subtitles(NamedTuple):
|
||||
"""A subtitles rendition."""
|
||||
|
||||
language: str
|
||||
hearing_impaired: bool
|
||||
|
||||
label: str
|
||||
audio: Audio
|
||||
subtitles: Optional[Subtitles]
|
||||
protocol: str
|
||||
|
||||
|
||||
class Variant(NamedTuple):
|
||||
"""A program variant metadata."""
|
||||
|
||||
code: str
|
||||
class Video(NamedTuple):
|
||||
"""A video rendition."""
|
||||
|
||||
resolution: tuple[int, int]
|
||||
frame_rate: float
|
||||
|
||||
average_bandwidth: int
|
||||
video: Video
|
||||
|
||||
|
||||
#
|
||||
|
@ -65,48 +86,29 @@ class SubtitlesTrack(NamedTuple):
|
|||
# Source objects
|
||||
#
|
||||
class ProgramSource(NamedTuple):
|
||||
"""A program source item."""
|
||||
"""A program source."""
|
||||
|
||||
program: Program
|
||||
player_config_url: str
|
||||
|
||||
|
||||
class RenditionSource(NamedTuple):
|
||||
"""A rendition source item."""
|
||||
"""A rendition source."""
|
||||
|
||||
program: Program
|
||||
rendition: Rendition
|
||||
protocol: str
|
||||
program_index_url: Program
|
||||
program_index_url: str
|
||||
|
||||
|
||||
class VariantSource(NamedTuple):
|
||||
"""A variant source item."""
|
||||
|
||||
class VideoMedia(NamedTuple):
|
||||
"""A video media."""
|
||||
|
||||
track: VideoTrack
|
||||
track_index_url: str
|
||||
|
||||
class AudioMedia(NamedTuple):
|
||||
"""An audio media."""
|
||||
|
||||
track: AudioTrack
|
||||
track_index_url: str
|
||||
|
||||
class SubtitlesMedia(NamedTuple):
|
||||
"""A subtitles media."""
|
||||
|
||||
track: SubtitlesTrack
|
||||
track_index_url: str
|
||||
"""A variant source."""
|
||||
|
||||
program: Program
|
||||
rendition: Rendition
|
||||
variant: Variant
|
||||
video_media: VideoMedia
|
||||
audio_media: AudioMedia
|
||||
subtitles_media: Optional[SubtitlesMedia]
|
||||
video_index_url: str
|
||||
audio_index_url: str
|
||||
subtitles_index_url: Optional[str]
|
||||
|
||||
|
||||
class Target(NamedTuple):
|
||||
|
|
|
@ -5,6 +5,14 @@
|
|||
import re
|
||||
|
||||
|
||||
def _make_rendition_code(rendition):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def _make_variant_code(variant):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def file_name_builder(
|
||||
*,
|
||||
use_id=False,
|
||||
|
@ -36,10 +44,10 @@ def file_name_builder(
|
|||
fields.append(replace_sequence_counter(program.subtitle))
|
||||
|
||||
if add_rendition:
|
||||
fields.append(rendition.code)
|
||||
fields.append(_make_rendition_code(rendition))
|
||||
|
||||
if add_variant:
|
||||
fields.append(variant.code)
|
||||
fields.append(_make_variant_code(variant))
|
||||
|
||||
name = sep.join(fields)
|
||||
name = re.sub(r'[/:<>"\\|?*]', "", name)
|
||||
|
|
|
@ -5,130 +5,122 @@
|
|||
|
||||
import json
|
||||
|
||||
from .error import InvalidPage, PageNotFound, PageNotSupported, HTTPError
|
||||
from .model import Program
|
||||
from .error import (
|
||||
HTTPError,
|
||||
InvalidPage,
|
||||
InvalidPageData,
|
||||
PageNotFound,
|
||||
PageNotSupported,
|
||||
)
|
||||
|
||||
_DATA_MARK = '<script id="__NEXT_DATA__" type="application/json">'
|
||||
|
||||
|
||||
def _process_programs_page(page_value):
|
||||
language = page_value["language"]
|
||||
|
||||
zone_found = False
|
||||
program_found = False
|
||||
|
||||
for zone in page_value["zones"]:
|
||||
if zone["code"].startswith("program_content_"):
|
||||
if zone_found:
|
||||
raise InvalidPage("PROGRAMS_CONTENT_ZONES_COUNT")
|
||||
zone_found = True
|
||||
else:
|
||||
continue
|
||||
|
||||
for data_item in zone["content"]["data"]:
|
||||
if data_item["type"] == "program":
|
||||
if program_found:
|
||||
raise InvalidPage("PROGRAMS_CONTENT_PROGRAM_COUNT")
|
||||
program_found = True
|
||||
else:
|
||||
raise InvalidPage("PROGRAMS_CONTENT_PROGRAM_TYPE")
|
||||
|
||||
yield (
|
||||
Program(
|
||||
data_item["programId"],
|
||||
language,
|
||||
data_item["title"],
|
||||
data_item["subtitle"],
|
||||
),
|
||||
data_item["player"]["config"],
|
||||
)
|
||||
|
||||
if not zone_found:
|
||||
raise InvalidPage("PROGRAMS_CONTENT_ZONES_COUNT")
|
||||
|
||||
if not program_found:
|
||||
raise InvalidPage("PROGRAMS_CONTENT_PROGRAM_COUNT")
|
||||
|
||||
|
||||
def _process_collections_page(page_value):
|
||||
language = page_value["language"]
|
||||
|
||||
main_zone_found = False
|
||||
sub_zone_found = False
|
||||
program_found = False
|
||||
|
||||
for zone in page_value["zones"]:
|
||||
if zone["code"].startswith("collection_videos_"):
|
||||
if main_zone_found:
|
||||
raise InvalidPage("COLLECTIONS_MAIN_ZONE_COUNT")
|
||||
if program_found:
|
||||
raise InvalidPage("COLLECTIONS_MIXED_ZONES")
|
||||
main_zone_found = True
|
||||
elif zone["code"].startswith("collection_subcollection_"):
|
||||
if program_found and not sub_zone_found:
|
||||
raise InvalidPage("COLLECTIONS_MIXED_ZONES")
|
||||
sub_zone_found = True
|
||||
else:
|
||||
continue
|
||||
|
||||
for data_item in zone["content"]["data"]:
|
||||
if (_ := data_item["type"]) == "teaser":
|
||||
program_found = True
|
||||
else:
|
||||
raise InvalidPage("COLLECTIONS_INVALID_CONTENT_DATA_ITEM", _)
|
||||
|
||||
yield (
|
||||
Program(
|
||||
data_item["programId"],
|
||||
language,
|
||||
data_item["title"],
|
||||
data_item["subtitle"],
|
||||
),
|
||||
f"https://api.arte.tv/api/player/v2/config/{language}/{data_item['programId']}",
|
||||
)
|
||||
|
||||
if not main_zone_found:
|
||||
raise InvalidPage("COLLECTIONS_MAIN_ZONE_COUNT")
|
||||
|
||||
if not program_found:
|
||||
raise InvalidPage("COLLECTIONS_PROGRAMS_COUNT")
|
||||
|
||||
|
||||
def iter_programs(page_url, http):
|
||||
"""Iterate over programs listed on given ArteTV page."""
|
||||
def fetch_page_content(http, page_url):
|
||||
"""Fetch html content at given URL."""
|
||||
r = http.request("GET", page_url)
|
||||
|
||||
# special handling of 404
|
||||
if r.status == 404:
|
||||
raise PageNotFound(page_url)
|
||||
raise PageNotFound()
|
||||
HTTPError.raise_for_status(r)
|
||||
|
||||
# no HTML parsing required, we just find the mark
|
||||
html = r.data.decode("utf-8")
|
||||
start = html.find(_DATA_MARK)
|
||||
return r.data.decode("utf-8")
|
||||
|
||||
|
||||
def extract_page_data(html_content):
|
||||
"""Extract JSON page data from html content."""
|
||||
start = html_content.find(_DATA_MARK)
|
||||
if start < 0:
|
||||
raise InvalidPage("DATA_MARK_NOT_FOUND", page_url)
|
||||
raise InvalidPage("DATA_MARK_NOT_FOUND")
|
||||
start += len(_DATA_MARK)
|
||||
end = html.index("</script>", start)
|
||||
end = html_content.index("</script>", start)
|
||||
|
||||
try:
|
||||
next_js_data = json.loads(html[start:end].strip())
|
||||
return json.loads(html_content[start:end].strip())
|
||||
except json.JSONDecodeError:
|
||||
raise InvalidPage("INVALID_JSON_DATA", page_url)
|
||||
raise InvalidPage("INVALID_JSON_DATA")
|
||||
|
||||
|
||||
def _find_zones(page_value, code_prefix):
|
||||
return [
|
||||
zone for zone in page_value["zones"] if zone["code"].startswith(code_prefix)
|
||||
]
|
||||
|
||||
|
||||
def _find_unique_zone(page_value, code_prefix):
|
||||
zones = _find_zones(page_value, code_prefix)
|
||||
if len(zones) != 1:
|
||||
raise InvalidPageData("ZONE_COUNT")
|
||||
return zones[0]
|
||||
|
||||
|
||||
def _find_items(zone, item_type):
|
||||
return [item for item in zone["content"]["data"] if item["type"] == item_type]
|
||||
|
||||
|
||||
def _find_unique_item(zone, item_type):
|
||||
items = _find_items(zone, item_type)
|
||||
if len(items) != 1:
|
||||
raise InvalidPageData("ITEM_COUNT")
|
||||
return items[0]
|
||||
|
||||
|
||||
def _read_program_page(page_value):
|
||||
language = page_value["language"]
|
||||
|
||||
zone = _find_unique_zone(page_value, "program_content_")
|
||||
item = _find_unique_item(zone, "program")
|
||||
return (
|
||||
item["programId"],
|
||||
language,
|
||||
item["title"],
|
||||
item["subtitle"],
|
||||
)
|
||||
|
||||
|
||||
def _read_collection_page(page_value):
|
||||
language = page_value["language"]
|
||||
|
||||
main_zone = _find_unique_zone(page_value, "collection_videos_")
|
||||
main_items = _find_items(main_zone, "teaser")
|
||||
|
||||
sub_zones = _find_zones(page_value, "collection_subcollection_")
|
||||
|
||||
if sub_zones:
|
||||
if main_items:
|
||||
raise InvalidPageData("MIXED_ZONES")
|
||||
items = [
|
||||
item for sub_zone in sub_zones for item in _find_items(sub_zone, "teaser")
|
||||
]
|
||||
elif main_items:
|
||||
items = main_items
|
||||
else:
|
||||
return []
|
||||
|
||||
return [
|
||||
(
|
||||
item["programId"],
|
||||
language,
|
||||
item["title"],
|
||||
item["subtitle"],
|
||||
)
|
||||
for item in items
|
||||
]
|
||||
|
||||
|
||||
def read_page_data(page_data):
|
||||
"""Return programs listed on given JSON page data."""
|
||||
try:
|
||||
page_value = next_js_data["props"]["pageProps"]["props"]["page"]["value"]
|
||||
page_value = page_data["props"]["pageProps"]["props"]["page"]["value"]
|
||||
|
||||
match page_value["type"]:
|
||||
case "program":
|
||||
yield from _process_programs_page(page_value)
|
||||
return [_read_program_page(page_value)]
|
||||
case "collection":
|
||||
yield from _process_collections_page(page_value)
|
||||
return _read_collection_page(page_value)
|
||||
case _:
|
||||
raise PageNotSupported(page_url, page_value)
|
||||
raise PageNotSupported()
|
||||
|
||||
except (KeyError, IndexError, ValueError) as e:
|
||||
raise InvalidPage("SCHEMA", page_url) from e
|
||||
|
||||
except InvalidPage as e:
|
||||
raise InvalidPage(e.args[0], page_url) from e
|
||||
raise InvalidPage("SCHEMA") from e
|
||||
|
|
Loading…
Reference in New Issue