delarte_test/src/delarte/hls.py

339 lines
11 KiB
Python

# Licence: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of [`delarte`](https://git.afpy.org/fcode/delarte.git)
"""Provide HLS protocol utilities."""
# For terminology, from HLS protocol RFC8216
# 2. Overview
#
# A multimedia presentation is specified by a Uniform Resource
# Identifier (URI) [RFC3986] to a Playlist.
#
# A Playlist is either a Media Playlist or a Master Playlist. Both are
# UTF-8 text files containing URIs and descriptive tags.
#
# A Media Playlist contains a list of Media Segments, which, when
# played sequentially, will play the multimedia presentation.
#
# Here is an example of a Media Playlist:
#
# #EXTM3U
# #EXT-X-TARGETDURATION:10
#
# #EXTINF:9.009,
# http://media.example.com/first.ts
# #EXTINF:9.009,
# http://media.example.com/second.ts
# #EXTINF:3.003,
# http://media.example.com/third.ts
#
# The first line is the format identifier tag #EXTM3U. The line
# containing #EXT-X-TARGETDURATION says that all Media Segments will be
# 10 seconds long or less. Then, three Media Segments are declared.
# The first and second are 9.009 seconds long; the third is 3.003
# seconds.
#
# To play this Playlist, the client first downloads it and then
# downloads and plays each Media Segment declared within it. The
# client reloads the Playlist as described in this document to discover
# any added segments. Data SHOULD be carried over HTTP [RFC7230], but,
# in general, a URI can specify any protocol that can reliably transfer
# the specified resource on demand.
#
# A more complex presentation can be described by a Master Playlist. A
# Master Playlist provides a set of Variant Streams, each of which
# describes a different version of the same content.
#
# A Variant Stream includes a Media Playlist that specifies media
# encoded at a particular bit rate, in a particular format, and at a
# particular resolution for media containing video.
#
# A Variant Stream can also specify a set of Renditions. Renditions
# are alternate versions of the content, such as audio produced in
# different languages or video recorded from different camera angles.
#
# Clients should switch between different Variant Streams to adapt to
# network conditions. Clients should choose Renditions based on user
# preferences.
import contextlib
import io
import os
import re
from http import HTTPStatus
from http.client import HTTPConnection, HTTPSConnection
from tempfile import NamedTemporaryFile
from urllib.parse import urlparse
from urllib.request import urlopen
import m3u8
import webvtt
#
# WARNING !
#
# This module does not aim for a full implementation of HLS, only the
# subset useful for the actual observed usage of ArteTV.
#
# - URIs are relative file paths
# - Master playlists have at least one variant
# - Every variant is of different resolution
# - Every variant has exactly one audio medium
# - Every variant has at most one subtitles medium
# - Audio and video media playlists segments are incremental ranges of the same file
# - Subtitles media playlists have only one segment
def _make_resolution_code(variant):
    # Label a variant by the second component of its resolution
    # followed by "p" (1080p, 720p, ...).
    height = variant.stream_info.resolution[1]
    return f"{height}p"
def _is_relative_file_path(uri):
    # Tell whether `uri` consists of a path only (nothing else is
    # recognised by `urlparse`) and that path does not start with "/".
    # A URI that `urlparse` rejects with ValueError is not a relative path.
    try:
        parsed = urlparse(uri)
    except ValueError:
        return False
    if parsed.path != uri:
        return False
    return not uri.startswith("/")
def load_master_playlist(url):
    """Load the master playlist at `url` and check it fits the supported subset.

    The playlist is returned as loaded by `m3u8.load`. A `ValueError` is
    raised when:

    - the playlist declares no variant,
    - two variants share the same resolution code,
    - a variant medium URI is not a relative file path,
    - a variant has more than one audio or more than one subtitles medium,
    - a variant has no audio medium.
    """
    playlist = m3u8.load(url)
    variants = playlist.playlists

    if not variants:
        raise ValueError("Unexpected missing playlists")

    seen_codes = set()
    for variant in variants:
        code = _make_resolution_code(variant)
        if code in seen_codes:
            raise ValueError("Unexpected duplicate resolution")
        seen_codes.add(code)

        # Media types met so far for this variant; only "AUDIO" and
        # "SUBTITLES" are constrained, other types are merely recorded.
        seen_types = set()
        for medium in variant.media:
            if not _is_relative_file_path(medium.uri):
                raise ValueError("Invalid relative file name")

            kind = medium.type
            if kind == "AUDIO" and kind in seen_types:
                raise ValueError("Unexpected multiple audio tracks")
            if kind == "SUBTITLES" and kind in seen_types:
                raise ValueError("Unexpected multiple subtitles tracks")
            seen_types.add(kind)

        if "AUDIO" not in seen_types:
            raise ValueError("Unexpected missing audio track")

    return playlist
