delarte_test/src/delarte/hls.py
Barbagus 6b8f2232c4 Fix issue #13 - split code in multiple modules
Implemented modules:
 - api: deals with ArteTV JSON API
 - hls: deals with HLS protocol
 - muxing: deals with the stream multiplexing
 - naming: deals with output file naming
 - www: deals with ArteTV web interface
2022-12-13 07:29:59 +01:00

247 lines
7.6 KiB
Python

# Licence: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""Provide HLS protocol utilities."""
import contextlib
import io
import os
import re
from http import HTTPStatus
from http.client import HTTPConnection, HTTPSConnection
from tempfile import NamedTemporaryFile
from urllib.parse import urlparse
from urllib.request import urlopen
import m3u8
import webvtt
def load_version_index(url):
    """Load the version index (M3U8) found at `url` and sanity-check it.

    Every playlist of the index must reference exactly one audio track and
    at most one subtitles track.

    Return the loaded index.
    Raise `ValueError` when the index has no playlist or when a playlist
    does not meet the track constraints above.
    """

    def count_media(playlist, kind):
        # Number of media entries of the given type attached to the playlist.
        return sum(1 for media in playlist.media if media.type == kind)

    index = m3u8.load(url)

    if not index.playlists:
        raise ValueError("Unexpected missing playlists")

    for playlist in index.playlists:
        if count_media(playlist, "AUDIO") != 1:
            raise ValueError("Unexpected missing or multiple audio tracks")
        if count_media(playlist, "SUBTITLES") > 1:
            raise ValueError("Unexpected multiple subtitle tracks")

    return index
def iter_resolutions(version_index):
    """Yield the available resolutions, from the tallest to the smallest.

    Each item is a `(code, label)` pair built from the playlist resolution,
    e.g. `("1080p", "1920 x 1080")`.
    """

    def height_of(playlist):
        return playlist.stream_info.resolution[1]

    for playlist in sorted(version_index.playlists, key=height_of, reverse=True):
        resolution = playlist.stream_info.resolution
        width, height = resolution[0], resolution[1]
        code = f"{height}p"
        label = f"{width} x {height}"
        yield (code, label)
def select_resolution(version_index, resolution_code):
    """Look up the streams of the playlist matching `resolution_code`.

    `resolution_code` is compared with `"<height>p"` for each playlist and
    the first match wins.

    Return `(video_index_url, audio_track, subtitles_track)` where each track
    is a `(language, index_url)` pair, or `None` when the playlist has no
    media of that type.  Return `None` when no playlist matches.
    """
    for playlist in version_index.playlists:
        if f"{playlist.stream_info.resolution[1]}p" == resolution_code:
            # URLs are built by plain concatenation.
            # NOTE(review): assumes `base_uri` ends with a separator and the
            # URIs are relative - confirm against the m3u8 loader.
            audio = next(
                (
                    (media.language, playlist.base_uri + media.uri)
                    for media in playlist.media
                    if media.type == "AUDIO"
                ),
                None,
            )
            subtitles = next(
                (
                    (media.language, playlist.base_uri + media.uri)
                    for media in playlist.media
                    if media.type == "SUBTITLES"
                ),
                None,
            )
            video_url = playlist.base_uri + playlist.uri
            return (video_url, audio, subtitles)

    return None
def parse_byterange(obj):
    """Parse a M3U8 `byterange` (count@offset) into an HTTP range.

    `obj` is any object exposing a `byterange` attribute holding the
    `"<count>@<offset>"` string.

    Return `(range_start, range_end)`, both bounds inclusive, as used in an
    HTTP `Range: bytes=start-end` header.

    Raise `ValueError` when the byte range is missing or is not made of
    exactly two integers separated by `@`.
    """
    byterange = obj.byterange
    if not byterange:
        # Without this check a missing byte range fails with an
        # AttributeError, unlike every other malformed-index error.
        raise ValueError("Invalid a/v index: missing byte range")

    parts = byterange.split("@")
    if len(parts) != 2:
        # The offset cannot be guessed here, so it has to be explicit.
        raise ValueError(
            f"Invalid a/v index: unsupported byte range ({byterange!r})"
        )

    count, offset = (int(part) for part in parts)
    return offset, offset + count - 1
