# License: GNU AGPL v3: http://www.gnu.org/licenses/ # This file is part of `delarte` (https://git.afpy.org/fcode/delarte.git) """Provide HLS protocol utilities.""" import m3u8 from .error import UnexpectedHLSResponse, UnsupportedHLSProtocol, HTTPError from .model import AudioTrack, SubtitlesTrack, Variant, VideoTrack # # WARNING ! # # This module does not aim for a full implementation of HLS, only the # subset useful for the actual observed usage of ArteTV. # # - URIs are relative file paths # - Program indexes have at least one variant # - Every variant is of different resolution # - Every variant has exactly one audio medium # - Every variant has at most one subtitles medium # - Audio and video indexes segments are incremental ranges of # the same file # - Subtitles indexes have only one segment MIME_TYPE = "application/x-mpegURL" def _fetch_index(http, url): # Fetch a M3U8 playlist r = http.request("GET", url) if not 200 <= r.status < 300: raise HTTPError(r) if (_ := r.getheader("content-type")) != MIME_TYPE: raise UnexpectedHLSResponse("MIME_TYPE", url, MIME_TYPE, _) return m3u8.loads(r.data.decode("utf-8"), url) def iter_variants(protocol, program_index_url, http): """Iterate over variants for the given rendition.""" if protocol != "HLS_NG": raise UnsupportedHLSProtocol(protocol, program_index_url) program_index = _fetch_index(http, program_index_url) audio_media = None subtitles_media = None for media in program_index.media: match media.type: case "AUDIO": if audio_media: raise UnexpectedHLSResponse( "MULTIPLE_AUDIO_MEDIA", program_index_url ) audio_media = media case "SUBTITLES": if subtitles_media: raise UnexpectedHLSResponse( "MULTIPLE_SUBTITLES_MEDIA", program_index_url ) subtitles_media = media if not audio_media: raise UnexpectedHLSResponse("NO_AUDIO_MEDIA", program_index_url) audio = ( AudioTrack( audio_media.name, audio_media.language, audio_media.name.startswith("VO"), ( audio_media.characteristics is not None and ("public.accessibility" in audio_media.characteristics) ), ), audio_media.absolute_uri, ) subtitles = ( ( SubtitlesTrack( subtitles_media.name, subtitles_media.language, ( subtitles_media.characteristics is not None and ("public.accessibility" in subtitles_media.characteristics) ), ), subtitles_media.absolute_uri, ) if subtitles_media else None ) codes = set() for video_media in program_index.playlists: stream_info = video_media.stream_info if stream_info.audio != audio_media.group_id: raise UnexpectedHLSResponse( "INVALID_AUDIO_MEDIA", program_index_url, stream_info.audio ) if subtitles_media: if stream_info.subtitles != subtitles_media.group_id: raise UnexpectedHLSResponse( "INVALID_SUBTITLES_MEDIA", program_index_url, stream_info.subtitles ) elif stream_info.subtitles: raise UnexpectedHLSResponse( "INVALID_SUBTITLES_MEDIA", program_index_url, stream_info.subtitles ) code = f"{stream_info.resolution[1]}p" if code in codes: raise UnexpectedHLSResponse( "DUPLICATE_STREAM_CODE", program_index_url, code ) codes.add(code) yield ( Variant( code, stream_info.average_bandwidth, ), ( VideoTrack( stream_info.resolution[0], stream_info.resolution[1], stream_info.frame_rate, ), video_media.absolute_uri, ), audio, subtitles, ) if not codes: raise UnexpectedHLSResponse("NO_VARIANTS", program_index_url) def _convert_byterange(obj): # Convert a M3U8 `byterange` (1) to an `http range` (2). # 1. "count@offset" # 2. (start, end) count, offset = [int(v) for v in obj.byterange.split("@")] return offset, offset + count - 1 def fetch_mp4_media(track_index_url, http): """Fetch an audio or video media.""" track_index = _fetch_index(http, track_index_url) file_name = track_index.segment_map[0].uri start, end = _convert_byterange(track_index.segment_map[0]) if start != 0: raise UnexpectedHLSResponse("INVALID_AV_INDEX_FRAGMENT_START", track_index_url) # ranges = [(start, end)] next_start = end + 1 for segment in track_index.segments: if segment.uri != file_name: raise UnexpectedHLSResponse("MULTIPLE_AV_INDEX_FILES", track_index_url) start, end = _convert_byterange(segment) if start != next_start: raise UnexpectedHLSResponse( "DISCONTINUOUS_AV_INDEX_FRAGMENT", track_index_url ) # ranges.append((start, end)) next_start = end + 1 return track_index.segment_map[0].absolute_uri def fetch_vtt_media(track_index_url, http): """Fetch an audio or video media.""" track_index = _fetch_index(http, track_index_url) urls = [s.absolute_uri for s in track_index.segments] if not urls: raise UnexpectedHLSResponse("NO_S_INDEX_FILES", track_index_url) if len(urls) > 1: raise UnexpectedHLSResponse("MULTIPLE_S_INDEX_FILES", track_index_url) return urls[0]