delarte/src/delarte/hls.py

168 lines
4.7 KiB
Python

# License: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of `delarte` (https://git.afpy.org/fcode/delarte.git)
"""Provide HLS protocol utilities."""
import m3u8
from .error import HTTPError, UnexpectedHLSResponse
from .model import Rendition, Variant
#
# WARNING !
#
# This module does not aim for a full implementation of HLS, only the
# subset useful for the actual observed usage of ArteTV.
#
# - URIs are relative file paths
# - Program indexes have at least one variant
# - Every variant is of different resolution
# - Every variant has exactly one audio medium
# - Every variant has at most one subtitles medium
# - Audio and video indexes segments are incremental ranges of
# the same file
# - Subtitles indexes have only one segment
MIME_TYPE = "application/x-mpegURL"
def fetch_index(http, url):
"""Fetch a M3U8 playlist."""
r = http.request("GET", url)
HTTPError.raise_for_status(r)
if (_ := r.getheader("content-type")) != MIME_TYPE:
raise UnexpectedHLSResponse("MIME_TYPE", url, MIME_TYPE, _)
return m3u8.loads(r.data.decode("utf-8"), url)
def _select_program_audio(program_index):
all = [
(
media.group_id,
Rendition.Audio(
media.language,
media.name.startswith("VO"),
(
media.characteristics is not None
and ("public.accessibility" in media.characteristics)
),
),
media.absolute_uri,
)
for media in program_index.media
if media.type == "AUDIO"
]
if not all:
raise UnexpectedHLSResponse("NO_PROGRAM_AUDIO_MEDIA")
if len(all) > 1:
raise UnexpectedHLSResponse("MULTIPLE_PROGRAM_AUDIO_MEDIA")
return all[0]
def _select_program_subtitles(program_index):
all = [
(
media.group_id,
Rendition.Subtitles(
media.language,
(
media.characteristics is not None
and ("public.accessibility" in media.characteristics)
),
),
media.absolute_uri,
)
for media in program_index.media
if media.type == "SUBTITLES"
]
if not all:
return None, None, None
if len(all) > 1:
raise UnexpectedHLSResponse("MULTIPLE_PROGRAM_SUBTITLES_MEDIA")
return all[0]
def read_ng_program_index(program_index):
"""Return variants, audio and subtitles from program index."""
audio_id, audio, audio_url = _select_program_audio(program_index)
subtitles_id, subtitles, subtitles_url = _select_program_subtitles(program_index)
variants = []
for video_media in program_index.playlists:
stream_info = video_media.stream_info
if stream_info.audio != audio_id:
raise UnexpectedHLSResponse("PROGRAM_AUDIO_MEDIA_ID")
if stream_info.subtitles != subtitles_id:
raise UnexpectedHLSResponse("PROGRAM_SUBTITLES_MEDIA_ID")
variants.append(
(
Variant(
stream_info.average_bandwidth,
Variant.Video(
stream_info.resolution,
stream_info.frame_rate,
),
),
video_media.absolute_uri,
)
)
return variants, (audio, audio_url), (subtitles, subtitles_url)
def _convert_byterange(obj):
# Convert a M3U8 `byterange` (1) to an `http range` (2).
# 1. "count@offset"
# 2. (start, end)
count, offset = [int(v) for v in obj.byterange.split("@")]
return offset, offset + count - 1
def read_mp4_index(track_index):
"""Return MP4 file url and ranges."""
file_name = track_index.segment_map[0].uri
start, end = _convert_byterange(track_index.segment_map[0])
ranges = [(start, end)]
if start != 0:
raise UnexpectedHLSResponse("MP4_INDEX_FRAGMENT_START")
next_start = end + 1
for segment in track_index.segments:
if segment.uri != file_name:
raise UnexpectedHLSResponse("MULTIPLE_MP4_INDEX_FILES")
start, end = _convert_byterange(segment)
if start != next_start:
raise UnexpectedHLSResponse("DISCONTINUOUS_MP4_INDEX_FRAGMENT")
ranges.append((start, end))
next_start = end + 1
return track_index.segment_map[0].absolute_uri, ranges
def read_vtt_index(track_index):
"""Return VTT file url."""
urls = [s.absolute_uri for s in track_index.segments]
if not urls:
raise UnexpectedHLSResponse("NO_VTT_INDEX_FILES")
if len(urls) > 1:
raise UnexpectedHLSResponse("MULTIPLE_VTT_INDEX_FILES")
return urls[0]