Fix issue #13 - split code in multiple modules

Implemented modules:
 - api: deals with ArteTV JSON API
 - hls: deals with HLS protocol
 - muxing: deals with the stream multiplexing
 - naming: deals with output file naming
 - www: deals with ArteTV web interface
This commit is contained in:
Barbagus 2022-12-13 07:29:59 +01:00
parent 9f79687088
commit 6b8f2232c4
7 changed files with 406 additions and 378 deletions

View File

@ -1,338 +1,6 @@
"""delarte.
# Licence: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
ArteTV downloader
"""delarte - ArteTV downloader."""
Licence: GNU AGPL v3: http://www.gnu.org/licenses/
This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""
__version__ = "0.1"
import contextlib
import io
import json
import os
import re
from http import HTTPStatus
from http.client import HTTPSConnection, HTTPConnection
from tempfile import NamedTemporaryFile
from urllib.parse import urlparse
from urllib.request import urlopen
import m3u8
import webvtt
def load_api_data(url):
"""Retrieve the root node (infamous "data") of an API call response."""
http_response = urlopen(url)
if http_response.status != HTTPStatus.OK:
raise RuntimeError("API request failed")
if (
http_response.getheader("Content-Type")
!= "application/vnd.api+json; charset=utf-8"
):
raise ValueError("API response not supported")
return json.load(http_response)["data"]
def load_config_api(lang, program_id):
"""Retrieve a program config from API."""
url = f"https://api.arte.tv/api/player/v2/config/{lang}/{program_id}"
config = load_api_data(url)
if config["type"] != "ConfigPlayer":
raise ValueError("Invalid API response")
if config["attributes"]["metadata"]["providerId"] != program_id:
raise ValueError("Invalid API response")
return config
def iter_versions(config):
"""Return a (code, label, index_url) iterator."""
for stream in config["attributes"]["streams"]:
yield (
# version code
stream["versions"][0]["eStat"]["ml5"],
# version full name
stream["versions"][0]["label"],
)
def select_version(config, version_code):
"""Return the version index url for the given version code."""
for stream in config["attributes"]["streams"]:
if stream["versions"][0]["eStat"]["ml5"] == version_code:
return stream["url"]
return None
def build_file_base_name(config):
"""Create a base file name from config metadata."""
return config["attributes"]["metadata"]["title"].replace("/", "-")
def download_subtitles_input(index_url, progress):
"""Return a temporary file name where VTT subtitle has been downloaded/converted to SRT."""
subtitles_index = m3u8.load(index_url)
urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files]
if not urls:
raise ValueError("No subtitle files")
if len(urls) > 1:
raise ValueError("Multiple subtitle files")
progress(0, 2)
http_response = urlopen(urls[0])
if http_response.status != HTTPStatus.OK:
raise RuntimeError("Subtitle request failed")
buffer = io.StringIO(http_response.read().decode("utf8"))
progress(1, 2)
with NamedTemporaryFile(
"w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
) as f:
i = 1
for caption in webvtt.read_buffer(buffer):
print(i, file=f)
print(
re.sub(r"\.", ",", caption.start)
+ " --> "
+ re.sub(r"\.", ",", caption.end),
file=f,
)
print(caption.text + "\n", file=f)
i += 1
progress(2, 2)
return f.name
def load_version_index(url):
"""Retrieve a version from m3u8 file."""
version_index = m3u8.load(url)
if not version_index.playlists:
raise ValueError("Unexpected missing playlists")
for pl in version_index.playlists:
count = 0
for m in pl.media:
if m.type == "AUDIO":
count += 1
if count != 1:
raise ValueError("Unexpected missing or multiple audio tracks")
count = 0
for m in pl.media:
if m.type == "SUBTITLES":
count += 1
if count > 1:
raise ValueError("Unexpected multiple subtitle tracks")
return version_index
def iter_resolutions(version_index):
"""Iterate over resolution options."""
for pl in sorted(
version_index.playlists,
key=lambda pl: pl.stream_info.resolution[1],
reverse=True,
):
yield (
# resolution code (1080p, 720p, ...)
f"{pl.stream_info.resolution[1]}p",
# resolution label
f"{pl.stream_info.resolution[0]} x {pl.stream_info.resolution[1]}",
)
def select_resolution(version_index, resolution_code):
"""Return the stream information for a given resolution_code."""
for pl in version_index.playlists:
code = f"{pl.stream_info.resolution[1]}p"
if code != resolution_code:
continue
audio_track = None
for m in pl.media:
if m.type == "AUDIO":
audio_track = (m.language, pl.base_uri + m.uri)
break
subtitles_track = None
for m in pl.media:
if m.type == "SUBTITLES":
subtitles_track = (m.language, pl.base_uri + m.uri)
break
return (
pl.base_uri + pl.uri,
audio_track,
subtitles_track,
)
return None
def build_ffmpeg_cmd(inputs, file_base_name):
"""Build FFMPEG args."""
video_input, audio_track, subtitles_track = inputs
audio_lang, audio_input = audio_track
if subtitles_track:
subtitles_lang, subtitles_input = subtitles_track
cmd = ["ffmpeg", "-hide_banner"]
cmd.extend(["-i", video_input])
cmd.extend(["-i", audio_input])
if subtitles_track:
cmd.extend(["-i", subtitles_input])
cmd.extend(["-c:v", "copy"])
cmd.extend(["-c:a", "copy"])
if subtitles_track:
cmd.extend(["-c:s", "copy"])
cmd.extend(["-bsf:a", "aac_adtstoasc"])
cmd.extend(["-metadata:s:a:0", f"language={audio_lang}"])
if subtitles_track:
cmd.extend(["-metadata:s:s:0", f"language={subtitles_lang}"])
cmd.extend(["-disposition:s:0", "default"])
cmd.append(f"{file_base_name}.mkv")
return cmd
def parse_byterange(obj):
"""Parse a M3U8 `byterange` (count@offset) into http range (range_start, rang_end)."""
count, offset = [int(v) for v in obj.byterange.split("@")]
return offset, offset + count - 1
def load_av_index(index_url):
"""Load a M3U8 audio or video index."""
index = m3u8.load(index_url)
file_name = index.segment_map[0].uri
range_start, range_end = parse_byterange(index.segment_map[0])
if range_start != 0:
raise ValueError("Invalid a/v index: does not start at 0")
chunks = [(range_start, range_end)]
total = range_end + 1
for segment in index.segments:
if segment.uri != file_name:
raise ValueError("Invalid a/v index: multiple file names")
range_start, range_end = parse_byterange(segment)
if range_start != total:
raise ValueError(
f"Invalid a/v index: discontious ranges ({range_start} != {total})"
)
chunks.append((range_start, range_end))
total = range_end + 1
return urlparse(index.segment_map[0].absolute_uri), chunks
def download_av_input(index_url, progress):
"""Download an audio or video stream to temporary directory."""
url, ranges = load_av_index(index_url)
total = ranges[-1][1]
Connector = HTTPSConnection if url.scheme == "https" else HTTPConnection
connection = Connector(url.hostname)
connection.connect()
with (
NamedTemporaryFile(
mode="w+b", delete=False, prefix="delarte.", suffix=".mp4"
) as f,
contextlib.closing(connection) as c,
):
for range_start, range_end in ranges:
c.request(
"GET",
url.path,
headers={
"Accept": "*/*",
"Accept-Language": "fr,en;q=0.7,en-US;q=0.3",
"Accept-Encoding": "gzip, deflate, br, identity",
"Range": f"bytes={range_start}-{range_end}",
"Origin": "https://www.arte.tv",
"Connection": "keep-alive",
"Referer": "https://www.arte.tv/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site",
"Sec-GPC": "1",
"DNT": "1",
},
)
r = c.getresponse()
if r.status != 206:
raise ValueError(f"Invalid response status {r.status}")
content = r.read()
if len(content) != range_end - range_start + 1:
raise ValueError("Invalid range length")
f.write(content)
progress(range_end, total)
return f.name
@contextlib.contextmanager
def download_inputs(remote_inputs, progress):
"""Download inputs in temporary files."""
# It is implemented as a context manager that will delete temporary files on exit.
video_index_url, audio_track, subtitles_track = remote_inputs
video_filename = None
audio_filename = None
subtitles_filename = None
try:
video_filename = download_av_input(
video_index_url, lambda i, n: progress("video", i, n)
)
(audio_lang, audio_index_url) = audio_track
audio_filename = download_av_input(
audio_index_url, lambda i, n: progress("audio", i, n)
)
if subtitles_track:
(subtitles_lang, subtitles_index_url) = subtitles_track
subtitles_filename = download_subtitles_input(
subtitles_index_url, lambda i, n: progress("subtitles", i, n)
)
yield (
video_filename,
(audio_lang, audio_filename),
(subtitles_lang, subtitles_filename),
)
else:
yield (video_filename, (audio_lang, audio_filename), None)
finally:
if video_filename and os.path.isfile(video_filename):
os.unlink(video_filename)
if audio_filename and os.path.isfile(audio_filename):
os.unlink(audio_filename)
if subtitles_filename and os.path.isfile(subtitles_filename):
os.unlink(subtitles_filename)

View File

@ -1,27 +1,22 @@
"""ArteTV dowloader.
# Licence: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""delarte - ArteTV dowloader.
usage: delarte [-h|--help] - print this message
or: delarte program_page_url - show available versions
or: delarte program_page_url version - show available resolutions
or: delarte program_page_url version resolution - download the given video
"""
import subprocess
import sys
import time
from urllib.parse import urlparse
from . import (
build_ffmpeg_cmd,
build_file_base_name,
download_inputs,
select_resolution,
select_version,
iter_resolutions,
iter_versions,
load_config_api,
load_version_index,
)
from . import api
from . import hls
from . import muxing
from . import naming
from . import www
def fail(message, code=1):
@ -33,14 +28,14 @@ def fail(message, code=1):
def print_available_versions(config, f):
"""Print available program versions."""
print(f"Available versions:", file=f)
for code, label in iter_versions(config):
for code, label in api.iter_versions(config):
print(f"\t{code} - {label}", file=f)
def print_available_resolutions(version_index, f):
"""Print available version resolutions."""
print(f"Available resolutions:", file=f)
for code, label in iter_resolutions(version_index):
for code, label in hls.iter_resolutions(version_index):
print(f"\t{code} - {label}", file=f)
@ -79,27 +74,12 @@ def main():
return 0
try:
program_page_url = urlparse(args.pop(0))
if program_page_url.hostname != "www.arte.tv":
return fail("Not an ArteTV url")
program_page_path = program_page_url.path.split("/")[1:]
ui_language = program_page_path.pop(0)
if ui_language not in ("fr", "de", "en", "es", "pl", "it"):
return fail(f"Invalid url language code: {ui_language}")
if program_page_path.pop(0) != "videos":
return fail("Invalid ArteTV url")
program_id = program_page_path.pop(0)
except ValueError:
return fail("Invalid url")
www_lang, program_id = www.parse_url(args.pop(0))
except ValueError as e:
return fail(f"Invalid url: {e}")
try:
config = load_config_api(ui_language, program_id)
config = api.load_config(www_lang, program_id)
except ValueError:
return fail("Invalid program")
@ -107,29 +87,30 @@ def main():
print_available_versions(config, sys.stdout)
return 0
version_index_url = select_version(config, args.pop(0))
version_index_url = api.select_version(config, args.pop(0))
if version_index_url is None:
fail("Invalid version")
print_available_versions(config, sys.stderr)
return 1
version_index = load_version_index(version_index_url)
version_index = hls.load_version_index(version_index_url)
if not args:
print_available_resolutions(version_index, sys.stdout)
return 0
remote_inputs = select_resolution(version_index, args.pop(0))
remote_inputs = hls.select_resolution(version_index, args.pop(0))
if remote_inputs is None:
fail("Invalid resolution")
print_available_resolutions(version_index, sys.stderr)
return 0
file_base_name = build_file_base_name(config)
file_base_name = naming.build_file_base_name(config)
with download_inputs(remote_inputs, create_progress()) as temp_inputs:
args = build_ffmpeg_cmd(temp_inputs, file_base_name)
subprocess.run(args)
progress = create_progress()
with hls.download_inputs(remote_inputs, progress) as temp_inputs:
muxing.mux(temp_inputs, file_base_name, progress)
if __name__ == "__main__":

58
src/delarte/api.py Normal file
View File

@ -0,0 +1,58 @@
# Licence: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""Provide ArteTV JSON API utilities."""
import json
from http import HTTPStatus
from urllib.request import urlopen
def load_api_data(url):
"""Retrieve the root node (infamous "data") of an API call response."""
http_response = urlopen(url)
if http_response.status != HTTPStatus.OK:
raise RuntimeError("API request failed")
if (
http_response.getheader("Content-Type")
!= "application/vnd.api+json; charset=utf-8"
):
raise ValueError("API response not supported")
return json.load(http_response)["data"]
def load_config(lang, program_id):
"""Retrieve a program config from API."""
url = f"https://api.arte.tv/api/player/v2/config/{lang}/{program_id}"
config = load_api_data(url)
if config["type"] != "ConfigPlayer":
raise ValueError("Invalid API response")
if config["attributes"]["metadata"]["providerId"] != program_id:
raise ValueError("Invalid API response")
return config
def iter_versions(config):
"""Return a (code, label, index_url) iterator."""
for stream in config["attributes"]["streams"]:
yield (
# version code
stream["versions"][0]["eStat"]["ml5"],
# version full name
stream["versions"][0]["label"],
)
def select_version(config, version_code):
"""Return the version index url for the given version code."""
for stream in config["attributes"]["streams"]:
if stream["versions"][0]["eStat"]["ml5"] == version_code:
return stream["url"]
return None

