delarte_test/delarte.py
2022-12-07 22:04:29 +01:00

242 lines
6.9 KiB
Python
Executable File

#!/usr/bin/env python3
# coding: utf8
"""delarte.
ArteTV downloader
Licence: GNU AGPL v3: http://www.gnu.org/licenses/
This file is part of [`delarte`](https://gitlab.com/Barbagus/delarte)
"""
from __future__ import annotations
import io
import json
import os
import re
import subprocess
import sys
import tempfile
from http import HTTPStatus
from os import environ
from typing import NamedTuple, Optional, cast
from urllib.parse import urlparse
from urllib.request import urlopen
import m3u8
import webvtt
# Path of the ffmpeg executable, taken from the PATH_FFMPEG environment
# variable. When the variable is unset, the placeholder text below is used
# verbatim as the executable name handed to subprocess.
FFMPEG = os.environ.get("PATH_FFMPEG", "ffmpeg path not found")
def api_root(url: str):
    """Retrieve the root node (infamous "data") of an API call response.

    Args:
        url: Address of the API endpoint to query.

    Returns:
        The value stored under the top-level ``"data"`` key of the JSON body.

    Raises:
        RuntimeError: The response status is not HTTP 200.
        ValueError: The response ``Content-Type`` header is not exactly
            ``application/vnd.api+json; charset=utf-8``.
    """
    # The context manager guarantees the connection is released on every
    # path, including the early raises below.
    with urlopen(url) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("API request failed")

        content_type = http_response.getheader("Content-Type")
        if content_type != "application/vnd.api+json; charset=utf-8":
            raise ValueError("API response not supported")

        return json.load(http_response)["data"]
class Config(NamedTuple):
    """A structure representing a config API object.

    Fields:
        provider_id: Identifier the config was requested for.
        title: ``metadata.title`` of the API response.
        subtitle: ``metadata.subtitle`` of the API response.
        versions: Maps a version code (the ``eStat.ml5`` value of each
            stream's first version) to a ``(label, m3u8 url)`` pair.
    """

    provider_id: str
    title: str
    subtitle: str
    versions: dict[str, tuple[str, str]]

    @classmethod
    def load(cls, lang: str, provider_id: str) -> Config:
        """Retrieve a stream config from API.

        Args:
            lang: Language segment of the API URL.
            provider_id: Identifier of the program to look up.

        Raises:
            ValueError: The response is not a ``ConfigPlayer`` object or it
                describes a different provider id than the one requested.
        """
        url = f"https://api.arte.tv/api/player/v2/config/{lang}/{provider_id}"
        root = api_root(url)
        if root["type"] != "ConfigPlayer":
            raise ValueError("API response not supported")

        attrs = root["attributes"]
        metadata = attrs["metadata"]
        if metadata["providerId"] != provider_id:
            raise ValueError("API response not supported")

        # Only the first entry of each stream's "versions" list is used; a
        # later stream with the same code replaces an earlier one.
        versions = {}
        for stream in attrs["streams"]:
            first_version = stream["versions"][0]
            code = first_version["eStat"]["ml5"]
            versions[code] = (first_version["label"], stream["url"])

        return cls(provider_id, metadata["title"], metadata["subtitle"], versions)

    def url_for_version(self, version_code: str) -> str:
        """Return the m3u8 url for the given version code.

        When the code is unknown, the available codes and labels are printed
        and the process exits with status 1.
        """
        if version_code not in self.versions:
            print("Available versions:")
            for code, (label, _) in self.versions.items():
                print(f"\t{code} - {label}")
            # sys.exit rather than the site-provided exit() builtin, which is
            # not guaranteed to exist (e.g. when run with ``python -S``).
            sys.exit(1)
        return self.versions[version_code][1]
def make_srt_tempfile(url):
    """Return a temporary file name where VTT subtitle has been downloaded/converted to SRT.

    Args:
        url: Address of the m3u8 playlist that references the VTT file.

    Returns:
        Name of a ``delarte.*.srt`` temporary file. The file is not deleted
        automatically; the caller is responsible for removing it.

    Raises:
        ValueError: The playlist references no file or more than one file.
        RuntimeError: The subtitle download did not return HTTP 200.
    """
    mpeg = m3u8.load(url)
    urls = [cast(str, mpeg.base_uri) + "/" + f for f in mpeg.files]
    if not urls:
        raise ValueError("No subtitle files")
    if len(urls) > 1:
        raise ValueError("Multiple subtitle files")

    # Close the connection as soon as the payload has been read.
    with urlopen(urls[0]) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("Subtitle request failed")
        buffer = io.StringIO(http_response.read().decode("utf8"))

    srt_file = tempfile.NamedTemporaryFile(
        "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
    )
    try:
        with srt_file:
            for index, caption in enumerate(webvtt.read_buffer(buffer), start=1):
                # SRT timestamps use a comma where VTT uses a dot.
                start = caption.start.replace(".", ",")
                end = caption.end.replace(".", ",")
                # Each entry: counter, time range, text, then a blank line.
                srt_file.write(f"{index}\n{start} --> {end}\n{caption.text}\n\n")
    except BaseException:
        # delete=False means nobody else would clean up a half-written file.
        os.unlink(srt_file.name)
        raise
    return srt_file.name