def iter_variants(master_playlist):
    """Yield `(resolution code, "width x height")` for each variant.

    Variants are produced by decreasing second resolution component
    (the one used to build the resolution code).
    """

    def height_of(variant):
        return variant.stream_info.resolution[1]

    ordered = sorted(master_playlist.playlists, key=height_of, reverse=True)
    for variant in ordered:
        resolution = variant.stream_info.resolution
        description = f"{resolution[0]} x {resolution[1]}"
        yield (_make_resolution_code(variant), description)
def select_variant(master_playlist, resolution_code):
    """Return the remote inputs of the variant matching `resolution_code`.

    The result is a `(video_uri, audio_track, subtitles_track)` tuple where
    each track is either `None` or a `(language, uri)` pair built from the
    first medium of the matching type. URIs are prefixed with the variant's
    `base_uri`. `None` is returned when no variant has the requested code.
    """

    def first_track(variant, media_type):
        # First medium of the requested type as (language, uri), else None
        candidates = (
            (medium.language, variant.base_uri + medium.uri)
            for medium in variant.media
            if medium.type == media_type
        )
        return next(candidates, None)

    for variant in master_playlist.playlists:
        if _make_resolution_code(variant) == resolution_code:
            audio = first_track(variant, "AUDIO")
            subtitles = first_track(variant, "SUBTITLES")
            video_uri = variant.base_uri + variant.uri
            return (video_uri, audio, subtitles)

    return None
def _parse_byterange(obj):
    # Convert the M3U8 `byterange` of `obj` ("count@offset") into an
    # inclusive http range (range_start, range_end).
    fields = obj.byterange.split("@")
    length, first_byte = (int(field) for field in fields)
    last_byte = first_byte + length - 1
    return first_byte, last_byte
def _load_av_segments(media_playlist_url):
    # Load an audio or video media playlist and return the parsed absolute
    # URL of its (single) file together with the list of inclusive byte
    # ranges (start, end) to fetch, in order.
    #
    # ValueError is raised when the first entry of the segment map does not
    # start at byte 0, when a segment refers to another file, or when a
    # segment does not start right after the previous range.
    playlist = m3u8.load(media_playlist_url)

    head = playlist.segment_map[0]
    shared_file = head.uri
    first, last = _parse_byterange(head)
    if first != 0:
        raise ValueError("Invalid a/v index: does not start at 0")

    ranges = [(first, last)]
    expected = last + 1  # byte at which the next range has to start

    for segment in playlist.segments:
        if segment.uri != shared_file:
            raise ValueError("Invalid a/v index: multiple file names")

        first, last = _parse_byterange(segment)
        if first != expected:
            raise ValueError(
                f"Invalid a/v index: discontious ranges ({first} != {expected})"
            )

        ranges.append((first, last))
        expected = last + 1

    return urlparse(head.absolute_uri), ranges
def _download_av_stream(media_playlist_url, progress):
    # Download an audio or video stream to a temporary file and return the
    # name of that file.
    #
    # The byte ranges listed by the media playlist are requested one after
    # the other on a single connection and appended to the file.
    # `progress(range_end, total)` is called after each range is written,
    # `total` being the index of the last byte of the last range.
    #
    # ValueError is raised when a response is not a "206 Partial Content" or
    # when its body does not have the requested length. The temporary file is
    # removed whenever the download does not complete: the caller never gets
    # its name in that case and could not delete it.
    url, ranges = _load_av_segments(media_playlist_url)
    total = ranges[-1][1]

    Connector = HTTPSConnection if url.scheme == "https" else HTTPConnection
    # `url.port` is None when the URL has no explicit port, which makes the
    # connection use the default port of the scheme.
    connection = Connector(url.hostname, url.port)

    # The connection is closed even if connecting or creating the file fails
    with contextlib.closing(connection) as c:
        c.connect()
        with NamedTemporaryFile(
            mode="w+b", delete=False, prefix="delarte.", suffix=".mp4"
        ) as f:
            try:
                for range_start, range_end in ranges:
                    c.request(
                        "GET",
                        url.path,
                        headers={
                            "Accept": "*/*",
                            "Accept-Language": "fr,en;q=0.7,en-US;q=0.3",
                            "Accept-Encoding": "gzip, deflate, br, identity",
                            "Range": f"bytes={range_start}-{range_end}",
                            "Origin": "https://www.arte.tv",
                            "Connection": "keep-alive",
                            "Referer": "https://www.arte.tv/",
                            "Sec-Fetch-Dest": "empty",
                            "Sec-Fetch-Mode": "cors",
                            "Sec-Fetch-Site": "cross-site",
                            "Sec-GPC": "1",
                            "DNT": "1",
                        },
                    )
                    r = c.getresponse()
                    if r.status != HTTPStatus.PARTIAL_CONTENT:
                        raise ValueError(f"Invalid response status {r.status}")

                    content = r.read()
                    if len(content) != range_end - range_start + 1:
                        raise ValueError("Invalid range length")

                    f.write(content)
                    progress(range_end, total)
            except BaseException:
                # Do not leave a partial download behind. The file is closed
                # first so that it can be removed on every platform.
                f.close()
                with contextlib.suppress(OSError):
                    os.unlink(f.name)
                raise

    return f.name