246
src/delarte/hls.py Normal file
View File

@ -0,0 +1,246 @@
# Licence: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""Provide HLS protocol utilities."""
import contextlib
import io
import os
import re
from http import HTTPStatus
from http.client import HTTPConnection, HTTPSConnection
from tempfile import NamedTemporaryFile
from urllib.parse import urlparse
from urllib.request import urlopen
import m3u8
import webvtt
def load_version_index(url):
"""Retrieve a version from m3u8 file."""
version_index = m3u8.load(url)
if not version_index.playlists:
raise ValueError("Unexpected missing playlists")
for pl in version_index.playlists:
count = 0
for m in pl.media:
if m.type == "AUDIO":
count += 1
if count != 1:
raise ValueError("Unexpected missing or multiple audio tracks")
count = 0
for m in pl.media:
if m.type == "SUBTITLES":
count += 1
if count > 1:
raise ValueError("Unexpected multiple subtitle tracks")
return version_index
def iter_resolutions(version_index):
"""Iterate over resolution options."""
for pl in sorted(
version_index.playlists,
key=lambda pl: pl.stream_info.resolution[1],
reverse=True,
):
yield (
# resolution code (1080p, 720p, ...)
f"{pl.stream_info.resolution[1]}p",
# resolution label
f"{pl.stream_info.resolution[0]} x {pl.stream_info.resolution[1]}",
)
def select_resolution(version_index, resolution_code):
"""Return the stream information for a given resolution_code."""
for pl in version_index.playlists:
code = f"{pl.stream_info.resolution[1]}p"
if code != resolution_code:
continue
audio_track = None
for m in pl.media:
if m.type == "AUDIO":
audio_track = (m.language, pl.base_uri + m.uri)
break
subtitles_track = None
for m in pl.media:
if m.type == "SUBTITLES":
subtitles_track = (m.language, pl.base_uri + m.uri)
break
return (
pl.base_uri + pl.uri,
audio_track,
subtitles_track,
)
return None
def parse_byterange(obj):
"""Parse a M3U8 `byterange` (count@offset) into http range (range_start, rang_end)."""
count, offset = [int(v) for v in obj.byterange.split("@")]
return offset, offset + count - 1
def load_av_index(index_url):
"""Load a M3U8 audio or video index."""
index = m3u8.load(index_url)
file_name = index.segment_map[0].uri
range_start, range_end = parse_byterange(index.segment_map[0])
if range_start != 0:
raise ValueError("Invalid a/v index: does not start at 0")
chunks = [(range_start, range_end)]
total = range_end + 1
for segment in index.segments:
if segment.uri != file_name:
raise ValueError("Invalid a/v index: multiple file names")
range_start, range_end = parse_byterange(segment)
if range_start != total:
raise ValueError(
f"Invalid a/v index: discontious ranges ({range_start} != {total})"
)
chunks.append((range_start, range_end))
total = range_end + 1
return urlparse(index.segment_map[0].absolute_uri), chunks
def download_av_input(index_url, progress):
"""Download an audio or video stream to temporary directory."""
url, ranges = load_av_index(index_url)
total = ranges[-1][1]
Connector = HTTPSConnection if url.scheme == "https" else HTTPConnection
connection = Connector(url.hostname)
connection.connect()
with (
NamedTemporaryFile(
mode="w+b", delete=False, prefix="delarte.", suffix=".mp4"
) as f,
contextlib.closing(connection) as c,
):
for range_start, range_end in ranges:
c.request(
"GET",
url.path,
headers={
"Accept": "*/*",
"Accept-Language": "fr,en;q=0.7,en-US;q=0.3",
"Accept-Encoding": "gzip, deflate, br, identity",
"Range": f"bytes={range_start}-{range_end}",
"Origin": "https://www.arte.tv",
"Connection": "keep-alive",
"Referer": "https://www.arte.tv/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site",
"Sec-GPC": "1",
"DNT": "1",
},
)
r = c.getresponse()
if r.status != 206:
raise ValueError(f"Invalid response status {r.status}")
content = r.read()
if len(content) != range_end - range_start + 1:
raise ValueError("Invalid range length")
f.write(content)
progress(range_end, total)
return f.name
def download_subtitles_input(index_url, progress):
"""Return a temporary file name where VTT subtitle has been downloaded/converted to SRT."""
subtitles_index = m3u8.load(index_url)
urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files]
if not urls:
raise ValueError("No subtitle files")
if len(urls) > 1:
raise ValueError("Multiple subtitle files")
progress(0, 2)
http_response = urlopen(urls[0])
if http_response.status != HTTPStatus.OK:
raise RuntimeError("Subtitle request failed")
buffer = io.StringIO(http_response.read().decode("utf8"))
progress(1, 2)
with NamedTemporaryFile(
"w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
) as f:
i = 1
for caption in webvtt.read_buffer(buffer):
print(i, file=f)
print(
re.sub(r"\.", ",", caption.start)
+ " --> "
+ re.sub(r"\.", ",", caption.end),
file=f,
)
print(caption.text + "\n", file=f)
i += 1
progress(2, 2)
return f.name
@contextlib.contextmanager
def download_inputs(remote_inputs, progress):
"""Download inputs in temporary files."""
# It is implemented as a context manager that will delete temporary files on exit.
video_index_url, audio_track, subtitles_track = remote_inputs
video_filename = None
audio_filename = None
subtitles_filename = None
try:
video_filename = download_av_input(
video_index_url, lambda i, n: progress("video", i, n)
)
(audio_lang, audio_index_url) = audio_track
audio_filename = download_av_input(
audio_index_url, lambda i, n: progress("audio", i, n)
)
if subtitles_track:
(subtitles_lang, subtitles_index_url) = subtitles_track
subtitles_filename = download_subtitles_input(
subtitles_index_url, lambda i, n: progress("subtitles", i, n)
)
yield (
video_filename,
(audio_lang, audio_filename),
(subtitles_lang, subtitles_filename),
)
else:
yield (video_filename, (audio_lang, audio_filename), None)
finally:
if video_filename and os.path.isfile(video_filename):
os.unlink(video_filename)
if audio_filename and os.path.isfile(audio_filename):
os.unlink(audio_filename)
if subtitles_filename and os.path.isfile(subtitles_filename):
os.unlink(subtitles_filename)

