delarte_test/src/delarte/__init__.py

253 lines
7.3 KiB
Python

"""delarte.
ArteTV downloader
Licence: GNU AGPL v3: http://www.gnu.org/licenses/
This file is part of [`delarte`](https://gitlab.com/Barbagus/delarte)
"""
# Package version string.
__version__ = "0.1"
import io
import json
import os
import re
import subprocess
import sys
import tempfile
from http import HTTPStatus
from urllib.parse import urlparse
from urllib.request import urlopen
import m3u8
import webvtt
# Value of the PATH_FFMPEG environment variable, or a placeholder message
# when that variable is not set.
# NOTE(review): not referenced by the code visible in this file (build_args
# hard-codes "ffmpeg") -- confirm whether it is still needed.
FFMPEG = os.getenv("PATH_FFMPEG", "ffmpeg path not found")
def load_api_data(url):
    """Retrieve the root node (infamous "data") of an API call response.

    Args:
        url: URL of the API endpoint to request.

    Returns:
        The value stored under the ``"data"`` key of the decoded JSON body.

    Raises:
        RuntimeError: if the response status is not 200 (OK).
        ValueError: if the response Content-Type is not exactly
            ``application/vnd.api+json; charset=utf-8``.
    """
    # The context manager guarantees the response is closed on every path,
    # including the early-exit error cases below.
    with urlopen(url) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("API request failed")

        if (
            http_response.getheader("Content-Type")
            != "application/vnd.api+json; charset=utf-8"
        ):
            raise ValueError("API response not supported")

        return json.load(http_response)["data"]
def load_config_api(lang, program_id):
    """Fetch the player configuration of a program from the API.

    Args:
        lang: language code inserted in the API URL.
        program_id: program identifier inserted in the API URL.

    Returns:
        The "data" node of the API response.

    Raises:
        ValueError: if the node type is not ``"ConfigPlayer"`` or if its
            ``providerId`` metadata differs from ``program_id``.
    """
    config = load_api_data(
        f"https://api.arte.tv/api/player/v2/config/{lang}/{program_id}"
    )

    is_player_config = config["type"] == "ConfigPlayer"
    # Short-circuit: the metadata is only inspected for a player config node.
    if (
        not is_player_config
        or config["attributes"]["metadata"]["providerId"] != program_id
    ):
        raise ValueError("Invalid API response")

    return config
def iter_versions(config):
    """Yield a ``(code, label, index_url)`` tuple for every stream of a config.

    Only the first entry of each stream's ``"versions"`` list is used to
    obtain the version code and label.
    """
    streams = config["attributes"]["streams"]
    for stream in streams:
        first_version = stream["versions"][0]
        code = first_version["eStat"]["ml5"]  # version code
        label = first_version["label"]  # version full name
        yield code, label, stream["url"]
def find_version(config, version_code):
    """Look up the index url of the version identified by ``version_code``.

    Returns:
        The index url of the first version whose code matches, or ``None``
        when no version matches.
    """
    matching_urls = (
        index_url
        for code, _label, index_url in iter_versions(config)
        if code == version_code
    )
    return next(matching_urls, None)
def build_file_base_name(config):
    """Derive a base file name from the title found in the config metadata.

    Every ``/`` of the title is replaced by ``-``.
    """
    title = config["attributes"]["metadata"]["title"]
    return "-".join(title.split("/"))