def _download_subtitles_input(index_url, progress):
    # Download the single VTT file listed by the subtitles playlist, convert
    # it to SRT and return the name of the temporary file holding the result.
    #
    # `progress` is called with (0, 2) before the download, (1, 2) once the
    # VTT content has been read and (2, 2) once the SRT file is written.
    #
    # ValueError is raised when the playlist lists no file or several files,
    # RuntimeError when the response status is not "200 OK". The temporary
    # file is removed whenever the conversion does not complete: the caller
    # never gets its name in that case and could not delete it.
    subtitles_index = m3u8.load(index_url)
    # NOTE(review): `select_variant` joins `base_uri` and URIs without any
    # separator while a "/" is inserted here - confirm which one is intended.
    urls = [subtitles_index.base_uri + "/" + f for f in subtitles_index.files]

    if not urls:
        raise ValueError("No subtitle files")

    if len(urls) > 1:
        raise ValueError("Multiple subtitle files")

    progress(0, 2)
    # The response is used as a context manager so it gets closed as soon as
    # its content has been read (or the status check failed).
    with urlopen(urls[0]) as http_response:
        if http_response.status != HTTPStatus.OK:
            raise RuntimeError("Subtitle request failed")
        buffer = io.StringIO(http_response.read().decode("utf8"))
    progress(1, 2)

    with NamedTemporaryFile(
        "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
    ) as f:
        try:
            # An SRT entry is: counter, time range, text, blank line
            for i, caption in enumerate(webvtt.read_buffer(buffer), start=1):
                print(i, file=f)
                # SRT timestamps use "," where VTT ones use "."
                print(
                    caption.start.replace(".", ",")
                    + " --> "
                    + caption.end.replace(".", ","),
                    file=f,
                )
                print(caption.text + "\n", file=f)
            progress(2, 2)
        except BaseException:
            # Do not leave a partial conversion behind. The file is closed
            # first so that it can be removed on every platform.
            f.close()
            with contextlib.suppress(OSError):
                os.unlink(f.name)
            raise

    return f.name
@contextlib.contextmanager
def download_inputs(remote_inputs, progress):
    """Download remote inputs to temporary files for the duration of a `with`.

    `remote_inputs` is a `(video_index_url, audio_track, subtitles_track)`
    tuple where tracks are `(language, index_url)` pairs and the subtitles
    track may be falsy. The context value mirrors it with local file names:
    `(video_filename, (audio_lang, audio_filename), subtitles)` where
    `subtitles` is `(subtitles_lang, subtitles_filename)` or `None`.

    `progress` is called as `progress(kind, i, n)` with `kind` being
    "video", "audio" or "subtitles".

    Downloaded files that still exist are deleted on exit, including when a
    download fails part-way.
    """
    video_index_url, audio_track, subtitles_track = remote_inputs

    video_path = audio_path = subtitles_path = None

    try:
        video_path = _download_av_stream(
            video_index_url, lambda done, count: progress("video", done, count)
        )

        audio_lang, audio_index_url = audio_track
        audio_path = _download_av_stream(
            audio_index_url, lambda done, count: progress("audio", done, count)
        )

        subtitles = None
        if subtitles_track:
            subtitles_lang, subtitles_index_url = subtitles_track
            subtitles_path = _download_subtitles_input(
                subtitles_index_url,
                lambda done, count: progress("subtitles", done, count),
            )
            subtitles = (subtitles_lang, subtitles_path)

        yield (video_path, (audio_lang, audio_path), subtitles)

    finally:
        # Same order as the downloads: video, audio, subtitles
        for path in (video_path, audio_path, subtitles_path):
            if path and os.path.isfile(path):
                os.unlink(path)