class Version(NamedTuple):
    """A structure representing a version M3U8 object.

    Fields:
        videos: Maps a resolution code to the URL of the video playlist. The
            code is the second component of the playlist's resolution,
            presumably the height in pixels (confirm against m3u8).
        audio_url: URL of the single audio track.
        subtitiles: ``(language, url)`` of the subtitles track, or ``None``
            when there is none. The misspelling is kept because the field
            name is part of the public interface.
    """

    videos: dict[str, str]
    audio_url: str
    subtitiles: Optional[tuple[str, str]]

    @classmethod
    def load(cls, url: str) -> Version:
        """Retrieve a version from m3u8 file.

        Raises:
            ValueError: The playlist does not declare exactly one audio
                track, or declares more than one subtitles track.
        """
        mpeg = m3u8.load(url)
        base_uri = mpeg.base_uri

        videos = {
            str(pl.stream_info.resolution[1]): base_uri + pl.uri
            for pl in mpeg.playlists
        }

        audios = [base_uri + m.uri for m in mpeg.media if m.type == "AUDIO"]
        if len(audios) != 1:
            raise ValueError("Unexpected missing or multiple audio tracks.")

        subtitles = [
            (m.language, base_uri + m.uri)
            for m in mpeg.media
            if m.type == "SUBTITLES"
        ]
        if len(subtitles) > 1:
            raise ValueError("Unexpected multiple subtitles tracks.")

        return cls(videos, audios[0], subtitles[0] if subtitles else None)

    def download(self, resolution_code: str, file_base_name: str):
        """Download a given resolution (video/audio/subtitles) and write it to an MKV container.

        Args:
            resolution_code: Key of ``videos`` selecting the video stream.
            file_base_name: Output path without the ``.mkv`` extension.

        When the resolution code is unknown, the available codes are printed
        (highest first) and the process exits with status 1. The exit status
        of ffmpeg itself is not checked.
        """
        if resolution_code not in self.videos:
            print("Available resolutions:")
            for code in sorted(map(int, self.videos), reverse=True):
                print(f"\t{code}")
            # sys.exit rather than the site-provided exit() builtin, which is
            # not guaranteed to exist (e.g. when run with ``python -S``).
            sys.exit(1)

        video_url = self.videos[resolution_code]
        output_file = f"{file_base_name}.mkv"
        # Streams are copied without re-encoding in both variants.
        copy_args = ["-c:v", "copy", "-c:a", "copy", "-bsf:a", "aac_adtstoasc"]

        if not self.subtitiles:
            subprocess.run(
                [FFMPEG, "-i", video_url, "-i", self.audio_url, *copy_args, output_file]
            )
            return

        language, subtitles_url = self.subtitiles
        srt_tempfile = make_srt_tempfile(subtitles_url)
        try:
            subprocess.run(
                [
                    FFMPEG,
                    "-i",
                    srt_tempfile,
                    "-i",
                    video_url,
                    "-i",
                    self.audio_url,
                    *copy_args,
                    "-c:s",
                    "copy",
                    "-metadata:s:s:0",
                    f"language={language}",
                    "-disposition:s:0",
                    "default",
                    output_file,
                ]
            )
        finally:
            # Remove the temporary SRT even if ffmpeg could not be started.
            os.unlink(srt_tempfile)
def api_playlist(lang: str, provider_id: str):
    """Retrieve a playlist from API.

    Not implemented yet: the endpoint address is composed from the language
    and provider id, then NotImplementedError is raised unconditionally.
    """
    playlist_url = (
        f"https://api.arte.tv/api/player/v2/playlist/{lang}/{provider_id}"
    )
    raise NotImplementedError()
def main():
    """CLI function, options passed as arguments.

    Usage: ``delarte URL [VERSION [RESOLUTION]]``

    The URL path must have the shape ``/<lang>/videos/<id>/<slug>``. When
    VERSION or RESOLUTION is omitted, an empty code is looked up instead.

    Returns:
        1 when the URL argument is missing, otherwise None.

    Raises:
        ValueError: The URL path does not have the expected shape or uses an
            unsupported language.
    """
    if len(sys.argv) < 2:
        program = sys.argv[0] if sys.argv else "delarte"
        print(f"Usage: {program} URL [VERSION [RESOLUTION]]", file=sys.stderr)
        return 1

    # Strip slashes explicitly instead of slicing off the first and last
    # characters, so a URL without a trailing slash keeps its last segment.
    segments = urlparse(sys.argv[1]).path.strip("/").split("/")
    if len(segments) != 4:
        raise ValueError("Invalid URL")
    ui_lang, section, stream_id, _slug = segments

    version_code = sys.argv[2] if len(sys.argv) > 2 else ""
    resolution_code = sys.argv[3] if len(sys.argv) > 3 else ""

    if ui_lang not in ("fr", "de", "en", "es", "pl", "it") or section != "videos":
        raise ValueError("Invalid URL")

    config = Config.load(ui_lang, stream_id)
    version_url = config.url_for_version(version_code)
    # "/" in a title would otherwise be read as a directory separator.
    file_base_name = config.title.replace("/", "-")
    version = Version.load(version_url)
    version.download(resolution_code, file_base_name)
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())