reflect readme terminology in code

This commit is contained in:
Etienne Zind 2022-12-08 01:20:23 +01:00
parent 7dba75faf3
commit 5125f1d6ba

View File

@ -10,8 +10,6 @@ Licence: GNU AGPL v3: http://www.gnu.org/licenses/
This file is part of [`delarte`](https://gitlab.com/Barbagus/delarte) This file is part of [`delarte`](https://gitlab.com/Barbagus/delarte)
""" """
from __future__ import annotations
import io import io
import json import json
import os import os
@ -21,18 +19,16 @@ import sys
import tempfile import tempfile
from http import HTTPStatus from http import HTTPStatus
from os import environ
from typing import NamedTuple, Optional, cast
from urllib.parse import urlparse from urllib.parse import urlparse
from urllib.request import urlopen from urllib.request import urlopen
import m3u8 import m3u8
import webvtt import webvtt
FFMPEG = environ.get("PATH_FFMPEG", "ffmpeg path not found") FFMPEG = os.environ.get("PATH_FFMPEG", "ffmpeg path not found")
def api_root(url: str): def load_api_data(url):
"""Retrieve the root node (infamous "data") of an API call response.""" """Retrieve the root node (infamous "data") of an API call response."""
http_response = urlopen(url) http_response = urlopen(url)
@ -48,53 +44,48 @@ def api_root(url: str):
return json.load(http_response)["data"] return json.load(http_response)["data"]
class Config(NamedTuple): def load_config_api(lang, program_id):
"""A structure representing a config API object.""" """Retrieve a program config from API."""
url = f"https://api.arte.tv/api/player/v2/config/{lang}/{program_id}"
config = load_api_data(url)
provider_id: str if config["type"] != "ConfigPlayer":
title: str raise ValueError("Invalid API response")
subtitle: str
versions: dict[str, tuple[str, str]]
@classmethod if config["attributes"]["metadata"]["providerId"] != program_id:
def load(cls, lang: str, provider_id: str) -> Config: raise ValueError("Invalid API response")
"""Retrieve a stream config from API."""
url = f"https://api.arte.tv/api/player/v2/config/{lang}/{provider_id}"
root = api_root(url)
if root["type"] != "ConfigPlayer": return config
raise ValueError("API response not supported")
attrs = root["attributes"]
if attrs["metadata"]["providerId"] != provider_id: def iter_versions(config):
raise ValueError("API response not supported") """Return a (code, label, index_url) iterator."""
for stream in config["attributes"]["streams"]:
return Config( yield (
provider_id, stream["versions"][0]["eStat"]["ml5"], # version code
attrs["metadata"]["title"], stream["versions"][0]["label"], # version full name
attrs["metadata"]["subtitle"], stream["url"], # version index url
{
s["versions"][0]["eStat"]["ml5"]: (s["versions"][0]["label"], s["url"])
for s in attrs["streams"]
},
) )
def url_for_version(self, version_code: str) -> str:
"""Return the m3u8 url for the given version code."""
if version_code not in self.versions:
print(f"Available versions:")
for code, (label, _) in self.versions.items():
print(f"\t{code} - {label}")
exit(1)
return self.versions[version_code][1] def find_version(config, version_code):
"""Return the version index url for the given version code."""
for (code, _, index_url) in iter_versions(config):
if code == version_code:
return index_url
return None
def make_srt_tempfile(url): def build_file_base_name(config):
"""Create a base file name from config metadata."""
return config["attributes"]["metadata"]["title"].replace("/", "-")
def make_srt_tempfile(subtitles_index_url):
"""Return a temporary file name where VTT subtitle has been downloaded/converted to SRT.""" """Return a temporary file name where VTT subtitle has been downloaded/converted to SRT."""
mpeg = m3u8.load(url) subtitles_index = m3u8.load(subtitles_index_url)
urls = [cast(str, mpeg.base_uri) + "/" + f for f in mpeg.files] urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files]
if not urls: if not urls:
raise ValueError("No subtitle files") raise ValueError("No subtitle files")
@ -125,97 +116,100 @@ def make_srt_tempfile(url):
return f.name return f.name
class Version(NamedTuple): def load_version_index(url):
"""A structure representing a version M3U8 object.""" """Retrieve a version from m3u8 file."""
version_index = m3u8.load(url)
videos: dict[str, str] if not version_index.playlists:
audio_url: str raise ValueError("Unexpected missing playlists")
subtitiles: Optional[tuple[str, str]]
@classmethod for pl in version_index.playlists:
def load(cls, url: str) -> Version: count = 0
"""Retrieve a version from m3u8 file.""" for m in pl.media:
mpeg = m3u8.load(url) if m.type == "AUDIO":
count += 1
if count != 1:
raise ValueError("Unexpected missing or multiple audio tracks")
videos = { count = 0
str(pl.stream_info.resolution[1]): mpeg.base_uri + pl.uri for m in pl.media:
for pl in mpeg.playlists if m.type == "SUBTITLES":
} count += 1
if count > 1:
raise ValueError("Unexpected multiple subtitle tracks")
audios = [mpeg.base_uri + m.uri for m in mpeg.media if m.type == "AUDIO"] return version_index
if len(audios) != 1:
raise ValueError("Unexpected missing or multiple audio tracks.")
subtitles = [
(m.language, mpeg.base_uri + m.uri)
for m in mpeg.media
if m.type == "SUBTITLES"
]
if len(subtitles) > 1:
raise ValueError("Unexpected multiple subtitles tracks.")
return cls(videos, audios[0], subtitles[0] if subtitles else None)
def download(self, resolution_code: str, file_base_name: str):
"""Download a given resolution (video/audio/subtitles) and write it to an MKV container."""
if resolution_code not in self.videos:
print(f"Available resolutions:")
for code in sorted(map(int, self.videos.keys()), reverse=True):
print(f"\t{code}")
exit(1)
video_url = self.videos[resolution_code]
if self.subtitiles:
srt_tempfile = make_srt_tempfile(self.subtitiles[1])
subprocess.run(
[
FFMPEG,
"-i",
srt_tempfile,
"-i",
video_url,
"-i",
self.audio_url,
"-c:v",
"copy",
"-c:a",
"copy",
"-bsf:a",
"aac_adtstoasc",
"-c:s",
"copy",
"-metadata:s:s:0",
f"language={self.subtitiles[0]}",
"-disposition:s:0",
"default",
f"{file_base_name}.mkv",
]
)
os.unlink(srt_tempfile)
else:
subprocess.run(
[
FFMPEG,
"-i",
video_url,
"-i",
self.audio_url,
"-c:v",
"copy",
"-c:a",
"copy",
"-bsf:a",
"aac_adtstoasc",
f"{file_base_name}.mkv",
]
)
def api_playlist(lang: str, provider_id: str): def iter_resolutions(version_index):
"""Retrieve a playlist from API.""" """Iterate over resolution options."""
url = f"https://api.arte.tv/api/player/v2/playlist/{lang}/{provider_id}" for pl in sorted(
raise NotImplementedError version_index.playlists,
key=lambda pl: pl.stream_info.resolution[1],
reverse=True,
):
yield (
# resolution code (1080p, 720p, ...)
f"{pl.stream_info.resolution[1]}p",
# resolution label
f"{pl.stream_info.resolution[0]} x {pl.stream_info.resolution[1]}",
)
def find_resolution(version_index, resolution_code):
"""Return the stream information for a given resolution_code."""
for pl in version_index.playlists:
code = f"{pl.stream_info.resolution[1]}p"
if code != resolution_code:
continue
audio_track = None
for m in pl.media:
if m.type == "AUDIO":
audio_track = (m.language, pl.base_uri + m.uri)
break
subtitles_track = None
for m in pl.media:
if m.type == "SUBTITLES":
subtitles_track = (m.language, pl.base_uri + m.uri)
break
return (
pl.base_uri + pl.uri,
audio_track,
subtitles_track,
)
return None
def build_args(video_index_url, audio_track, subtitles_track, file_base_name):
"""Build FFMPEG args."""
audio_lang, audio_index_url = audio_track
if subtitles_track:
subtitles_lang, subtitles_file = subtitles_track
args = [FFMPEG]
args.extend(["-i", video_index_url])
args.extend(["-i", audio_index_url])
if subtitles_track:
args.extend(["-i", subtitles_file])
args.extend(["-c:v", "copy"])
args.extend(["-c:a", "copy"])
if subtitles_track:
args.extend(["-c:s", "copy"])
args.extend(["-bsf:a", "aac_adtstoasc"])
args.extend(["-metadata:s:a:0", f"language={audio_lang}"])
if subtitles_track:
args.extend(["-metadata:s:s:0", f"language={subtitles_lang}"])
args.extend(["-disposition:s:0", "default"])
args.append(f"{file_base_name}.mkv")
return args
def main(): def main():
@ -227,14 +221,37 @@ def main():
if ui_lang not in ("fr", "de", "en", "es", "pl", "it") or _ != "videos": if ui_lang not in ("fr", "de", "en", "es", "pl", "it") or _ != "videos":
raise ValueError("Invalid URL") raise ValueError("Invalid URL")
config = Config.load(ui_lang, stream_id) config = load_config_api(ui_lang, stream_id)
version_url = config.url_for_version(version_code)
file_base_name = config.title.replace("/", "-") version_index_url = find_version(config, version_code)
if version_index_url is None:
print(f"Available versions:", file=sys.stderr)
for (code, label, _) in iter_versions(config):
print(f"\t{code} - {label}", file=sys.stderr)
return 1
version = Version.load(version_url) version_index = load_version_index(version_index_url)
version.download(resolution_code, file_base_name) stream_info = find_resolution(version_index, resolution_code)
if stream_info is None:
print(f"Available resolutions:", file=sys.stderr)
for code, label in iter_resolutions(version_index):
print(f"\t{code} - {label}", file=sys.stderr)
return 1
video_index_url, audio_track, subtitles_track = stream_info
if subtitles_track:
subtitles_lang, subtitles_index_url = subtitles_track
subtitle_file = make_srt_tempfile(subtitles_index_url)
subtitles_track = (subtitles_lang, subtitle_file)
file_base_name = build_file_base_name(config)
args = build_args(video_index_url, audio_track, subtitles_track, file_base_name)
subprocess.run(args)
if subtitle_file:
os.unlink(subtitle_file)
if __name__ == "__main__": if __name__ == "__main__":