delarte/src/delarte/hls.py
Barbagus 4c518993ef Change error handling
Creation of a `common.Error` exception whose string representation is
taken from its docstring.

Creation of a `common.UnexpectedError` to serve as base for exceptions
raised while checking assumptions on requests and responses.

The latter are handled by displaying a message inviting the user to submit
the error to us, so we can correct our assumptions.
2022-12-22 17:43:42 +01:00

331 lines
10 KiB
Python

# License: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of `delarte` (https://git.afpy.org/fcode/delarte.git)
"""Provide HLS protocol utilities."""
# For terminology, from HLS protocol RFC8216
# 2. Overview
#
# A multimedia presentation is specified by a Uniform Resource
# Identifier (URI) [RFC3986] to a Playlist.
#
# A Playlist is either a Media Playlist or a Master Playlist. Both are
# UTF-8 text files containing URIs and descriptive tags.
#
# A Media Playlist contains a list of Media Segments, which, when
# played sequentially, will play the multimedia presentation.
#
# Here is an example of a Media Playlist:
#
# #EXTM3U
# #EXT-X-TARGETDURATION:10
#
# #EXTINF:9.009,
# http://media.example.com/first.ts
# #EXTINF:9.009,
# http://media.example.com/second.ts
# #EXTINF:3.003,
# http://media.example.com/third.ts
#
# The first line is the format identifier tag #EXTM3U. The line
# containing #EXT-X-TARGETDURATION says that all Media Segments will be
# 10 seconds long or less. Then, three Media Segments are declared.
# The first and second are 9.009 seconds long; the third is 3.003
# seconds.
#
# To play this Playlist, the client first downloads it and then
# downloads and plays each Media Segment declared within it. The
# client reloads the Playlist as described in this document to discover
# any added segments. Data SHOULD be carried over HTTP [RFC7230], but,
# in general, a URI can specify any protocol that can reliably transfer
# the specified resource on demand.
#
# A more complex presentation can be described by a Master Playlist. A
# Master Playlist provides a set of Variant Streams, each of which
# describes a different version of the same content.
#
# A Variant Stream includes a Media Playlist that specifies media
# encoded at a particular bit rate, in a particular format, and at a
# particular resolution for media containing video.
#
# A Variant Stream can also specify a set of Renditions. Renditions
# are alternate versions of the content, such as audio produced in
# different languages or video recorded from different camera angles.
#
# Clients should switch between different Variant Streams to adapt to
# network conditions. Clients should choose Renditions based on user
# preferences.
import contextlib
import io
import os
import re
from tempfile import NamedTemporaryFile
import m3u8
import webvtt
from . import common
#
# WARNING !
#
# This module does not aim for a full implementation of HLS, only the
# subset useful for the actual observed usage of ArteTV.
#
# - URIs are relative file paths
# - Master playlists have at least one variant
# - Every variant is of different resolution
# - Every variant has exactly one audio medium
# - Every variant has at most one subtitles medium
# - Audio and video media playlists segments are incremental ranges of the same file
# - Subtitles media playlists have only one segment
# Raised by this module whenever a playlist or an HTTP response breaks one of
# the assumptions it makes about ArteTV's HLS usage (playlist layout, number of
# audio/subtitles media, byte ranges, HTTP status, ...).
# NOTE(review): the base class lives in `common` (not visible here); the
# docstring below presumably doubles as the error's string representation, so
# treat it as user-facing text - confirm before rewording it.
class UnexpectedResponse(common.UnexpectedError):
    """Unexpected response from ArteTV."""
def _make_resolution_code(variant):
# resolution code (1080p, 720p, ...)
return f"{variant.stream_info.resolution[1]}p"
def _fetch_playlist(http_session, url):
    # Download the M3U8 playlist found at `url` and return it parsed by `m3u8`.
    # `url` is also handed to the parser as its second argument.
    # An HTTP error status is raised by `raise_for_status()`.
    response = http_session.get(url)
    response.raise_for_status()
    playlist_text = response.text
    return m3u8.loads(playlist_text, url)
