diff --git a/delarte.py b/delarte.py index d126afb..f500e60 100755 --- a/delarte.py +++ b/delarte.py @@ -10,8 +10,6 @@ Licence: GNU AGPL v3: http://www.gnu.org/licenses/ This file is part of [`delarte`](https://gitlab.com/Barbagus/delarte) """ -from __future__ import annotations - import io import json import os @@ -21,18 +19,16 @@ import sys import tempfile from http import HTTPStatus -from os import environ -from typing import NamedTuple, Optional, cast from urllib.parse import urlparse from urllib.request import urlopen import m3u8 import webvtt -FFMPEG = environ.get("PATH_FFMPEG", "ffmpeg path not found") +FFMPEG = os.environ.get("PATH_FFMPEG", "ffmpeg path not found") -def api_root(url: str): +def load_api_data(url): """Retrieve the root node (infamous "data") of an API call response.""" http_response = urlopen(url) @@ -48,53 +44,48 @@ def api_root(url: str): return json.load(http_response)["data"] -class Config(NamedTuple): - """A structure representing a config API object.""" +def load_config_api(lang, program_id): + """Retrieve a program config from API.""" + url = f"https://api.arte.tv/api/player/v2/config/{lang}/{program_id}" + config = load_api_data(url) - provider_id: str - title: str - subtitle: str - versions: dict[str, tuple[str, str]] + if config["type"] != "ConfigPlayer": + raise ValueError("Invalid API response") - @classmethod - def load(cls, lang: str, provider_id: str) -> Config: - """Retrieve a stream config from API.""" - url = f"https://api.arte.tv/api/player/v2/config/{lang}/{provider_id}" - root = api_root(url) + if config["attributes"]["metadata"]["providerId"] != program_id: + raise ValueError("Invalid API response") - if root["type"] != "ConfigPlayer": - raise ValueError("API response not supported") + return config - attrs = root["attributes"] - if attrs["metadata"]["providerId"] != provider_id: - raise ValueError("API response not supported") - - return Config( - provider_id, - attrs["metadata"]["title"], - attrs["metadata"]["subtitle"], - { - s["versions"][0]["eStat"]["ml5"]: (s["versions"][0]["label"], s["url"]) - for s in attrs["streams"] - }, +def iter_versions(config): + """Return a (code, label, index_url) iterator.""" + for stream in config["attributes"]["streams"]: + yield ( + stream["versions"][0]["eStat"]["ml5"], # version code + stream["versions"][0]["label"], # version full name + stream["url"], # version index url ) - def url_for_version(self, version_code: str) -> str: - """Return the m3u8 url for the given version code.""" - if version_code not in self.versions: - print(f"Available versions:") - for code, (label, _) in self.versions.items(): - print(f"\t{code} - {label}") - exit(1) - return self.versions[version_code][1] +def find_version(config, version_code): + """Return the version index url for the given version code.""" + for (code, _, index_url) in iter_versions(config): + if code == version_code: + return index_url + + return None -def make_srt_tempfile(url): +def build_file_base_name(config): + """Create a base file name from config metadata.""" + return config["attributes"]["metadata"]["title"].replace("/", "-") + + +def make_srt_tempfile(subtitles_index_url): """Return a temporary file name where VTT subtitle has been downloaded/converted to SRT.""" - mpeg = m3u8.load(url) - urls = [cast(str, mpeg.base_uri) + "/" + f for f in mpeg.files] + subtitles_index = m3u8.load(subtitles_index_url) + urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files] if not urls: raise ValueError("No subtitle files") @@ -125,97 +116,100 @@ def make_srt_tempfile(url): return f.name -class Version(NamedTuple): - """A structure representing a version M3U8 object.""" +def load_version_index(url): + """Retrieve a version from m3u8 file.""" + version_index = m3u8.load(url) - videos: dict[str, str] - audio_url: str - subtitiles: Optional[tuple[str, str]] + if not version_index.playlists: + raise ValueError("Unexpected missing playlists") - @classmethod - def load(cls, url: str) -> Version: - """Retrieve a version from m3u8 file.""" - mpeg = m3u8.load(url) + for pl in version_index.playlists: + count = 0 + for m in pl.media: + if m.type == "AUDIO": + count += 1 + if count != 1: + raise ValueError("Unexpected missing or multiple audio tracks") - videos = { - str(pl.stream_info.resolution[1]): mpeg.base_uri + pl.uri - for pl in mpeg.playlists - } + count = 0 + for m in pl.media: + if m.type == "SUBTITLES": + count += 1 + if count > 1: + raise ValueError("Unexpected multiple subtitle tracks") - audios = [mpeg.base_uri + m.uri for m in mpeg.media if m.type == "AUDIO"] - if len(audios) != 1: - raise ValueError("Unexpected missing or multiple audio tracks.") - - subtitles = [ - (m.language, mpeg.base_uri + m.uri) - for m in mpeg.media - if m.type == "SUBTITLES" - ] - if len(subtitles) > 1: - raise ValueError("Unexpected multiple subtitles tracks.") - - return cls(videos, audios[0], subtitles[0] if subtitles else None) - - def download(self, resolution_code: str, file_base_name: str): - """Download a given resolution (video/audio/subtitles) and write it to an MKV container.""" - if resolution_code not in self.videos: - print(f"Available resolutions:") - for code in sorted(map(int, self.videos.keys()), reverse=True): - print(f"\t{code}") - exit(1) - - video_url = self.videos[resolution_code] - - if self.subtitiles: - srt_tempfile = make_srt_tempfile(self.subtitiles[1]) - subprocess.run( - [ - FFMPEG, - "-i", - srt_tempfile, - "-i", - video_url, - "-i", - self.audio_url, - "-c:v", - "copy", - "-c:a", - "copy", - "-bsf:a", - "aac_adtstoasc", - "-c:s", - "copy", - "-metadata:s:s:0", - f"language={self.subtitiles[0]}", - "-disposition:s:0", - "default", - f"{file_base_name}.mkv", - ] - ) - os.unlink(srt_tempfile) - else: - subprocess.run( - [ - FFMPEG, - "-i", - video_url, - "-i", - self.audio_url, - "-c:v", - "copy", - "-c:a", - "copy", - "-bsf:a", - "aac_adtstoasc", - f"{file_base_name}.mkv", - ] - ) + return version_index -def api_playlist(lang: str, provider_id: str): - """Retrieve a playlist from API.""" - url = f"https://api.arte.tv/api/player/v2/playlist/{lang}/{provider_id}" - raise NotImplementedError +def iter_resolutions(version_index): + """Iterate over resolution options.""" + for pl in sorted( + version_index.playlists, + key=lambda pl: pl.stream_info.resolution[1], + reverse=True, + ): + yield ( + # resolution code (1080p, 720p, ...) + f"{pl.stream_info.resolution[1]}p", + # resolution label + f"{pl.stream_info.resolution[0]} x {pl.stream_info.resolution[1]}", + ) + + +def find_resolution(version_index, resolution_code): + """Return the stream information for a given resolution_code.""" + for pl in version_index.playlists: + code = f"{pl.stream_info.resolution[1]}p" + if code != resolution_code: + continue + + audio_track = None + for m in pl.media: + if m.type == "AUDIO": + audio_track = (m.language, pl.base_uri + m.uri) + break + + subtitles_track = None + for m in pl.media: + if m.type == "SUBTITLES": + subtitles_track = (m.language, pl.base_uri + m.uri) + break + + return ( + pl.base_uri + pl.uri, + audio_track, + subtitles_track, + ) + + return None + + +def build_args(video_index_url, audio_track, subtitles_track, file_base_name): + """Build FFMPEG args.""" + audio_lang, audio_index_url = audio_track + if subtitles_track: + subtitles_lang, subtitles_file = subtitles_track + + args = [FFMPEG] + args.extend(["-i", video_index_url]) + args.extend(["-i", audio_index_url]) + if subtitles_track: + args.extend(["-i", subtitles_file]) + + args.extend(["-c:v", "copy"]) + args.extend(["-c:a", "copy"]) + if subtitles_track: + args.extend(["-c:s", "copy"]) + + args.extend(["-bsf:a", "aac_adtstoasc"]) + args.extend(["-metadata:s:a:0", f"language={audio_lang}"]) + + if subtitles_track: + args.extend(["-metadata:s:s:0", f"language={subtitles_lang}"]) + args.extend(["-disposition:s:0", "default"]) + + args.append(f"{file_base_name}.mkv") + return args def main(): @@ -227,14 +221,37 @@ def main(): if ui_lang not in ("fr", "de", "en", "es", "pl", "it") or _ != "videos": raise ValueError("Invalid URL") - config = Config.load(ui_lang, stream_id) - version_url = config.url_for_version(version_code) + config = load_config_api(ui_lang, stream_id) - file_base_name = config.title.replace("/", "-") + version_index_url = find_version(config, version_code) + if version_index_url is None: + print(f"Available versions:", file=sys.stderr) + for (code, label, _) in iter_versions(config): + print(f"\t{code} - {label}", file=sys.stderr) + return 1 - version = Version.load(version_url) + version_index = load_version_index(version_index_url) - version.download(resolution_code, file_base_name) + stream_info = find_resolution(version_index, resolution_code) + if stream_info is None: + print(f"Available resolutions:", file=sys.stderr) + for code, label in iter_resolutions(version_index): + print(f"\t{code} - {label}", file=sys.stderr) + return 1 + + video_index_url, audio_track, subtitles_track = stream_info + if subtitles_track: + subtitles_lang, subtitles_index_url = subtitles_track + subtitle_file = make_srt_tempfile(subtitles_index_url) + subtitles_track = (subtitles_lang, subtitle_file) + + file_base_name = build_file_base_name(config) + + args = build_args(video_index_url, audio_track, subtitles_track, file_base_name) + + subprocess.run(args) + if subtitle_file: + os.unlink(subtitle_file) if __name__ == "__main__":