delarte/src/delarte/hls.py

309 lines
9.0 KiB
Python

# License: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of `delarte` (https://git.afpy.org/fcode/delarte.git)
"""Provide HLS protocol utilities."""
import contextlib
import os
from tempfile import NamedTemporaryFile
import m3u8
from . import subtitles
from .error import UnexpectedHLSResponse
from .model import (
AudioMeta,
AudioTrack,
SubtitlesMeta,
SubtitlesTrack,
Target,
VideoMeta,
VideoTrack,
)
#
# WARNING !
#
# This module does not aim for a full implementation of HLS, only the
# subset useful for the actual observed usage of ArteTV.
#
# - URIs are relative file paths
# - Program indexes have at least one variant
# - Every variant has a different resolution
# - Every variant has exactly one audio medium
# - Every variant has at most one subtitles medium
# - Audio and video index segments are consecutive byte ranges of
#   the same file
# - Subtitles indexes have only one segment
MIME_TYPE = "application/x-mpegURL"
def _fetch_index(http_session, url):
    # Fetch and parse the M3U8 playlist found at `url`.
    #
    # HTTP errors are propagated by `raise_for_status()`.
    # Raise `UnexpectedHLSResponse` when the `content-type` header is not
    # exactly `MIME_TYPE`, including when the header is missing.
    response = http_session.get(url)
    response.raise_for_status()

    # `get()` rather than indexing: a missing header must be reported as an
    # unexpected HLS response, not leak out as a bare `KeyError`.
    content_type = response.headers.get("content-type")
    if content_type != MIME_TYPE:
        raise UnexpectedHLSResponse("MIME_TYPE", url, MIME_TYPE, content_type)

    # Set the encoding before reading `text` so the body is decoded as UTF-8.
    response.encoding = "utf-8"
    return m3u8.loads(response.text, url)
def fetch_program_tracks(http_session, program_index_url):
    """Return the tracks advertised by the given program index.

    The result is a `(video_tracks, audio_track, subtitles_track)` tuple in
    which `video_tracks` is a set and `subtitles_track` is `None` when the
    index declares no subtitles medium.

    Raise `UnexpectedHLSResponse` when the index has no or several audio
    media, several subtitles media, a variant referring to another audio or
    subtitles group, or two identical video tracks.
    """

    def has_accessibility(medium):
        # True when the medium's characteristics mention accessibility.
        traits = medium.characteristics
        return traits is not None and "public.accessibility" in traits

    index = _fetch_index(http_session, program_index_url)

    # Pick the single audio medium and the optional single subtitles medium.
    audio = None
    subs = None
    for medium in index.media:
        kind = medium.type
        if kind == "AUDIO":
            if audio:
                raise UnexpectedHLSResponse(
                    "MULTIPLE_AUDIO_MEDIA", program_index_url
                )
            audio = medium
        elif kind == "SUBTITLES":
            if subs:
                raise UnexpectedHLSResponse(
                    "MULTIPLE_SUBTITLES_MEDIA", program_index_url
                )
            subs = medium

    if not audio:
        raise UnexpectedHLSResponse("NO_AUDIO_MEDIA", program_index_url)

    audio_track = AudioTrack(
        AudioMeta(
            audio.language,
            audio.name.startswith("VO"),
            has_accessibility(audio),
        ),
        audio.absolute_uri,
    )

    if subs:
        subtitles_track = SubtitlesTrack(
            SubtitlesMeta(subs.language, has_accessibility(subs)),
            subs.absolute_uri,
        )
    else:
        subtitles_track = None

    video_tracks = set()
    for variant in index.playlists:
        info = variant.stream_info

        if info.audio != audio.group_id:
            raise UnexpectedHLSResponse(
                "INVALID_AUDIO_MEDIA", program_index_url, info.audio
            )

        # A variant must refer to the subtitles medium when there is one,
        # and must not refer to any subtitles group otherwise.
        if subs:
            subtitles_mismatch = info.subtitles != subs.group_id
        else:
            subtitles_mismatch = bool(info.subtitles)
        if subtitles_mismatch:
            raise UnexpectedHLSResponse(
                "INVALID_SUBTITLES_MEDIA",
                program_index_url,
                info.subtitles,
            )

        track = VideoTrack(
            VideoMeta(
                info.resolution[0],
                info.resolution[1],
                info.frame_rate,
            ),
            variant.absolute_uri,
        )
        if track in video_tracks:
            raise UnexpectedHLSResponse(
                "DUPLICATE_VIDEO_TRACK", program_index_url, track
            )
        video_tracks.add(track)

    return video_tracks, audio_track, subtitles_track
def _convert_byterange(obj):
# Convert a M3U8 `byterange` (1) to an `http range` (2).
# 1. "count@offset"
# 2. (start, end)
count, offset = [int(v) for v in obj.byterange.split("@")]
return offset, offset + count - 1
def _fetch_av_index(http_session, track_index_url):
    # Fetch an audio or video track index.
    #
    # Return a `(url, ranges)` tuple:
    #   - `url` is the absolute URL of the media file
    #   - `ranges` lists the inclusive `(start, end)` byte ranges to fetch,
    #     the first one coming from the index's first segment map entry
    #
    # Raise `UnexpectedHLSResponse` when the ranges do not start at byte 0,
    # are not consecutive, or when a segment refers to another file.
    index = _fetch_index(http_session, track_index_url)
    init_section = index.segment_map[0]
    media_file = init_section.uri

    first_range = _convert_byterange(init_section)
    if first_range[0] != 0:
        raise UnexpectedHLSResponse("INVALID_AV_INDEX_FRAGMENT_START", track_index_url)

    ranges = [first_range]
    for segment in index.segments:
        if segment.uri != media_file:
            raise UnexpectedHLSResponse("MULTIPLE_AV_INDEX_FILES", track_index_url)

        segment_range = _convert_byterange(segment)
        # Each range has to begin right after the end of the previous one.
        if segment_range[0] != ranges[-1][1] + 1:
            raise UnexpectedHLSResponse(
                "DISCONTINUOUS_AV_INDEX_FRAGMENT", track_index_url
            )
        ranges.append(segment_range)

    return init_section.absolute_uri, ranges
