forked from fcode/delarte
206 lines
5.7 KiB
Python
206 lines
5.7 KiB
Python
"""delarte.

ArteTV downloader

Licence: GNU AGPL v3: http://www.gnu.org/licenses/

This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""
|
|
# Version string of the delarte module.
__version__ = "0.1"
|
|
|
|
import io
|
|
import json
|
|
import re
|
|
import tempfile
|
|
|
|
from http import HTTPStatus
|
|
from urllib.request import urlopen
|
|
|
|
import m3u8
|
|
import webvtt
|
|
|
|
|
|
def load_api_data(url):
    """Retrieve the root node (infamous "data") of an API call response.

    Args:
        url: Address of the API endpoint to query.

    Returns:
        The value stored under the top-level ``"data"`` key of the JSON body.

    Raises:
        RuntimeError: If the response status is not 200 OK.
        ValueError: If the ``Content-Type`` header is not exactly
            ``application/vnd.api+json; charset=utf-8``.
    """
    # The context manager releases the connection on every path, including
    # the two early error exits below.
    with urlopen(url) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("API request failed")

        content_type = http_response.getheader("Content-Type")
        if content_type != "application/vnd.api+json; charset=utf-8":
            raise ValueError("API response not supported")

        # Parse while the response is still open: json.load reads from it.
        return json.load(http_response)["data"]
|
|
|
|
|
|
def load_config_api(lang, program_id):
    """Fetch the player config of a program from the API and sanity-check it.

    Args:
        lang: Language segment inserted in the API URL.
        program_id: Program identifier inserted in the API URL.

    Returns:
        The config node returned by ``load_api_data``.

    Raises:
        ValueError: If the node type is not ``"ConfigPlayer"`` or if the
            ``providerId`` found in its metadata differs from ``program_id``.
    """
    endpoint = f"https://api.arte.tv/api/player/v2/config/{lang}/{program_id}"
    player_config = load_api_data(endpoint)

    # Both checks share the same error; the type is verified first so the
    # metadata is only inspected on a node of the expected kind.
    is_expected = (
        player_config["type"] == "ConfigPlayer"
        and player_config["attributes"]["metadata"]["providerId"] == program_id
    )
    if not is_expected:
        raise ValueError("Invalid API response")

    return player_config
|
|
|
|
|
|
def iter_versions(config):
    """Return a (code, label) iterator over the versions of a program.

    Only the first entry of each stream's ``"versions"`` list is used.
    The stream index URL is not part of the yielded tuple; it can be
    obtained from ``select_version`` with the version code.

    Args:
        config: Program config mapping exposing
            ``config["attributes"]["streams"]``.

    Yields:
        ``(code, label)`` tuples, where ``code`` is the ``eStat.ml5`` value
        and ``label`` is the version's full name.
    """
    for stream in config["attributes"]["streams"]:
        version = stream["versions"][0]
        yield (
            # version code
            version["eStat"]["ml5"],
            # version full name
            version["label"],
        )
|
|
|
|
|
|
def select_version(config, version_code):
    """Look up the index URL of the stream matching ``version_code``.

    Args:
        config: Program config mapping exposing
            ``config["attributes"]["streams"]``.
        version_code: Code compared with the ``eStat.ml5`` value of the
            first version of each stream.

    Returns:
        The ``"url"`` of the first matching stream, or ``None`` when no
        stream matches.
    """
    matching_urls = (
        entry["url"]
        for entry in config["attributes"]["streams"]
        if entry["versions"][0]["eStat"]["ml5"] == version_code
    )
    # The generator is lazy, so the scan stops at the first match.
    return next(matching_urls, None)
|
|
|
|
|
|
def build_file_base_name(config):
    """Derive a base file name from the title stored in the config metadata.

    Args:
        config: Program config mapping exposing
            ``config["attributes"]["metadata"]["title"]``.

    Returns:
        The title with every ``/`` replaced by ``-``.
    """
    title = config["attributes"]["metadata"]["title"]
    # Splitting on "/" and re-joining with "-" swaps each slash for a dash.
    return "-".join(title.split("/"))
|
|
|
|
|
|
def make_srt_tempfile(subtitles_index_url):
    """Download a VTT subtitle track and convert it to a temporary SRT file.

    Args:
        subtitles_index_url: URL of the m3u8 index listing the subtitle file.

    Returns:
        Name of the temporary ``.srt`` file. The file is created with
        ``delete=False``, so it is still on disk when this function returns.

    Raises:
        ValueError: If the index lists no subtitle file, or more than one.
        RuntimeError: If the subtitle response status is not 200 OK.
    """
    subtitles_index = m3u8.load(subtitles_index_url)
    urls = [
        subtitles_index.base_uri + "/" + file_name
        for file_name in subtitles_index.files
    ]

    if not urls:
        raise ValueError("No subtitle files")

    if len(urls) > 1:
        raise ValueError("Multiple subtitle files")

    # Read the whole payload inside the context manager so the connection
    # is released even when the status check fails.
    with urlopen(urls[0]) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("Subtitle request failed")
        buffer = io.StringIO(http_response.read().decode("utf8"))

    with tempfile.NamedTemporaryFile(
        "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
    ) as srt_file:
        # Cues are numbered starting at 1.
        for number, caption in enumerate(webvtt.read_buffer(buffer), start=1):
            # Timestamps are written with "," in place of ".".
            start = caption.start.replace(".", ",")
            end = caption.end.replace(".", ",")
            print(number, file=srt_file)
            print(start + " --> " + end, file=srt_file)
            # The extra newline leaves a blank line between cues.
            print(caption.text + "\n", file=srt_file)

    return srt_file.name
|
|
|
|
|
|
def load_version_index(url):
    """Load a version index from an m3u8 URL and validate its playlists.

    Args:
        url: Location of the m3u8 version index.

    Returns:
        The object returned by ``m3u8.load`` for ``url``.

    Raises:
        ValueError: If the index has no playlists, if a playlist does not
            have exactly one ``AUDIO`` media, or if a playlist has more than
            one ``SUBTITLES`` media.
    """
    version_index = m3u8.load(url)

    if not version_index.playlists:
        raise ValueError("Unexpected missing playlists")

    for playlist in version_index.playlists:
        # Exactly one audio track is required per playlist.
        audio_total = sum(1 for media in playlist.media if media.type == "AUDIO")
        if audio_total != 1:
            raise ValueError("Unexpected missing or multiple audio tracks")

        # Subtitles are optional, but at most one track is accepted.
        subtitles_total = sum(
            1 for media in playlist.media if media.type == "SUBTITLES"
        )
        if subtitles_total > 1:
            raise ValueError("Unexpected multiple subtitle tracks")

    return version_index
|
|
|
|
|
|
def iter_resolutions(version_index):
    """Yield the available resolutions, tallest first.

    Args:
        version_index: Object exposing ``playlists``; each playlist provides
            ``stream_info.resolution`` indexable as ``(width, height)``.

    Yields:
        ``(code, label)`` tuples such as ``("1080p", "1920 x 1080")``.
    """

    def height_of(playlist):
        return playlist.stream_info.resolution[1]

    tallest_first = sorted(version_index.playlists, key=height_of, reverse=True)
    for playlist in tallest_first:
        resolution = playlist.stream_info.resolution
        width = resolution[0]
        height = resolution[1]
        # resolution code (1080p, 720p, ...) followed by the resolution label
        yield (f"{height}p", f"{width} x {height}")
|
|
|
|
|
|
def select_resolution(version_index, resolution_code):
    """Find the stream information of the playlist matching a resolution code.

    Args:
        version_index: Object exposing ``playlists``; each playlist provides
            ``stream_info.resolution``, ``base_uri``, ``uri`` and ``media``.
        resolution_code: Code such as ``"720p"``, compared with the playlist
            height followed by ``"p"``.

    Returns:
        A ``(video_url, audio_track, subtitles_track)`` tuple for the first
        matching playlist, where each track is a ``(language, url)`` tuple or
        ``None`` when the playlist has no media of that type. ``None`` is
        returned when no playlist matches.
    """

    def first_track(playlist, media_type):
        # Only the first media of the requested type is taken into account.
        for media in playlist.media:
            if media.type == media_type:
                return (media.language, playlist.base_uri + media.uri)
        return None

    for playlist in version_index.playlists:
        if f"{playlist.stream_info.resolution[1]}p" == resolution_code:
            audio = first_track(playlist, "AUDIO")
            subtitles = first_track(playlist, "SUBTITLES")
            video_url = playlist.base_uri + playlist.uri
            return (video_url, audio, subtitles)

    return None
|
|
|
|
|
|
def build_ffmpeg_cmd(video_index_url, audio_track, subtitles_track, file_base_name):
    """Assemble the ffmpeg argument list for one program.

    Args:
        video_index_url: Input passed as the first ``-i`` argument.
        audio_track: ``(language, index_url)`` pair of the audio input.
        subtitles_track: ``(language, file)`` pair of the subtitles input,
            or a falsy value when there are no subtitles.
        file_base_name: Output name; ``.mkv`` is appended to it.

    Returns:
        The command as a list of strings, starting with ``"ffmpeg"``.
    """
    audio_lang, audio_index_url = audio_track

    inputs = ["-i", video_index_url, "-i", audio_index_url]
    codecs = ["-c:v", "copy", "-c:a", "copy"]
    stream_options = [
        "-bsf:a",
        "aac_adtstoasc",
        "-metadata:s:a:0",
        f"language={audio_lang}",
    ]

    # Subtitles contribute one extra argument group to each section.
    if subtitles_track:
        subtitles_lang, subtitles_file = subtitles_track
        inputs += ["-i", subtitles_file]
        codecs += ["-c:s", "copy"]
        stream_options += [
            "-metadata:s:s:0",
            f"language={subtitles_lang}",
            "-disposition:s:0",
            "default",
        ]

    return ["ffmpeg", *inputs, *codecs, *stream_options, f"{file_base_name}.mkv"]
|