def make_srt_tempfile(subtitles_index_url):
    """Return a temporary file name where VTT subtitle has been downloaded/converted to SRT.

    The subtitles index (m3u8) must reference exactly one file. That file is
    downloaded, parsed as WebVTT and rewritten as numbered SRT cues.

    Args:
        subtitles_index_url: URL of the m3u8 index listing the subtitle file.

    Returns:
        The name of the created temporary ``.srt`` file. The file is not
        deleted automatically; removing it is up to the caller.

    Raises:
        ValueError: if the index references no file or more than one file.
        RuntimeError: if the subtitle download status is not 200 (OK).
    """
    subtitles_index = m3u8.load(subtitles_index_url)
    urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files]

    if not urls:
        raise ValueError("No subtitle files")

    if len(urls) > 1:
        raise ValueError("Multiple subtitle files")

    # Close the HTTP response as soon as its body has been read.
    with urlopen(urls[0]) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("Subtitle request failed")
        buffer = io.StringIO(http_response.read().decode("utf8"))

    srt_file = tempfile.NamedTemporaryFile(
        "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
    )
    try:
        with srt_file:
            # SRT cues are numbered starting from 1.
            for index, caption in enumerate(webvtt.read_buffer(buffer), start=1):
                print(index, file=srt_file)
                # SRT uses "," where the VTT timestamps use ".".
                print(
                    caption.start.replace(".", ",")
                    + " --> "
                    + caption.end.replace(".", ","),
                    file=srt_file,
                )
                print(caption.text + "\n", file=srt_file)
    except BaseException:
        # The file was created with delete=False: do not leave a partial
        # file behind when the conversion fails.
        os.unlink(srt_file.name)
        raise

    return srt_file.name
def load_version_index(url):
    """Load a version index (m3u8) and check that its layout is supported.

    Args:
        url: URL of the version m3u8 index.

    Returns:
        The loaded m3u8 object.

    Raises:
        ValueError: if the index has no playlists, if a playlist does not
            have exactly one audio media, or if a playlist has more than one
            subtitles media.
    """
    version_index = m3u8.load(url)

    if not version_index.playlists:
        raise ValueError("Unexpected missing playlists")

    for playlist in version_index.playlists:
        audio_count = sum(1 for media in playlist.media if media.type == "AUDIO")
        if audio_count != 1:
            raise ValueError("Unexpected missing or multiple audio tracks")

        subtitles_count = sum(
            1 for media in playlist.media if media.type == "SUBTITLES"
        )
        if subtitles_count > 1:
            raise ValueError("Unexpected multiple subtitle tracks")

    return version_index
def iter_resolutions(version_index):
    """Yield ``(code, label)`` pairs for the playlists of a version index.

    Pairs are produced by decreasing second resolution component. The code is
    that component followed by ``p`` (1080p, 720p, ...); the label is
    ``"<first component> x <second component>"``.
    """

    def second_component(playlist):
        return playlist.stream_info.resolution[1]

    ordered = list(version_index.playlists)
    ordered.sort(key=second_component, reverse=True)

    for playlist in ordered:
        resolution = playlist.stream_info.resolution
        code = f"{resolution[1]}p"
        label = f"{resolution[0]} x {resolution[1]}"
        yield code, label
def find_resolution(version_index, resolution_code):
    """Find the stream information matching ``resolution_code``.

    Returns:
        ``(video_index_url, audio_track, subtitles_track)`` for the first
        playlist whose code (second resolution component followed by ``p``)
        equals ``resolution_code``. Each track is a ``(language, url)`` tuple
        built from the first media of that type, or ``None`` when the
        playlist has no such media. ``None`` is returned when no playlist
        matches.
    """
    for playlist in version_index.playlists:
        if f"{playlist.stream_info.resolution[1]}p" == resolution_code:
            tracks = []
            for media_type in ("AUDIO", "SUBTITLES"):
                # Only the first media of each type is considered.
                media = next(
                    (m for m in playlist.media if m.type == media_type), None
                )
                if media is None:
                    tracks.append(None)
                else:
                    tracks.append((media.language, playlist.base_uri + media.uri))

            return (playlist.base_uri + playlist.uri, *tracks)

    return None