def load_av_index(index_url):
    """Load a M3U8 audio or video index.

    The index is expected to describe a single media file as a sequence of
    contiguous byte ranges: the first entry of `segment_map` followed by
    every segment, starting at offset 0 and without gap or overlap.

    Return `(url, chunks)` where `url` is the parsed (`urlparse`) absolute
    URL of the media file and `chunks` the list of inclusive
    `(range_start, range_end)` byte ranges, in file order.

    Raise `ValueError` when the index does not have that layout.
    """
    index = m3u8.load(index_url)

    if not index.segment_map:
        # Fail like the other layout checks instead of with an IndexError.
        raise ValueError("Invalid a/v index: missing initialization section")
    init_section = index.segment_map[0]

    file_name = init_section.uri
    range_start, range_end = parse_byterange(init_section)
    if range_start != 0:
        raise ValueError("Invalid a/v index: does not start at 0")

    chunks = [(range_start, range_end)]
    # Offset the next chunk must start at for the ranges to be contiguous.
    next_offset = range_end + 1

    for segment in index.segments:
        if segment.uri != file_name:
            raise ValueError("Invalid a/v index: multiple file names")

        range_start, range_end = parse_byterange(segment)
        if range_start != next_offset:
            raise ValueError(
                "Invalid a/v index: discontinuous ranges "
                f"({range_start} != {next_offset})"
            )

        chunks.append((range_start, range_end))
        next_offset = range_end + 1

    return urlparse(init_section.absolute_uri), chunks
def download_av_input(index_url, progress):
    """Download an audio or video stream to a temporary file.

    The media file described by the index at `index_url` is fetched chunk by
    chunk, with HTTP range requests sent over a single connection, and
    written to a `delarte.*.mp4` temporary file.

    `progress` is called as `progress(range_end, total)` after each chunk,
    `total` being the end offset of the last chunk.

    Return the name of the temporary file; removing it is up to the caller.
    If the download fails, the partial file is removed before the error
    propagates, since the caller never gets to know its name.

    Raise `ValueError` on an unexpected response status or chunk length.
    """
    url, ranges = load_av_index(index_url)
    total = ranges[-1][1]

    # Keep the query string: dropping it would request another resource.
    target = url.path or "/"
    if url.query:
        target = f"{target}?{url.query}"

    Connector = HTTPSConnection if url.scheme == "https" else HTTPConnection

    with NamedTemporaryFile(
        mode="w+b", delete=False, prefix="delarte.", suffix=".mp4"
    ) as f:
        try:
            # `url.port` is None unless the URL names a port explicitly, in
            # which case the scheme's default port is used.
            with contextlib.closing(Connector(url.hostname, url.port)) as c:
                c.connect()
                for range_start, range_end in ranges:
                    c.request(
                        "GET",
                        target,
                        headers={
                            "Accept": "*/*",
                            "Accept-Language": "fr,en;q=0.7,en-US;q=0.3",
                            # NOTE(review): http.client hands the body over
                            # as received; a compressed answer would only be
                            # caught by the length check below.
                            "Accept-Encoding": "gzip, deflate, br, identity",
                            "Range": f"bytes={range_start}-{range_end}",
                            "Origin": "https://www.arte.tv",
                            "Connection": "keep-alive",
                            "Referer": "https://www.arte.tv/",
                            "Sec-Fetch-Dest": "empty",
                            "Sec-Fetch-Mode": "cors",
                            "Sec-Fetch-Site": "cross-site",
                            "Sec-GPC": "1",
                            "DNT": "1",
                        },
                    )
                    r = c.getresponse()
                    if r.status != HTTPStatus.PARTIAL_CONTENT:
                        raise ValueError(f"Invalid response status {r.status}")

                    content = r.read()
                    if len(content) != range_end - range_start + 1:
                        raise ValueError("Invalid range length")

                    f.write(content)
                    progress(range_end, total)
        except BaseException:
            # Close before unlinking: an open file cannot be removed on
            # every platform.  Cleanup is best effort, the original error
            # is the one that matters.
            f.close()
            with contextlib.suppress(OSError):
                os.unlink(f.name)
            raise

    return f.name