37
src/delarte/muxing.py Normal file
View File

@ -0,0 +1,37 @@
# Licence: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""Provide media muxing utilities."""
import subprocess
def mux(inputs, file_base_name, progress):
"""Build FFMPEG args."""
video_input, audio_track, subtitles_track = inputs
audio_lang, audio_input = audio_track
if subtitles_track:
subtitles_lang, subtitles_input = subtitles_track
cmd = ["ffmpeg", "-hide_banner"]
cmd.extend(["-i", video_input])
cmd.extend(["-i", audio_input])
if subtitles_track:
cmd.extend(["-i", subtitles_input])
cmd.extend(["-c:v", "copy"])
cmd.extend(["-c:a", "copy"])
if subtitles_track:
cmd.extend(["-c:s", "copy"])
cmd.extend(["-bsf:a", "aac_adtstoasc"])
cmd.extend(["-metadata:s:a:0", f"language={audio_lang}"])
if subtitles_track:
cmd.extend(["-metadata:s:s:0", f"language={subtitles_lang}"])
cmd.extend(["-disposition:s:0", "default"])
cmd.append(f"{file_base_name}.mkv")
subprocess.run(cmd)

9
src/delarte/naming.py Normal file
View File

@ -0,0 +1,9 @@
# Licence: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""Provide contexted based file naming utility."""
def build_file_base_name(config):
"""Create a base file name from config metadata."""
return config["attributes"]["metadata"]["title"].replace("/", "-")

29
src/delarte/www.py Normal file
View File

@ -0,0 +1,29 @@
# Licence: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""Provide ArteTV website utilities."""
from urllib.parse import urlparse
LANGUAGES = ["fr", "de", "en", "es", "pl", "it"]
def parse_url(program_page_url):
"""Parse ArteTV web URL into UI language and program ID."""
url = urlparse(program_page_url)
if url.hostname != "www.arte.tv":
raise ValueError("not an ArteTV url")
program_page_path = url.path.split("/")[1:]
lang = program_page_path.pop(0)
if lang not in LANGUAGES:
raise ValueError(f"invalid url language code: {lang}")
if program_page_path.pop(0) != "videos":
raise ValueError("invalid ArteTV url")
program_id = program_page_path.pop(0)
return lang, program_id