delarte_test/src/delarte/__init__.py
Barbagus be4363a339 Fix issue #6 on FFMPEG header error
Handle the audio and video channel downloading to temporary files prior
to calling ffmpeg.

Although it might not be necessary, the download is made by "chunks" as
it would be by a client/player.

Downloading progress feedback is printed to the terminal.
2022-12-11 16:09:11 +01:00

339 lines
10 KiB
Python

"""delarte.
ArteTV downloader
Licence: GNU AGPL v3: http://www.gnu.org/licenses/
This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""
__version__ = "0.1"
import contextlib
import io
import json
import os
import re
from http import HTTPStatus
from http.client import HTTPSConnection, HTTPConnection
from tempfile import NamedTemporaryFile
from urllib.parse import urlparse
from urllib.request import urlopen
import m3u8
import webvtt
def load_api_data(url):
"""Retrieve the root node (infamous "data") of an API call response."""
http_response = urlopen(url)
if http_response.status != HTTPStatus.OK:
raise RuntimeError("API request failed")
if (
http_response.getheader("Content-Type")
!= "application/vnd.api+json; charset=utf-8"
):
raise ValueError("API response not supported")
return json.load(http_response)["data"]
def load_config_api(lang, program_id):
"""Retrieve a program config from API."""
url = f"https://api.arte.tv/api/player/v2/config/{lang}/{program_id}"
config = load_api_data(url)
if config["type"] != "ConfigPlayer":
raise ValueError("Invalid API response")
if config["attributes"]["metadata"]["providerId"] != program_id:
raise ValueError("Invalid API response")
return config
def iter_versions(config):
"""Return a (code, label, index_url) iterator."""
for stream in config["attributes"]["streams"]:
yield (
# version code
stream["versions"][0]["eStat"]["ml5"],
# version full name
stream["versions"][0]["label"],
)
def select_version(config, version_code):
"""Return the version index url for the given version code."""
for stream in config["attributes"]["streams"]:
if stream["versions"][0]["eStat"]["ml5"] == version_code:
return stream["url"]
return None
def build_file_base_name(config):
"""Create a base file name from config metadata."""
return config["attributes"]["metadata"]["title"].replace("/", "-")
def download_subtitles_input(index_url, progress):
"""Return a temporary file name where VTT subtitle has been downloaded/converted to SRT."""
subtitles_index = m3u8.load(index_url)
urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files]
if not urls:
raise ValueError("No subtitle files")
if len(urls) > 1:
raise ValueError("Multiple subtitle files")
progress(0, 2)
http_response = urlopen(urls[0])
if http_response.status != HTTPStatus.OK:
raise RuntimeError("Subtitle request failed")
buffer = io.StringIO(http_response.read().decode("utf8"))
progress(1, 2)
with NamedTemporaryFile(
"w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
) as f:
i = 1
for caption in webvtt.read_buffer(buffer):
print(i, file=f)
print(
re.sub(r"\.", ",", caption.start)
+ " --> "
+ re.sub(r"\.", ",", caption.end),
file=f,
)
print(caption.text + "\n", file=f)
i += 1
progress(2, 2)
return f.name
def load_version_index(url):
"""Retrieve a version from m3u8 file."""
version_index = m3u8.load(url)
if not version_index.playlists:
raise ValueError("Unexpected missing playlists")
for pl in version_index.playlists:
count = 0
for m in pl.media:
if m.type == "AUDIO":
count += 1
if count != 1:
raise ValueError("Unexpected missing or multiple audio tracks")
count = 0
for m in pl.media:
if m.type == "SUBTITLES":
count += 1
if count > 1:
raise ValueError("Unexpected multiple subtitle tracks")
return version_index
def iter_resolutions(version_index):
"""Iterate over resolution options."""
for pl in sorted(
version_index.playlists,
key=lambda pl: pl.stream_info.resolution[1],
reverse=True,
):
yield (
# resolution code (1080p, 720p, ...)
f"{pl.stream_info.resolution[1]}p",
# resolution label
f"{pl.stream_info.resolution[0]} x {pl.stream_info.resolution[1]}",
)
def select_resolution(version_index, resolution_code):
"""Return the stream information for a given resolution_code."""
for pl in version_index.playlists:
code = f"{pl.stream_info.resolution[1]}p"
if code != resolution_code:
continue
audio_track = None
for m in pl.media:
if m.type == "AUDIO":
audio_track = (m.language, pl.base_uri + m.uri)
break
subtitles_track = None
for m in pl.media:
if m.type == "SUBTITLES":
subtitles_track = (m.language, pl.base_uri + m.uri)
break
return (
pl.base_uri + pl.uri,
audio_track,
subtitles_track,
)
return None
def build_ffmpeg_cmd(inputs, file_base_name):
"""Build FFMPEG args."""
video_input, audio_track, subtitles_track = inputs
audio_lang, audio_input = audio_track
if subtitles_track:
subtitles_lang, subtitles_input = subtitles_track
cmd = ["ffmpeg", "-hide_banner"]
cmd.extend(["-i", video_input])
cmd.extend(["-i", audio_input])
if subtitles_track:
cmd.extend(["-i", subtitles_input])
cmd.extend(["-c:v", "copy"])
cmd.extend(["-c:a", "copy"])
if subtitles_track:
cmd.extend(["-c:s", "copy"])
cmd.extend(["-bsf:a", "aac_adtstoasc"])
cmd.extend(["-metadata:s:a:0", f"language={audio_lang}"])
if subtitles_track:
cmd.extend(["-metadata:s:s:0", f"language={subtitles_lang}"])
cmd.extend(["-disposition:s:0", "default"])
cmd.append(f"{file_base_name}.mkv")
return cmd
def parse_byterange(obj):
"""Parse a M3U8 `byterange` (count@offset) into http range (range_start, rang_end)."""
count, offset = [int(v) for v in obj.byterange.split("@")]
return offset, offset + count - 1
def load_av_index(index_url):
"""Load a M3U8 audio or video index."""
index = m3u8.load(index_url)
file_name = index.segment_map[0].uri
range_start, range_end = parse_byterange(index.segment_map[0])
if range_start != 0:
raise ValueError("Invalid a/v index: does not start at 0")
chunks = [(range_start, range_end)]
total = range_end + 1
for segment in index.segments:
if segment.uri != file_name:
raise ValueError("Invalid a/v index: multiple file names")
range_start, range_end = parse_byterange(segment)
if range_start != total:
raise ValueError(
f"Invalid a/v index: discontious ranges ({range_start} != {total})"
)
chunks.append((range_start, range_end))
total = range_end + 1
return urlparse(index.segment_map[0].absolute_uri), chunks
def download_av_input(index_url, progress):
"""Download an audio or video stream to temporary directory."""
url, ranges = load_av_index(index_url)
total = ranges[-1][1]
Connector = HTTPSConnection if url.scheme == "https" else HTTPConnection
connection = Connector(url.hostname)
connection.connect()
with (
NamedTemporaryFile(
mode="w+b", delete=False, prefix="delarte.", suffix=".mp4"
) as f,
contextlib.closing(connection) as c,
):
for range_start, range_end in ranges:
c.request(
"GET",
url.path,
headers={
"Accept": "*/*",
"Accept-Language": "fr,en;q=0.7,en-US;q=0.3",
"Accept-Encoding": "gzip, deflate, br, identity",
"Range": f"bytes={range_start}-{range_end}",
"Origin": "https://www.arte.tv",
"Connection": "keep-alive",
"Referer": "https://www.arte.tv/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site",
"Sec-GPC": "1",
"DNT": "1",
},
)
r = c.getresponse()
if r.status != 206:
raise ValueError(f"Invalid response status {r.status}")
content = r.read()
if len(content) != range_end - range_start + 1:
raise ValueError("Invalid range length")
f.write(content)
progress(range_end, total)
return f.name
@contextlib.contextmanager
def download_inputs(remote_inputs, progress):
"""Download inputs in temporary files."""
# It is implemented as a context manager that will delete temporary files on exit.
video_index_url, audio_track, subtitles_track = remote_inputs
video_filename = None
audio_filename = None
subtitles_filename = None
try:
video_filename = download_av_input(
video_index_url, lambda i, n: progress("video", i, n)
)
(audio_lang, audio_index_url) = audio_track
audio_filename = download_av_input(
audio_index_url, lambda i, n: progress("audio", i, n)
)
if subtitles_track:
(subtitles_lang, subtitles_index_url) = subtitles_track
subtitles_filename = download_subtitles_input(
subtitles_index_url, lambda i, n: progress("subtitles", i, n)
)
yield (
video_filename,
(audio_lang, audio_filename),
(subtitles_lang, subtitles_filename),
)
else:
yield (video_filename, (audio_lang, audio_filename), None)
finally:
if video_filename and os.path.isfile(video_filename):
os.unlink(video_filename)
if audio_filename and os.path.isfile(audio_filename):
os.unlink(audio_filename)
if subtitles_filename and os.path.isfile(subtitles_filename):
os.unlink(subtitles_filename)