def download_subtitles_input(index_url, progress):
    """Download VTT subtitles and convert them to a temporary SRT file.

    The subtitles index at `index_url` must reference exactly one file.  It
    is downloaded, parsed as WebVTT and rewritten as numbered SRT cues
    (decimal comma in timestamps) into a `delarte.*.srt` temporary file.

    `progress` is called as `progress(step, 2)` before the download, after
    the download and after the conversion.

    Return the name of the temporary file; removing it is up to the caller.
    If the conversion fails, the partial file is removed before the error
    propagates, since the caller never gets to know its name.

    Raise `ValueError` when the index references no file or several files,
    `RuntimeError` when the HTTP status is not 200.
    """
    subtitles_index = m3u8.load(index_url)
    urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files]

    if not urls:
        raise ValueError("No subtitle files")

    if len(urls) > 1:
        raise ValueError("Multiple subtitle files")

    progress(0, 2)
    # The context manager closes the HTTP response once it has been read.
    with urlopen(urls[0]) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("Subtitle request failed")
        buffer = io.StringIO(http_response.read().decode("utf8"))
    progress(1, 2)

    with NamedTemporaryFile(
        "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
    ) as f:
        try:
            for i, caption in enumerate(webvtt.read_buffer(buffer), start=1):
                # SRT cue: counter, timestamps with a decimal comma, text and
                # a blank separator line.
                start = caption.start.replace(".", ",")
                end = caption.end.replace(".", ",")
                print(i, file=f)
                print(start + " --> " + end, file=f)
                print(caption.text + "\n", file=f)
        except BaseException:
            # Close before unlinking: an open file cannot be removed on
            # every platform.  Cleanup is best effort, the original error
            # is the one that matters.
            f.close()
            with contextlib.suppress(OSError):
                os.unlink(f.name)
            raise

    progress(2, 2)
    return f.name
@contextlib.contextmanager
def download_inputs(remote_inputs, progress):
    """Download the remote inputs to temporary files, removed on exit.

    `remote_inputs` is a `(video_index_url, audio_track, subtitles_track)`
    triple, tracks being `(language, index_url)` pairs and the subtitles
    track being optional (falsy when absent).

    `progress` is called as `progress(kind, i, n)` with `kind` one of
    `"video"`, `"audio"` or `"subtitles"`.

    The context value is `(video_filename, (audio_lang, audio_filename),
    subtitles)` where `subtitles` is `(subtitles_lang, subtitles_filename)`
    or `None`.
    """
    video_index_url, audio_track, subtitles_track = remote_inputs

    def report_as(kind):
        # Progress callback bound to one kind of input.
        def report(i, n):
            return progress(kind, i, n)

        return report

    # Files downloaded so far, in download order; all removed on exit.
    created = []
    try:
        video_filename = download_av_input(video_index_url, report_as("video"))
        created.append(video_filename)

        audio_lang, audio_index_url = audio_track
        audio_filename = download_av_input(audio_index_url, report_as("audio"))
        created.append(audio_filename)

        subtitles = None
        if subtitles_track:
            subtitles_lang, subtitles_index_url = subtitles_track
            subtitles_filename = download_subtitles_input(
                subtitles_index_url, report_as("subtitles")
            )
            created.append(subtitles_filename)
            subtitles = (subtitles_lang, subtitles_filename)

        yield (video_filename, (audio_lang, audio_filename), subtitles)
    finally:
        for filename in created:
            if filename and os.path.isfile(filename):
                os.unlink(filename)