def load_master_playlist(http_session, url):
    """Download a master playlist and check it against our assumptions.

    The playlist is returned only when:
    - it declares at least one variant,
    - no two variants share the same resolution code,
    - every variant has exactly one audio medium,
    - every variant has at most one subtitles medium.

    Otherwise `UnexpectedResponse` is raised with a code naming the violation.
    """
    playlist = _fetch_playlist(http_session, url)
    variants = playlist.playlists
    if not variants:
        raise UnexpectedResponse("NO_PLAYLISTS", url)

    # Media types allowed only once per variant, with the error code reported
    # when a second one shows up.
    duplicate_errors = {
        "AUDIO": "MULTIPLE_AUDIO_MEDIA",
        "SUBTITLES": "MULTIPLE_SUBTITLES_MEDIA",
    }

    seen_codes = set()
    for variant in variants:
        code = _make_resolution_code(variant)
        if code in seen_codes:
            raise UnexpectedResponse("DUPLICATE_RESOLUTION_CODE", url, code)
        seen_codes.add(code)

        seen_types = set()
        for medium in variant.media:
            error_code = duplicate_errors.get(medium.type)
            if error_code is None:
                # Other media types are not checked.
                continue
            if medium.type in seen_types:
                raise UnexpectedResponse(error_code, url)
            seen_types.add(medium.type)

        if "AUDIO" not in seen_types:
            raise UnexpectedResponse("NO_AUDIO_MEDIA", url)

    return playlist
def iter_variants(master_playlist):
    """Yield `(resolution_code, "<width> x <height>")` pairs, tallest first."""

    def height(variant):
        return variant.stream_info.resolution[1]

    by_decreasing_height = sorted(
        master_playlist.playlists, key=height, reverse=True
    )
    for variant in by_decreasing_height:
        resolution = variant.stream_info.resolution
        label = f"{resolution[0]} x {resolution[1]}"
        yield (_make_resolution_code(variant), label)
def select_variant(master_playlist, resolution_code):
    """Return the remote inputs of the variant matching `resolution_code`.

    The result is `(video_url, audio_track, subtitles_track)` where each track
    is a `(language, url)` pair, or `None` when the variant has no medium of
    that type. `None` is returned when no variant has the requested code.
    """

    def first_track(variant, media_type):
        # (language, url) of the first medium of the given type, if any.
        for medium in variant.media:
            if medium.type == media_type:
                return (medium.language, variant.base_uri + medium.uri)
        return None

    for variant in master_playlist.playlists:
        if _make_resolution_code(variant) == resolution_code:
            audio_track = first_track(variant, "AUDIO")
            subtitles_track = first_track(variant, "SUBTITLES")
            video_url = variant.base_uri + variant.uri
            return (video_url, audio_track, subtitles_track)

    return None
def _parse_byterange(obj):
# Parse a M3U8 `byterange` (count@offset) into http range (range_start, rang_end)
count, offset = [int(v) for v in obj.byterange.split("@")]
return offset, offset + count - 1
def _load_av_segments(http_session, media_playlist_url):
    # Fetch an audio/video media playlist and return `(file_url, byte_ranges)`.
    # `file_url` is the absolute URI of the playlist's first `segment_map` entry
    # and `byte_ranges` the list of inclusive (start, end) ranges to download.
    #
    # Raises `UnexpectedResponse` unless every segment points at that same file
    # and the ranges are contiguous, starting at byte 0.
    playlist = _fetch_playlist(http_session, media_playlist_url)
    init_section = playlist.segment_map[0]
    media_file = init_section.uri

    first_range = _parse_byterange(init_section)
    if first_range[0] != 0:
        raise UnexpectedResponse(
            "INVALID_STREAM_MEDIA_FRAGMENT_START", media_playlist_url
        )

    byte_ranges = [first_range]
    expected_start = first_range[1] + 1
    for segment in playlist.segments:
        if segment.uri != media_file:
            raise UnexpectedResponse("MULTIPLE_STREAM_MEDIA_FILES", media_playlist_url)

        segment_range = _parse_byterange(segment)
        if segment_range[0] != expected_start:
            raise UnexpectedResponse(
                "DISCONTINUOUS_STREAM_MEDIA_FRAGMENT", media_playlist_url
            )

        byte_ranges.append(segment_range)
        expected_start = segment_range[1] + 1

    return init_section.absolute_uri, byte_ranges
def _download_av_stream(http_session, media_playlist_url, progress):
    # Download an audio or video stream into a temporary ".mp4" file and return
    # that file's name. The caller is responsible for deleting the file.
    #
    # The byte ranges listed by `_load_av_segments` are requested one by one with
    # an HTTP `Range` header and appended to the file in order. After each range
    # `progress(range_end, total)` is called, `total` being the end offset of the
    # last range.
    #
    # Raises `UnexpectedResponse` when a range is not answered with status 206 or
    # when the received length differs from the requested one. On any failure the
    # temporary file is removed before the error propagates.
    url, ranges = _load_av_segments(http_session, media_playlist_url)
    total = ranges[-1][1]

    # `delete=False`: the file must outlive this function.
    f = NamedTemporaryFile(
        mode="w+b", delete=False, prefix="delarte.", suffix=".mp4"
    )
    try:
        with f:
            for range_start, range_end in ranges:
                r = http_session.get(
                    url,
                    headers={
                        "Range": f"bytes={range_start}-{range_end}",
                    },
                )
                r.raise_for_status()
                if r.status_code != 206:
                    raise UnexpectedResponse(
                        "STREAM_MEDIA_HTTP_STATUS",
                        media_playlist_url,
                        r.request.headers,
                        # Was `r.status`, an attribute the response does not
                        # have: it masked this error with an AttributeError.
                        r.status_code,
                    )
                if len(r.content) != range_end - range_start + 1:
                    raise UnexpectedResponse(
                        "INVALID_STREAM_MEDIA_FRAGMENT_LENGTH", media_playlist_url
                    )
                f.write(r.content)
                progress(range_end, total)
    except BaseException:
        # The caller never gets the file name on failure, so nobody else could
        # clean up. The file is already closed here (the `with` block exited).
        with contextlib.suppress(OSError):
            os.unlink(f.name)
        raise

    return f.name
