diff --git a/src/delarte/__init__.py b/src/delarte/__init__.py index a0e28e9..af241db 100644 --- a/src/delarte/__init__.py +++ b/src/delarte/__init__.py @@ -1,338 +1,6 @@ -"""delarte. +# Licence: GNU AGPL v3: http://www.gnu.org/licenses/ +# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git) -ArteTV downloader +"""delarte - ArteTV downloader.""" -Licence: GNU AGPL v3: http://www.gnu.org/licenses/ - -This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git) -""" __version__ = "0.1" - -import contextlib -import io -import json -import os -import re - -from http import HTTPStatus -from http.client import HTTPSConnection, HTTPConnection -from tempfile import NamedTemporaryFile -from urllib.parse import urlparse -from urllib.request import urlopen - -import m3u8 -import webvtt - - -def load_api_data(url): - """Retrieve the root node (infamous "data") of an API call response.""" - http_response = urlopen(url) - - if http_response.status != HTTPStatus.OK: - raise RuntimeError("API request failed") - - if ( - http_response.getheader("Content-Type") - != "application/vnd.api+json; charset=utf-8" - ): - raise ValueError("API response not supported") - - return json.load(http_response)["data"] - - -def load_config_api(lang, program_id): - """Retrieve a program config from API.""" - url = f"https://api.arte.tv/api/player/v2/config/{lang}/{program_id}" - config = load_api_data(url) - - if config["type"] != "ConfigPlayer": - raise ValueError("Invalid API response") - - if config["attributes"]["metadata"]["providerId"] != program_id: - raise ValueError("Invalid API response") - - return config - - -def iter_versions(config): - """Return a (code, label, index_url) iterator.""" - for stream in config["attributes"]["streams"]: - yield ( - # version code - stream["versions"][0]["eStat"]["ml5"], - # version full name - stream["versions"][0]["label"], - ) - - -def select_version(config, version_code): - """Return the version index url for the given version code.""" - for stream in config["attributes"]["streams"]: - if stream["versions"][0]["eStat"]["ml5"] == version_code: - return stream["url"] - - return None - - -def build_file_base_name(config): - """Create a base file name from config metadata.""" - return config["attributes"]["metadata"]["title"].replace("/", "-") - - -def download_subtitles_input(index_url, progress): - """Return a temporary file name where VTT subtitle has been downloaded/converted to SRT.""" - subtitles_index = m3u8.load(index_url) - urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files] - - if not urls: - raise ValueError("No subtitle files") - - if len(urls) > 1: - raise ValueError("Multiple subtitle files") - - progress(0, 2) - http_response = urlopen(urls[0]) - if http_response.status != HTTPStatus.OK: - raise RuntimeError("Subtitle request failed") - - buffer = io.StringIO(http_response.read().decode("utf8")) - progress(1, 2) - - with NamedTemporaryFile( - "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8" - ) as f: - i = 1 - for caption in webvtt.read_buffer(buffer): - print(i, file=f) - print( - re.sub(r"\.", ",", caption.start) - + " --> " - + re.sub(r"\.", ",", caption.end), - file=f, - ) - print(caption.text + "\n", file=f) - i += 1 - progress(2, 2) - return f.name - - -def load_version_index(url): - """Retrieve a version from m3u8 file.""" - version_index = m3u8.load(url) - - if not version_index.playlists: - raise ValueError("Unexpected missing playlists") - - for pl in version_index.playlists: - count = 0 - for m in pl.media: - if m.type == "AUDIO": - count += 1 - if count != 1: - raise ValueError("Unexpected missing or multiple audio tracks") - - count = 0 - for m in pl.media: - if m.type == "SUBTITLES": - count += 1 - if count > 1: - raise ValueError("Unexpected multiple subtitle tracks") - - return version_index - - -def iter_resolutions(version_index): - """Iterate over resolution options.""" - for pl in sorted( - version_index.playlists, - key=lambda pl: pl.stream_info.resolution[1], - reverse=True, - ): - yield ( - # resolution code (1080p, 720p, ...) - f"{pl.stream_info.resolution[1]}p", - # resolution label - f"{pl.stream_info.resolution[0]} x {pl.stream_info.resolution[1]}", - ) - - -def select_resolution(version_index, resolution_code): - """Return the stream information for a given resolution_code.""" - for pl in version_index.playlists: - code = f"{pl.stream_info.resolution[1]}p" - if code != resolution_code: - continue - - audio_track = None - for m in pl.media: - if m.type == "AUDIO": - audio_track = (m.language, pl.base_uri + m.uri) - break - - subtitles_track = None - for m in pl.media: - if m.type == "SUBTITLES": - subtitles_track = (m.language, pl.base_uri + m.uri) - break - - return ( - pl.base_uri + pl.uri, - audio_track, - subtitles_track, - ) - - return None - - -def build_ffmpeg_cmd(inputs, file_base_name): - """Build FFMPEG args.""" - video_input, audio_track, subtitles_track = inputs - - audio_lang, audio_input = audio_track - if subtitles_track: - subtitles_lang, subtitles_input = subtitles_track - - cmd = ["ffmpeg", "-hide_banner"] - cmd.extend(["-i", video_input]) - cmd.extend(["-i", audio_input]) - if subtitles_track: - cmd.extend(["-i", subtitles_input]) - - cmd.extend(["-c:v", "copy"]) - cmd.extend(["-c:a", "copy"]) - if subtitles_track: - cmd.extend(["-c:s", "copy"]) - - cmd.extend(["-bsf:a", "aac_adtstoasc"]) - cmd.extend(["-metadata:s:a:0", f"language={audio_lang}"]) - - if subtitles_track: - cmd.extend(["-metadata:s:s:0", f"language={subtitles_lang}"]) - cmd.extend(["-disposition:s:0", "default"]) - - cmd.append(f"{file_base_name}.mkv") - return cmd - - -def parse_byterange(obj): - """Parse a M3U8 `byterange` (count@offset) into http range (range_start, rang_end).""" - count, offset = [int(v) for v in obj.byterange.split("@")] - return offset, offset + count - 1 - - -def load_av_index(index_url): - """Load a M3U8 audio or video index.""" - index = m3u8.load(index_url) - - file_name = index.segment_map[0].uri - range_start, range_end = parse_byterange(index.segment_map[0]) - if range_start != 0: - raise ValueError("Invalid a/v index: does not start at 0") - chunks = [(range_start, range_end)] - total = range_end + 1 - - for segment in index.segments: - if segment.uri != file_name: - raise ValueError("Invalid a/v index: multiple file names") - - range_start, range_end = parse_byterange(segment) - if range_start != total: - raise ValueError( - f"Invalid a/v index: discontious ranges ({range_start} != {total})" - ) - - chunks.append((range_start, range_end)) - total = range_end + 1 - - return urlparse(index.segment_map[0].absolute_uri), chunks - - -def download_av_input(index_url, progress): - """Download an audio or video stream to temporary directory.""" - url, ranges = load_av_index(index_url) - total = ranges[-1][1] - - Connector = HTTPSConnection if url.scheme == "https" else HTTPConnection - connection = Connector(url.hostname) - connection.connect() - - with ( - NamedTemporaryFile( - mode="w+b", delete=False, prefix="delarte.", suffix=".mp4" - ) as f, - contextlib.closing(connection) as c, - ): - for range_start, range_end in ranges: - c.request( - "GET", - url.path, - headers={ - "Accept": "*/*", - "Accept-Language": "fr,en;q=0.7,en-US;q=0.3", - "Accept-Encoding": "gzip, deflate, br, identity", - "Range": f"bytes={range_start}-{range_end}", - "Origin": "https://www.arte.tv", - "Connection": "keep-alive", - "Referer": "https://www.arte.tv/", - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Site": "cross-site", - "Sec-GPC": "1", - "DNT": "1", - }, - ) - r = c.getresponse() - if r.status != 206: - raise ValueError(f"Invalid response status {r.status}") - - content = r.read() - if len(content) != range_end - range_start + 1: - raise ValueError("Invalid range length") - f.write(content) - - progress(range_end, total) - - return f.name - - -@contextlib.contextmanager -def download_inputs(remote_inputs, progress): - """Download inputs in temporary files.""" - # It is implemented as a context manager that will delete temporary files on exit. - - video_index_url, audio_track, subtitles_track = remote_inputs - - video_filename = None - audio_filename = None - subtitles_filename = None - - try: - video_filename = download_av_input( - video_index_url, lambda i, n: progress("video", i, n) - ) - - (audio_lang, audio_index_url) = audio_track - audio_filename = download_av_input( - audio_index_url, lambda i, n: progress("audio", i, n) - ) - - if subtitles_track: - (subtitles_lang, subtitles_index_url) = subtitles_track - subtitles_filename = download_subtitles_input( - subtitles_index_url, lambda i, n: progress("subtitles", i, n) - ) - - yield ( - video_filename, - (audio_lang, audio_filename), - (subtitles_lang, subtitles_filename), - ) - else: - yield (video_filename, (audio_lang, audio_filename), None) - finally: - if video_filename and os.path.isfile(video_filename): - os.unlink(video_filename) - if audio_filename and os.path.isfile(audio_filename): - os.unlink(audio_filename) - if subtitles_filename and os.path.isfile(subtitles_filename): - os.unlink(subtitles_filename) diff --git a/src/delarte/__main__.py b/src/delarte/__main__.py index fb248c9..66005c5 100644 --- a/src/delarte/__main__.py +++ b/src/delarte/__main__.py @@ -1,27 +1,22 @@ -"""ArteTV dowloader. +# Licence: GNU AGPL v3: http://www.gnu.org/licenses/ +# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git) + +"""delarte - ArteTV dowloader. usage: delarte [-h|--help] - print this message or: delarte program_page_url - show available versions or: delarte program_page_url version - show available resolutions or: delarte program_page_url version resolution - download the given video """ -import subprocess + import sys import time -from urllib.parse import urlparse - -from . import ( - build_ffmpeg_cmd, - build_file_base_name, - download_inputs, - select_resolution, - select_version, - iter_resolutions, - iter_versions, - load_config_api, - load_version_index, -) +from . import api +from . import hls +from . import muxing +from . import naming +from . import www def fail(message, code=1): @@ -33,14 +28,14 @@ def fail(message, code=1): def print_available_versions(config, f): """Print available program versions.""" print(f"Available versions:", file=f) - for code, label in iter_versions(config): + for code, label in api.iter_versions(config): print(f"\t{code} - {label}", file=f) def print_available_resolutions(version_index, f): """Print available version resolutions.""" print(f"Available resolutions:", file=f) - for code, label in iter_resolutions(version_index): + for code, label in hls.iter_resolutions(version_index): print(f"\t{code} - {label}", file=f) @@ -79,27 +74,12 @@ def main(): return 0 try: - program_page_url = urlparse(args.pop(0)) - if program_page_url.hostname != "www.arte.tv": - return fail("Not an ArteTV url") - - program_page_path = program_page_url.path.split("/")[1:] - - ui_language = program_page_path.pop(0) - - if ui_language not in ("fr", "de", "en", "es", "pl", "it"): - return fail(f"Invalid url language code: {ui_language}") - - if program_page_path.pop(0) != "videos": - return fail("Invalid ArteTV url") - - program_id = program_page_path.pop(0) - - except ValueError: - return fail("Invalid url") + www_lang, program_id = www.parse_url(args.pop(0)) + except ValueError as e: + return fail(f"Invalid url: {e}") try: - config = load_config_api(ui_language, program_id) + config = api.load_config(www_lang, program_id) except ValueError: return fail("Invalid program") @@ -107,29 +87,30 @@ def main(): print_available_versions(config, sys.stdout) return 0 - version_index_url = select_version(config, args.pop(0)) + version_index_url = api.select_version(config, args.pop(0)) if version_index_url is None: fail("Invalid version") print_available_versions(config, sys.stderr) return 1 - version_index = load_version_index(version_index_url) + version_index = hls.load_version_index(version_index_url) if not args: print_available_resolutions(version_index, sys.stdout) return 0 - remote_inputs = select_resolution(version_index, args.pop(0)) + remote_inputs = hls.select_resolution(version_index, args.pop(0)) if remote_inputs is None: fail("Invalid resolution") print_available_resolutions(version_index, sys.stderr) return 0 - file_base_name = build_file_base_name(config) + file_base_name = naming.build_file_base_name(config) - with download_inputs(remote_inputs, create_progress()) as temp_inputs: - args = build_ffmpeg_cmd(temp_inputs, file_base_name) - subprocess.run(args) + progress = create_progress() + + with hls.download_inputs(remote_inputs, progress) as temp_inputs: + muxing.mux(temp_inputs, file_base_name, progress) if __name__ == "__main__": diff --git a/src/delarte/api.py b/src/delarte/api.py new file mode 100644 index 0000000..c492577 --- /dev/null +++ b/src/delarte/api.py @@ -0,0 +1,58 @@ +# Licence: GNU AGPL v3: http://www.gnu.org/licenses/ +# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git) + +"""Provide ArteTV JSON API utilities.""" + +import json +from http import HTTPStatus +from urllib.request import urlopen + + +def load_api_data(url): + """Retrieve the root node (infamous "data") of an API call response.""" + http_response = urlopen(url) + + if http_response.status != HTTPStatus.OK: + raise RuntimeError("API request failed") + + if ( + http_response.getheader("Content-Type") + != "application/vnd.api+json; charset=utf-8" + ): + raise ValueError("API response not supported") + + return json.load(http_response)["data"] + + +def load_config(lang, program_id): + """Retrieve a program config from API.""" + url = f"https://api.arte.tv/api/player/v2/config/{lang}/{program_id}" + config = load_api_data(url) + + if config["type"] != "ConfigPlayer": + raise ValueError("Invalid API response") + + if config["attributes"]["metadata"]["providerId"] != program_id: + raise ValueError("Invalid API response") + + return config + + +def iter_versions(config): + """Return a (code, label, index_url) iterator.""" + for stream in config["attributes"]["streams"]: + yield ( + # version code + stream["versions"][0]["eStat"]["ml5"], + # version full name + stream["versions"][0]["label"], + ) + + +def select_version(config, version_code): + """Return the version index url for the given version code.""" + for stream in config["attributes"]["streams"]: + if stream["versions"][0]["eStat"]["ml5"] == version_code: + return stream["url"] + + return None diff --git a/src/delarte/hls.py b/src/delarte/hls.py new file mode 100644 index 0000000..c356478 --- /dev/null +++ b/src/delarte/hls.py @@ -0,0 +1,246 @@ +# Licence: GNU AGPL v3: http://www.gnu.org/licenses/ +# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git) + +"""Provide HLS protocol utilities.""" + +import contextlib +import io +import os +import re +from http import HTTPStatus +from http.client import HTTPConnection, HTTPSConnection +from tempfile import NamedTemporaryFile +from urllib.parse import urlparse +from urllib.request import urlopen + +import m3u8 +import webvtt + + +def load_version_index(url): + """Retrieve a version from m3u8 file.""" + version_index = m3u8.load(url) + + if not version_index.playlists: + raise ValueError("Unexpected missing playlists") + + for pl in version_index.playlists: + count = 0 + for m in pl.media: + if m.type == "AUDIO": + count += 1 + if count != 1: + raise ValueError("Unexpected missing or multiple audio tracks") + + count = 0 + for m in pl.media: + if m.type == "SUBTITLES": + count += 1 + if count > 1: + raise ValueError("Unexpected multiple subtitle tracks") + + return version_index + + +def iter_resolutions(version_index): + """Iterate over resolution options.""" + for pl in sorted( + version_index.playlists, + key=lambda pl: pl.stream_info.resolution[1], + reverse=True, + ): + yield ( + # resolution code (1080p, 720p, ...) + f"{pl.stream_info.resolution[1]}p", + # resolution label + f"{pl.stream_info.resolution[0]} x {pl.stream_info.resolution[1]}", + ) + + +def select_resolution(version_index, resolution_code): + """Return the stream information for a given resolution_code.""" + for pl in version_index.playlists: + code = f"{pl.stream_info.resolution[1]}p" + if code != resolution_code: + continue + + audio_track = None + for m in pl.media: + if m.type == "AUDIO": + audio_track = (m.language, pl.base_uri + m.uri) + break + + subtitles_track = None + for m in pl.media: + if m.type == "SUBTITLES": + subtitles_track = (m.language, pl.base_uri + m.uri) + break + + return ( + pl.base_uri + pl.uri, + audio_track, + subtitles_track, + ) + + return None + + +def parse_byterange(obj): + """Parse a M3U8 `byterange` (count@offset) into http range (range_start, rang_end).""" + count, offset = [int(v) for v in obj.byterange.split("@")] + return offset, offset + count - 1 + + +def load_av_index(index_url): + """Load a M3U8 audio or video index.""" + index = m3u8.load(index_url) + + file_name = index.segment_map[0].uri + range_start, range_end = parse_byterange(index.segment_map[0]) + if range_start != 0: + raise ValueError("Invalid a/v index: does not start at 0") + chunks = [(range_start, range_end)] + total = range_end + 1 + + for segment in index.segments: + if segment.uri != file_name: + raise ValueError("Invalid a/v index: multiple file names") + + range_start, range_end = parse_byterange(segment) + if range_start != total: + raise ValueError( + f"Invalid a/v index: discontious ranges ({range_start} != {total})" + ) + + chunks.append((range_start, range_end)) + total = range_end + 1 + + return urlparse(index.segment_map[0].absolute_uri), chunks + + +def download_av_input(index_url, progress): + """Download an audio or video stream to temporary directory.""" + url, ranges = load_av_index(index_url) + total = ranges[-1][1] + + Connector = HTTPSConnection if url.scheme == "https" else HTTPConnection + connection = Connector(url.hostname) + connection.connect() + + with ( + NamedTemporaryFile( + mode="w+b", delete=False, prefix="delarte.", suffix=".mp4" + ) as f, + contextlib.closing(connection) as c, + ): + for range_start, range_end in ranges: + c.request( + "GET", + url.path, + headers={ + "Accept": "*/*", + "Accept-Language": "fr,en;q=0.7,en-US;q=0.3", + "Accept-Encoding": "gzip, deflate, br, identity", + "Range": f"bytes={range_start}-{range_end}", + "Origin": "https://www.arte.tv", + "Connection": "keep-alive", + "Referer": "https://www.arte.tv/", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "cross-site", + "Sec-GPC": "1", + "DNT": "1", + }, + ) + r = c.getresponse() + if r.status != 206: + raise ValueError(f"Invalid response status {r.status}") + + content = r.read() + if len(content) != range_end - range_start + 1: + raise ValueError("Invalid range length") + f.write(content) + + progress(range_end, total) + + return f.name + + +def download_subtitles_input(index_url, progress): + """Return a temporary file name where VTT subtitle has been downloaded/converted to SRT.""" + subtitles_index = m3u8.load(index_url) + urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files] + + if not urls: + raise ValueError("No subtitle files") + + if len(urls) > 1: + raise ValueError("Multiple subtitle files") + + progress(0, 2) + http_response = urlopen(urls[0]) + if http_response.status != HTTPStatus.OK: + raise RuntimeError("Subtitle request failed") + + buffer = io.StringIO(http_response.read().decode("utf8")) + progress(1, 2) + + with NamedTemporaryFile( + "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8" + ) as f: + i = 1 + for caption in webvtt.read_buffer(buffer): + print(i, file=f) + print( + re.sub(r"\.", ",", caption.start) + + " --> " + + re.sub(r"\.", ",", caption.end), + file=f, + ) + print(caption.text + "\n", file=f) + i += 1 + progress(2, 2) + return f.name + + +@contextlib.contextmanager +def download_inputs(remote_inputs, progress): + """Download inputs in temporary files.""" + # It is implemented as a context manager that will delete temporary files on exit. + + video_index_url, audio_track, subtitles_track = remote_inputs + + video_filename = None + audio_filename = None + subtitles_filename = None + + try: + video_filename = download_av_input( + video_index_url, lambda i, n: progress("video", i, n) + ) + + (audio_lang, audio_index_url) = audio_track + audio_filename = download_av_input( + audio_index_url, lambda i, n: progress("audio", i, n) + ) + + if subtitles_track: + (subtitles_lang, subtitles_index_url) = subtitles_track + subtitles_filename = download_subtitles_input( + subtitles_index_url, lambda i, n: progress("subtitles", i, n) + ) + + yield ( + video_filename, + (audio_lang, audio_filename), + (subtitles_lang, subtitles_filename), + ) + else: + yield (video_filename, (audio_lang, audio_filename), None) + finally: + if video_filename and os.path.isfile(video_filename): + os.unlink(video_filename) + if audio_filename and os.path.isfile(audio_filename): + os.unlink(audio_filename) + if subtitles_filename and os.path.isfile(subtitles_filename): + os.unlink(subtitles_filename) diff --git a/src/delarte/muxing.py b/src/delarte/muxing.py new file mode 100644 index 0000000..cb73ff6 --- /dev/null +++ b/src/delarte/muxing.py @@ -0,0 +1,37 @@ +# Licence: GNU AGPL v3: http://www.gnu.org/licenses/ +# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git) + +"""Provide media muxing utilities.""" + +import subprocess + + +def mux(inputs, file_base_name, progress): + """Build FFMPEG args.""" + video_input, audio_track, subtitles_track = inputs + + audio_lang, audio_input = audio_track + if subtitles_track: + subtitles_lang, subtitles_input = subtitles_track + + cmd = ["ffmpeg", "-hide_banner"] + cmd.extend(["-i", video_input]) + cmd.extend(["-i", audio_input]) + if subtitles_track: + cmd.extend(["-i", subtitles_input]) + + cmd.extend(["-c:v", "copy"]) + cmd.extend(["-c:a", "copy"]) + if subtitles_track: + cmd.extend(["-c:s", "copy"]) + + cmd.extend(["-bsf:a", "aac_adtstoasc"]) + cmd.extend(["-metadata:s:a:0", f"language={audio_lang}"]) + + if subtitles_track: + cmd.extend(["-metadata:s:s:0", f"language={subtitles_lang}"]) + cmd.extend(["-disposition:s:0", "default"]) + + cmd.append(f"{file_base_name}.mkv") + + subprocess.run(cmd) diff --git a/src/delarte/naming.py b/src/delarte/naming.py new file mode 100644 index 0000000..f456403 --- /dev/null +++ b/src/delarte/naming.py @@ -0,0 +1,9 @@ +# Licence: GNU AGPL v3: http://www.gnu.org/licenses/ +# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git) + +"""Provide contexted based file naming utility.""" + + +def build_file_base_name(config): + """Create a base file name from config metadata.""" + return config["attributes"]["metadata"]["title"].replace("/", "-") diff --git a/src/delarte/www.py b/src/delarte/www.py new file mode 100644 index 0000000..9fb9b40 --- /dev/null +++ b/src/delarte/www.py @@ -0,0 +1,29 @@ +# Licence: GNU AGPL v3: http://www.gnu.org/licenses/ +# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git) + +"""Provide ArteTV website utilities.""" + +from urllib.parse import urlparse + +LANGUAGES = ["fr", "de", "en", "es", "pl", "it"] + + +def parse_url(program_page_url): + """Parse ArteTV web URL into UI language and program ID.""" + url = urlparse(program_page_url) + if url.hostname != "www.arte.tv": + raise ValueError("not an ArteTV url") + + program_page_path = url.path.split("/")[1:] + + lang = program_page_path.pop(0) + + if lang not in LANGUAGES: + raise ValueError(f"invalid url language code: {lang}") + + if program_page_path.pop(0) != "videos": + raise ValueError("invalid ArteTV url") + + program_id = program_page_path.pop(0) + + return lang, program_id