def build_args(video_index_url, audio_track, subtitles_track, file_base_name):
    """Assemble the ffmpeg command line as a list of arguments.

    Args:
        video_index_url: first ffmpeg input.
        audio_track: ``(language, index_url)`` tuple; second ffmpeg input.
        subtitles_track: ``(language, file)`` tuple used as third input, or a
            falsy value when there are no subtitles.
        file_base_name: output file name without the ``.mkv`` extension.

    Returns:
        The argument list, starting with ``"ffmpeg"`` and ending with the
        output file name.
    """
    audio_lang, audio_index_url = audio_track
    if subtitles_track:
        subtitles_lang, subtitles_file = subtitles_track

    # Inputs
    command = ["ffmpeg", "-i", video_index_url, "-i", audio_index_url]
    if subtitles_track:
        command += ["-i", subtitles_file]

    # Codecs: every stream is copied as is
    command += ["-c:v", "copy", "-c:a", "copy"]
    if subtitles_track:
        command += ["-c:s", "copy"]

    # Bitstream filter and stream metadata
    command += [
        "-bsf:a",
        "aac_adtstoasc",
        "-metadata:s:a:0",
        f"language={audio_lang}",
    ]
    if subtitles_track:
        command += [
            "-metadata:s:s:0",
            f"language={subtitles_lang}",
            "-disposition:s:0",
            "default",
        ]

    # Output
    command.append(f"{file_base_name}.mkv")
    return command
def main():
    """CLI function, options passed as arguments.

    Command line: ``URL [VERSION_CODE [RESOLUTION_CODE]]`` where the URL path
    has the form ``/<lang>/videos/<program id>/<slug>``.

    When the version code (or the resolution code) is missing or unknown, the
    available choices are listed on stderr. Otherwise the subtitles (if any)
    are converted to a temporary SRT file and ffmpeg is run to produce the
    output file.

    Returns:
        1 when the usage or a list of choices was printed; otherwise the exit
        status of the ffmpeg process.

    Raises:
        ValueError: if the URL is not a supported program URL.
    """
    if len(sys.argv) < 2:
        print("Usage: delarte URL [VERSION [RESOLUTION]]", file=sys.stderr)
        return 1

    # Strip the slashes on both ends so a missing trailing slash is accepted.
    path_parts = urlparse(sys.argv[1]).path.strip("/").split("/")
    if len(path_parts) != 4:
        raise ValueError("Invalid URL")
    ui_lang, section, stream_id, _slug = path_parts

    version_code = sys.argv[2] if len(sys.argv) > 2 else ""
    resolution_code = sys.argv[3] if len(sys.argv) > 3 else ""

    if ui_lang not in ("fr", "de", "en", "es", "pl", "it") or section != "videos":
        raise ValueError("Invalid URL")

    config = load_config_api(ui_lang, stream_id)

    version_index_url = find_version(config, version_code)
    if version_index_url is None:
        print("Available versions:", file=sys.stderr)
        for code, label, _ in iter_versions(config):
            print(f"\t{code} - {label}", file=sys.stderr)
        return 1

    version_index = load_version_index(version_index_url)

    stream_info = find_resolution(version_index, resolution_code)
    if stream_info is None:
        print("Available resolutions:", file=sys.stderr)
        for code, label in iter_resolutions(version_index):
            print(f"\t{code} - {label}", file=sys.stderr)
        return 1

    video_index_url, audio_track, subtitles_track = stream_info

    # Must be bound even without subtitles: it is checked during cleanup.
    subtitle_file = None
    if subtitles_track:
        subtitles_lang, subtitles_index_url = subtitles_track
        subtitle_file = make_srt_tempfile(subtitles_index_url)
        subtitles_track = (subtitles_lang, subtitle_file)

    try:
        file_base_name = build_file_base_name(config)
        args = build_args(
            video_index_url, audio_track, subtitles_track, file_base_name
        )
        return subprocess.run(args).returncode
    finally:
        # Remove the temporary SRT file whether or not ffmpeg succeeded.
        if subtitle_file:
            os.unlink(subtitle_file)