def _download_subtitles_input(http_session, index_url, progress):
    # Download the VTT subtitles referenced by the playlist at `index_url`,
    # convert them to SRT and return the name of the temporary ".srt" file.
    # The caller is responsible for deleting the file.
    #
    # `progress(step, 2)` is called before the download, after it, and once the
    # conversion is done.
    #
    # Raises `UnexpectedResponse` unless the playlist lists exactly one file.
    # On any conversion failure the temporary file is removed before the error
    # propagates.
    subtitles_index = _fetch_playlist(http_session, index_url)
    urls = [s.absolute_uri for s in subtitles_index.segments]
    if not urls:
        raise UnexpectedResponse("SUBTITLES_MEDIA_NO_FILES", index_url)
    if len(urls) > 1:
        raise UnexpectedResponse("SUBTITLES_MEDIA_MULTIPLE_FILES", index_url)

    progress(0, 2)
    r = http_session.get(urls[0])
    r.raise_for_status()
    # WebVTT files are UTF-8 by specification: do not depend on the charset the
    # HTTP client guesses from the headers. "utf-8-sig" also drops a leading BOM,
    # which would otherwise end up in front of the "WEBVTT" signature.
    r.encoding = "utf-8-sig"
    buffer = io.StringIO(r.text)
    progress(1, 2)

    # `delete=False`: the file must outlive this function.
    f = NamedTemporaryFile(
        "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
    )
    try:
        with f:
            # SRT cues are numbered from 1 and use "," as decimal separator in
            # timestamps where VTT uses ".".
            for i, caption in enumerate(webvtt.read_buffer(buffer), start=1):
                start = caption.start.replace(".", ",")
                end = caption.end.replace(".", ",")
                print(i, file=f)
                print(start + " --> " + end, file=f)
                print(caption.text + "\n", file=f)
        progress(2, 2)
    except BaseException:
        # The caller never gets the file name on failure, so nobody else could
        # clean up. The file is already closed here (the `with` block exited).
        with contextlib.suppress(OSError):
            os.unlink(f.name)
        raise

    return f.name
@contextlib.contextmanager
def download_inputs(http_session, remote_inputs, progress):
    """Download the remote inputs into temporary files.

    `remote_inputs` is a `(video_index_url, audio_track, subtitles_track)`
    tuple whose tracks are `(language, index_url)` pairs; `subtitles_track` may
    be falsy. `progress(kind, i, n)` is called during the downloads with `kind`
    being "video", "audio" or "subtitles".

    The context value is `(video_filename, (audio_lang, audio_filename),
    subtitles)` where `subtitles` is `(subtitles_lang, subtitles_filename)` or
    `None`. Downloaded files still present are deleted when the context exits.
    """
    video_index_url, audio_track, subtitles_track = remote_inputs

    def reporter(kind):
        # Bind the input kind to the caller's progress callback.
        return lambda i, n: progress(kind, i, n)

    video_filename = audio_filename = subtitles_filename = None
    try:
        video_filename = _download_av_stream(
            http_session, video_index_url, reporter("video")
        )

        audio_lang, audio_index_url = audio_track
        audio_filename = _download_av_stream(
            http_session, audio_index_url, reporter("audio")
        )

        local_subtitles = None
        if subtitles_track:
            subtitles_lang, subtitles_index_url = subtitles_track
            subtitles_filename = _download_subtitles_input(
                http_session, subtitles_index_url, reporter("subtitles")
            )
            local_subtitles = (subtitles_lang, subtitles_filename)

        yield (video_filename, (audio_lang, audio_filename), local_subtitles)
    finally:
        # Only names that were actually assigned are considered for removal.
        for filename in (video_filename, audio_filename, subtitles_filename):
            if filename and os.path.isfile(filename):
                os.unlink(filename)