def _fetch_s_index(http_session, track_index_url):
# Fetch subtitles index.
# Return the subtitle file url.
track_index = _fetch_index(http_session, track_index_url)
urls = [s.absolute_uri for s in track_index.segments]
if not urls:
raise UnexpectedHLSResponse("NO_S_INDEX_FILES", track_index_url)
if len(urls) > 1:
raise UnexpectedHLSResponse("MULTIPLE_S_INDEX_FILES", track_index_url)
return urls[0]
def _download_av_track(http_session, track_index_url, progress):
    # Download audio or video data into a temporary file.
    #
    # The media file is fetched one byte range at a time, following the
    # track index, and `progress(range_end, total)` is called after each
    # range has been written.
    #
    # Return the temporary file path; deleting it is up to the caller.
    # Raise `UnexpectedHLSResponse` when a range request is not answered with
    # a 206 status or with the expected number of bytes. Whenever this
    # function fails, the temporary file is removed before the error
    # propagates.
    url, ranges = _fetch_av_index(http_session, track_index_url)
    total = ranges[-1][1]

    f = NamedTemporaryFile(
        mode="w+b", delete=False, prefix="delarte.", suffix=".mp4"
    )
    try:
        with f:
            for range_start, range_end in ranges:
                r = http_session.get(
                    url,
                    headers={
                        "Range": f"bytes={range_start}-{range_end}",
                    },
                    timeout=5,
                )
                r.raise_for_status()
                # A successful but non-partial answer (e.g. 200 with the
                # whole file) would corrupt the output, so it is rejected.
                if r.status_code != 206:
                    raise UnexpectedHLSResponse(
                        "UNEXPECTED_AV_TRACK_HTTP_STATUS",
                        track_index_url,
                        r.request.headers,
                        r.status_code,
                    )
                if len(r.content) != range_end - range_start + 1:
                    raise UnexpectedHLSResponse(
                        "INVALID_AV_TRACK_FRAGMENT_LENGTH", track_index_url
                    )
                f.write(r.content)
                progress(range_end, total)
    except BaseException:
        # The path is never returned on failure, so nobody else could delete
        # the file (created with `delete=False`). It is closed at this point.
        with contextlib.suppress(OSError):
            os.unlink(f.name)
        raise

    return f.name
def _download_s_track(http_session, track_index_url, progress):
    # Download a subtitles file into a temporary file, passing the fetched
    # text through `subtitles.convert` (VTT to SRT).
    #
    # `progress(step, 2)` is called before the download, after the download
    # and after the conversion.
    #
    # Return the temporary file path; deleting it is up to the caller.
    # Whenever the conversion or the final progress call fails, the
    # temporary file is removed before the error propagates.
    url = _fetch_s_index(http_session, track_index_url)
    progress(0, 2)

    r = http_session.get(url)
    r.raise_for_status()
    # Set the encoding before reading `text` so the body is decoded as UTF-8.
    r.encoding = "utf-8"
    progress(1, 2)

    f = NamedTemporaryFile(
        "w", delete=False, prefix="delarte.", suffix=".srt", encoding="utf8"
    )
    try:
        with f:
            subtitles.convert(r.text, f)
            progress(2, 2)
    except BaseException:
        # The path is never returned on failure, so nobody else could delete
        # the file (created with `delete=False`). It is closed at this point.
        with contextlib.suppress(OSError):
            os.unlink(f.name)
        raise

    return f.name
@contextlib.contextmanager
def download_target_tracks(http_session, target, progress):
    """Download the tracks of a target into temporary files.

    This is a context manager: the value bound by `with` is a copy of
    `target` whose tracks point to the downloaded files instead of their
    URLs. The temporary files are deleted when the context exits, whether
    normally or because of an error.

    `progress(kind, i, n)` is called during the downloads, `kind` being one
    of "subtitles", "audio" or "video".
    """

    def reporter(kind):
        # Build the per-track callback expected by the download helpers.
        return lambda done, total: progress(kind, done, total)

    video_meta, video_url = target.video_track
    audio_meta, audio_url = target.audio_track
    subtitles_meta, subtitles_url = target.subtitles_track or (None, None)

    video_path = audio_path = subtitles_path = None
    try:
        # Subtitles are optional: only download them when the target has some.
        if subtitles_meta:
            subtitles_path = _download_s_track(
                http_session, subtitles_url, reporter("subtitles")
            )
        else:
            subtitles_path = None

        audio_path = _download_av_track(
            http_session, audio_url, reporter("audio")
        )
        video_path = _download_av_track(
            http_session, video_url, reporter("video")
        )

        if subtitles_meta:
            local_subtitles = SubtitlesTrack(subtitles_meta, subtitles_path)
        else:
            local_subtitles = None

        yield Target(
            target.program,
            VideoTrack(video_meta, video_path),
            AudioTrack(audio_meta, audio_path),
            local_subtitles,
            target.file_name,
        )
    finally:
        # Remove whatever has been downloaded so far.
        for path in (video_path, audio_path, subtitles_path):
            if path and os.path.isfile(path):
                os.unlink(path)