242 lines
6.9 KiB
Python
Executable File
242 lines
6.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# coding: utf8
|
|
|
|
"""delarte.
|
|
|
|
ArteTV downloader
|
|
|
|
Licence: GNU AGPL v3: http://www.gnu.org/licenses/
|
|
|
|
This file is part of [`delarte`](https://gitlab.com/Barbagus/delarte)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
from http import HTTPStatus
|
|
from os import environ
|
|
from typing import NamedTuple, Optional, cast
|
|
from urllib.parse import urlparse
|
|
from urllib.request import urlopen
|
|
|
|
import m3u8
|
|
import webvtt
|
|
|
|
# Name/path of the ffmpeg executable handed to subprocess.run() in
# Version.download. It is read from the PATH_FFMPEG environment variable; when
# that variable is unset, the placeholder text below is used as the program
# name instead (presumably so the resulting launch error is self-explanatory).
FFMPEG = environ.get("PATH_FFMPEG", "ffmpeg path not found")
|
|
|
|
|
|
def api_root(url: str):
    """Retrieve the root node (infamous "data") of an API call response.

    Args:
        url: Full URL of the API endpoint to query.

    Returns:
        The value stored under the top-level ``"data"`` key of the decoded
        JSON document.

    Raises:
        RuntimeError: If the response status is not 200 OK.
        ValueError: If the response ``Content-Type`` header is not exactly
            ``application/vnd.api+json; charset=utf-8``.
    """
    # The context manager closes the connection on every path, including the
    # early raises below.
    with urlopen(url) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("API request failed")

        if (
            http_response.getheader("Content-Type")
            != "application/vnd.api+json; charset=utf-8"
        ):
            raise ValueError("API response not supported")

        return json.load(http_response)["data"]
|
|
|
|
|
|
class Config(NamedTuple):
    """A structure representing a config API object."""

    # Program identifier, as passed to the config API.
    provider_id: str
    title: str
    subtitle: str
    # Maps a version code to its (label, m3u8 url) pair.
    versions: dict[str, tuple[str, str]]

    @classmethod
    def load(cls, lang: str, provider_id: str) -> Config:
        """Retrieve a stream config from API.

        Args:
            lang: Language segment of the API URL.
            provider_id: Identifier of the program to look up.

        Returns:
            A new instance built from the API response.

        Raises:
            ValueError: If the response is not a ``ConfigPlayer`` object or
                describes a different provider id than the one requested.
        """
        url = f"https://api.arte.tv/api/player/v2/config/{lang}/{provider_id}"
        root = api_root(url)

        if root["type"] != "ConfigPlayer":
            raise ValueError("API response not supported")

        attrs = root["attributes"]
        metadata = attrs["metadata"]

        if metadata["providerId"] != provider_id:
            raise ValueError("API response not supported")

        # Only the first entry of each stream's "versions" list is used: its
        # eStat "ml5" value is the key, its label and the stream url the value.
        versions = {
            s["versions"][0]["eStat"]["ml5"]: (s["versions"][0]["label"], s["url"])
            for s in attrs["streams"]
        }

        # `cls` rather than `Config` so subclasses get their own type back.
        return cls(provider_id, metadata["title"], metadata["subtitle"], versions)

    def url_for_version(self, version_code: str) -> str:
        """Return the m3u8 url for the given version code.

        If the code is unknown, the available codes and labels are printed
        and the process exits with status 1.
        """
        if version_code not in self.versions:
            print("Available versions:")
            for code, (label, _) in self.versions.items():
                print(f"\t{code} - {label}")
            # sys.exit, not the builtin exit(): the latter is injected by the
            # `site` module and is not guaranteed to exist.
            sys.exit(1)

        return self.versions[version_code][1]
|
|
|
|
|
|
def make_srt_tempfile(url: str) -> str:
    """Return a temporary file name where VTT subtitle has been downloaded/converted to SRT.

    Args:
        url: URL of the subtitles m3u8 playlist.

    Returns:
        Name of a temporary ``.srt`` file. The file is created with
        ``delete=False``, so the caller is responsible for removing it.

    Raises:
        ValueError: If the playlist references no file, or more than one.
        RuntimeError: If the subtitle download does not return 200 OK.
    """
    mpeg = m3u8.load(url)
    urls = [cast(str, mpeg.base_uri) + "/" + f for f in mpeg.files]

    if not urls:
        raise ValueError("No subtitle files")

    if len(urls) > 1:
        raise ValueError("Multiple subtitle files")

    # Close the connection as soon as the payload has been read.
    with urlopen(urls[0]) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("Subtitle request failed")
        vtt_text = http_response.read().decode("utf8")

    # Hand the VTT to the parser before the temp file exists, so that an
    # error raised by read_buffer() cannot leave a stray file behind.
    captions = webvtt.read_buffer(io.StringIO(vtt_text))

    with tempfile.NamedTemporaryFile(
        "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
    ) as f:
        # SRT cues are numbered from 1 and use a comma where the VTT
        # timestamps have a dot.
        for index, caption in enumerate(captions, start=1):
            start = caption.start.replace(".", ",")
            end = caption.end.replace(".", ",")
            f.write(f"{index}\n{start} --> {end}\n{caption.text}\n\n")

    return f.name
|
|
|
|
|
|
class Version(NamedTuple):
    """A structure representing a version M3U8 object."""

    # Maps a resolution code (vertical resolution, as a string) to the url of
    # the matching video playlist.
    videos: dict[str, str]
    audio_url: str
    # (language, url) of the subtitles track, or None when there is none.
    # NOTE: the field name is misspelled; kept as-is since it is part of the
    # tuple's public interface.
    subtitiles: Optional[tuple[str, str]]

    @classmethod
    def load(cls, url: str) -> Version:
        """Retrieve a version from m3u8 file.

        Args:
            url: URL of the version's master m3u8 playlist.

        Raises:
            ValueError: If the playlist does not declare exactly one audio
                track, or declares more than one subtitles track.
        """
        mpeg = m3u8.load(url)

        videos = {
            str(pl.stream_info.resolution[1]): mpeg.base_uri + pl.uri
            for pl in mpeg.playlists
        }

        audios = [mpeg.base_uri + m.uri for m in mpeg.media if m.type == "AUDIO"]
        if len(audios) != 1:
            raise ValueError("Unexpected missing or multiple audio tracks.")

        subtitles = [
            (m.language, mpeg.base_uri + m.uri)
            for m in mpeg.media
            if m.type == "SUBTITLES"
        ]
        if len(subtitles) > 1:
            raise ValueError("Unexpected multiple subtitles tracks.")

        return cls(videos, audios[0], subtitles[0] if subtitles else None)

    def download(self, resolution_code: str, file_base_name: str):
        """Download a given resolution (video/audio/subtitles) and write it to an MKV container.

        Args:
            resolution_code: Key of ``videos`` selecting the video stream. If
                it is unknown, the available resolutions are printed and the
                process exits with status 1.
            file_base_name: Output file name without the ``.mkv`` extension.
        """
        if resolution_code not in self.videos:
            print("Available resolutions:")
            for code in sorted(map(int, self.videos), reverse=True):
                print(f"\t{code}")
            # sys.exit, not the builtin exit(): the latter is injected by the
            # `site` module and is not guaranteed to exist.
            sys.exit(1)

        video_url = self.videos[resolution_code]
        output_file = f"{file_base_name}.mkv"
        # Options shared by both invocations: copy the video and audio
        # streams, applying the aac_adtstoasc bitstream filter to the audio.
        copy_args = ["-c:v", "copy", "-c:a", "copy", "-bsf:a", "aac_adtstoasc"]

        if not self.subtitiles:
            subprocess.run(
                [FFMPEG, "-i", video_url, "-i", self.audio_url, *copy_args, output_file]
            )
            return

        language, subtitles_url = self.subtitiles
        srt_tempfile = make_srt_tempfile(subtitles_url)
        try:
            subprocess.run(
                [
                    FFMPEG,
                    "-i",
                    srt_tempfile,
                    "-i",
                    video_url,
                    "-i",
                    self.audio_url,
                    *copy_args,
                    "-c:s",
                    "copy",
                    "-metadata:s:s:0",
                    f"language={language}",
                    "-disposition:s:0",
                    "default",
                    output_file,
                ]
            )
        finally:
            # Remove the temporary SRT even when ffmpeg could not be started.
            os.unlink(srt_tempfile)
|
|
|
|
|
|
def api_playlist(lang: str, provider_id: str):
    """Retrieve a playlist from API.

    Placeholder: the endpoint address is computed, but no request is made
    and every call ends with ``NotImplementedError``.
    """
    endpoint = f"https://api.arte.tv/api/player/v2/playlist/{lang}/{provider_id}"
    raise NotImplementedError()
|
|
|
|
|
|
def main():
    """CLI function, options passed as arguments.

    Expected command line: ``URL [VERSION_CODE [RESOLUTION_CODE]]`` where the
    URL path has the form ``/<lang>/videos/<stream id>/<slug>/``.

    Returns:
        0 on success, 1 when the URL argument is missing.

    Raises:
        ValueError: If the URL does not have the expected shape.
    """
    if len(sys.argv) < 2:
        program = sys.argv[0] if sys.argv else "delarte"
        print(
            f"Usage: {program} URL [VERSION_CODE [RESOLUTION_CODE]]",
            file=sys.stderr,
        )
        return 1

    # strip("/") accepts the path with or without a trailing slash.
    parts = urlparse(sys.argv[1]).path.strip("/").split("/")
    if len(parts) != 4:
        raise ValueError("Invalid URL")
    ui_lang, section, stream_id, _slug = parts

    if ui_lang not in ("fr", "de", "en", "es", "pl", "it") or section != "videos":
        raise ValueError("Invalid URL")

    # An empty code is never a valid key, so omitting either argument makes
    # the corresponding lookup list the available choices and exit.
    version_code = sys.argv[2] if len(sys.argv) > 2 else ""
    resolution_code = sys.argv[3] if len(sys.argv) > 3 else ""

    config = Config.load(ui_lang, stream_id)
    version_url = config.url_for_version(version_code)

    # "/" cannot appear in a file name.
    file_base_name = config.title.replace("/", "-")

    version = Version.load(version_url)
    version.download(resolution_code, file_base_name)
    return 0
|
|
|
|
|
|
# Script entry point: run the CLI and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|