From 81913a6f2459159c60b82a7b4ad6e5e0863e8b21 Mon Sep 17 00:00:00 2001 From: Barbagus Date: Sun, 8 Jan 2023 20:04:18 +0100 Subject: [PATCH] Cleanup package API #20 Move all error definitions to `error` module In `__init__` - Remove imports from global scope - Import all from `model` module - Import all from `error` module Refactor: `fetch_sources()` to take the URL as argument Coding style: import definitions from `error` and `model` --- src/delarte/__init__.py | 37 +++++++++++++++-------------- src/delarte/__main__.py | 14 +++++------ src/delarte/api.py | 36 ++++++++--------------------- src/delarte/error.py | 22 +++++++++++++++++- src/delarte/hls.py | 50 ++++++++++++++++++++-------------------- src/delarte/subtitles.py | 12 ++++------ src/delarte/www.py | 16 +++++-------- 7 files changed, 91 insertions(+), 96 deletions(-) diff --git a/src/delarte/__init__.py b/src/delarte/__init__.py index 199c34d..70a4570 100644 --- a/src/delarte/__init__.py +++ b/src/delarte/__init__.py @@ -5,32 +5,31 @@ __version__ = "0.1" -from . import api, hls, muxing -from .model import ( - Metadata, - Rendition, - RenditionAudio, - RenditionSubtitles, - Source, - Variant, -) -from .www import parse_url as parse_web_url +from .error import * +from .model import * -def fetch_sources(http_session, target_id, www_lang): - """Fetch a target's sources.""" +def fetch_sources(http_session, url): + """Fetch a url's sources.""" + from .api import fetch_program_info + from .hls import fetch_program_sources + from .www import parse_url + + site, target_id = parse_url(url) + return [ source - for metadata, master_playlist_url in api.fetch_program_info( - http_session, target_id, www_lang - ) - for source in hls.fetch_program_sources( - http_session, metadata, master_playlist_url + for metadata, master_playlist_url in fetch_program_info( + http_session, site, target_id ) + for source in fetch_program_sources(http_session, metadata, master_playlist_url) ] def download_source(http_session, source, file_name, progress): """Download the given source into given file.""" - with hls.download_source(http_session, source, progress) as local_source: - muxing.mux_source(local_source, file_name, progress) + from .hls import download_source + from .muxing import mux_source + + with download_source(http_session, source, progress) as local_source: + mux_source(local_source, file_name, progress) diff --git a/src/delarte/__main__.py b/src/delarte/__main__.py index 3b89676..c9a0c3d 100644 --- a/src/delarte/__main__.py +++ b/src/delarte/__main__.py @@ -30,14 +30,15 @@ import time import docopt import requests -from . import __version__, download_source, error, fetch_sources, naming, www +from . import __version__, download_source, fetch_sources, naming +from .error import ModuleError, UnexpectedError -class Abort(error.ModuleError): +class Abort(ModuleError): """Aborted.""" -class Fail(error.UnexpectedError): +class Fail(UnexpectedError): """Unexpected error.""" @@ -154,11 +155,10 @@ def main(): args = docopt.docopt(__doc__, sys.argv[1:], version=__version__) try: - target_id, www_lang = www.parse_url(args["URL"]) http_session = requests.sessions.Session() - sources = fetch_sources(http_session, target_id, www_lang) + sources = fetch_sources(http_session, args["URL"]) if not args["RENDITION"]: print(f"Available renditions:") @@ -183,7 +183,7 @@ def main(): for source, file_name in zip(sources, file_names): download_source(http_session, source, file_name, progress) - except error.UnexpectedError as e: + except UnexpectedError as e: print(str(e)) print() print( @@ -200,7 +200,7 @@ def main(): print(f" {repr(e)}") return 1 - except error.ModuleError as e: + except ModuleError as e: print(str(e)) if args["--debug"]: print(repr(e)) diff --git a/src/delarte/api.py b/src/delarte/api.py index 88b1bbe..b65dc4f 100644 --- a/src/delarte/api.py +++ b/src/delarte/api.py @@ -3,51 +3,35 @@ """Provide ArteTV JSON API utilities.""" -from . import error, model +from .error import UnexpectedAPIResponse, UnsupportedHLSProtocol +from .model import Metadata MIME_TYPE = "application/vnd.api+json; charset=utf-8" -class UnexpectedResponse(error.UnexpectedError): - """Unexpected response from ArteTV.""" - - -class NotFound(error.ModuleError): - """Program not found on ArteTV.""" - - -class UnsupportedProtocol(error.ModuleError): - """Program type not supported.""" - - def _fetch_api_data(http_session, path, object_type): # Fetch an API object. url = "https://api.arte.tv/api/player/v2/" + path r = http_session.get(url) - if r.status_code == 404: - raise NotFound(url) - r.raise_for_status() if (_ := r.headers["content-type"]) != MIME_TYPE: - raise UnexpectedResponse("MIME_TYPE", path, MIME_TYPE, _) + raise UnexpectedAPIResponse("MIME_TYPE", path, MIME_TYPE, _) obj = r.json()["data"] if (_ := obj["type"]) != object_type: - raise UnexpectedResponse("OBJECT_TYPE", path, object_type, _) + raise UnexpectedAPIResponse("OBJECT_TYPE", path, object_type, _) return obj["attributes"] -def fetch_program_info(http_session, target_id, www_lang): +def fetch_program_info(http_session, site, target_id): """Fetch the given target's associated program information.""" - obj = _fetch_api_data( - http_session, f"config/{www_lang}/{target_id}", "ConfigPlayer" - ) + obj = _fetch_api_data(http_session, f"config/{site}/{target_id}", "ConfigPlayer") - metadata = model.Metadata( + metadata = Metadata( obj["metadata"]["providerId"], obj["metadata"]["title"], obj["metadata"]["subtitle"], @@ -59,13 +43,13 @@ def fetch_program_info(http_session, target_id, www_lang): for s in obj["streams"]: if (_ := s["protocol"]) != "HLS_NG": - raise UnsupportedProtocol(target_id, www_lang, _) + raise UnsupportedHLSProtocol(site, target_id, _) if (master_playlist_url := s["url"]) in cache: - raise UnexpectedResponse( + raise UnexpectedAPIResponse( "DUPLICATE_MASTER_PLAYLIST_URL", + site, target_id, - www_lang, master_playlist_url, ) diff --git a/src/delarte/error.py b/src/delarte/error.py index 93d70f4..4cca157 100644 --- a/src/delarte/error.py +++ b/src/delarte/error.py @@ -13,8 +13,28 @@ class ModuleError(Exception): def __repr__(self): """Use the class qualified name and constructor arguments.""" - return f"{self.__class__.__qualname__}{self.args!r}" + return f"{self.__class__}{self.args!r}" class UnexpectedError(ModuleError): """An error to report to developers.""" + + +class InvalidUrl(ModuleError): + """Invalid ArteTV URL.""" + + +class UnexpectedAPIResponse(UnexpectedError): + """Unexpected response from ArteTV.""" + + +class UnexpectedHLSResponse(UnexpectedError): + """Unexpected response from ArteTV.""" + + +class UnsupportedHLSProtocol(ModuleError): + """Program type not supported.""" + + +class WebVTTError(UnexpectedError): + """Unexpected WebVTT data.""" diff --git a/src/delarte/hls.py b/src/delarte/hls.py index c4f01b5..274dae8 100644 --- a/src/delarte/hls.py +++ b/src/delarte/hls.py @@ -63,7 +63,9 @@ from tempfile import NamedTemporaryFile import m3u8 -from . import error, model, subtitles +from . import subtitles +from .error import UnexpectedHLSResponse +from .model import Rendition, RenditionAudio, RenditionSubtitles, Source, Variant # # WARNING ! @@ -81,10 +83,6 @@ from . import error, model, subtitles # - Subtitles media playlists have only one segment -class UnexpectedResponse(error.UnexpectedError): - """Unexpected response from ArteTV.""" - - def _fetch_playlist(http_session, url): # Fetch a M3U8 playlist r = http_session.get(url) @@ -103,22 +101,22 @@ def fetch_program_sources(http_session, metadata, master_playlist_url): match media.type: case "AUDIO": if audio_media: - raise UnexpectedResponse( + raise UnexpectedHLSResponse( "MULTIPLE_AUDIO_MEDIA", master_playlist_url ) audio_media = media case "SUBTITLES": if subtitles_media: - raise UnexpectedResponse( + raise UnexpectedHLSResponse( "MULTIPLE_SUBTITLES_MEDIA", master_playlist_url ) subtitles_media = media if not audio_media: - raise UnexpectedResponse("NO_AUDIO_MEDIA", master_playlist_url) + raise UnexpectedHLSResponse("NO_AUDIO_MEDIA", master_playlist_url) - rendition = model.Rendition( - model.RenditionAudio( + rendition = Rendition( + RenditionAudio( audio_media.language, audio_media.name.startswith("VO"), ( @@ -126,7 +124,7 @@ def fetch_program_sources(http_session, metadata, master_playlist_url): and ("public.accessibility" in audio_media.characteristics) ), ), - model.RenditionSubtitles( + RenditionSubtitles( subtitles_media.language, ( subtitles_media.characteristics is not None @@ -142,35 +140,37 @@ def fetch_program_sources(http_session, metadata, master_playlist_url): for video_media in master_playlist.playlists: stream_info = video_media.stream_info if stream_info.audio != audio_media.group_id: - raise UnexpectedResponse( + raise UnexpectedHLSResponse( "INVALID_VARIANT_AUDIO_MEDIA", master_playlist_url, stream_info.audio ) if subtitles_media: if stream_info.subtitles != subtitles_media.group_id: - raise UnexpectedResponse( + raise UnexpectedHLSResponse( "INVALID_VARIANT_SUBTITLES_MEDIA", master_playlist_url, stream_info.subtitles, ) elif stream_info.subtitles: - raise UnexpectedResponse( + raise UnexpectedHLSResponse( "INVALID_VARIANT_SUBTITLES_MEDIA", master_playlist_url, stream_info.subtitles, ) - variant = model.Variant( + variant = Variant( stream_info.resolution[0], stream_info.resolution[1], stream_info.frame_rate, ) if variant in cache: - raise UnexpectedResponse("DUPLICATE_VARIANT", master_playlist_url, variant) + raise UnexpectedHLSResponse( + "DUPLICATE_VARIANT", master_playlist_url, variant + ) cache.add(variant) - yield model.Source( + yield Source( metadata, rendition, variant, @@ -198,17 +198,17 @@ def _fetch_av_media_playlist(http_session, url): file_name = media_playlist.segment_map[0].uri start, end = _convert_byterange(media_playlist.segment_map[0]) if start != 0: - raise UnexpectedResponse("INVALID_AV_MEDIA_FRAGMENT_START", url) + raise UnexpectedHLSResponse("INVALID_AV_MEDIA_FRAGMENT_START", url) ranges = [(start, end)] next_start = end + 1 for segment in media_playlist.segments: if segment.uri != file_name: - raise UnexpectedResponse("MULTIPLE_AV_MEDIA_FILES", url) + raise UnexpectedHLSResponse("MULTIPLE_AV_MEDIA_FILES", url) start, end = _convert_byterange(segment) if start != next_start: - raise UnexpectedResponse("DISCONTINUOUS_AV_MEDIA_FRAGMENT", url) + raise UnexpectedHLSResponse("DISCONTINUOUS_AV_MEDIA_FRAGMENT", url) ranges.append((start, end)) next_start = end + 1 @@ -223,10 +223,10 @@ def _fetch_subtitles_media_playlist(http_session, url): urls = [s.absolute_uri for s in subtitles_index.segments] if not urls: - raise UnexpectedResponse("SUBTITLES_MEDIA_NO_FILES", url) + raise UnexpectedHLSResponse("SUBTITLES_MEDIA_NO_FILES", url) if len(urls) > 1: - raise UnexpectedResponse("SUBTITLES_MEDIA_MULTIPLE_FILES", url) + raise UnexpectedHLSResponse("SUBTITLES_MEDIA_MULTIPLE_FILES", url) return urls[0] @@ -254,7 +254,7 @@ def _download_av_media(http_session, media_playlist_url, progress): r.raise_for_status() if r.status_code != 206: - raise UnexpectedResponse( + raise UnexpectedHLSResponse( "UNEXPECTED_AV_MEDIA_HTTP_STATUS", media_playlist_url, r.request.headers, @@ -262,7 +262,7 @@ def _download_av_media(http_session, media_playlist_url, progress): ) if len(r.content) != range_end - range_start + 1: - raise UnexpectedResponse( + raise UnexpectedHLSResponse( "INVALID_AV_MEDIA_FRAGMENT_LENGTH", media_playlist_url ) f.write(r.content) @@ -321,7 +321,7 @@ def download_source(http_session, source, progress): http_session, source.audio, lambda i, n: progress("audio", i, n) ) - yield model.Source( + yield Source( source.metadata, source.rendition, source.variant, diff --git a/src/delarte/subtitles.py b/src/delarte/subtitles.py index 1cec37a..61024be 100644 --- a/src/delarte/subtitles.py +++ b/src/delarte/subtitles.py @@ -5,11 +5,7 @@ import re -from . import error - - -class Error(error.UnexpectedError): - """Unexpected WebVTT data.""" +from .error import WebVTTError RE_CUE_START = r"^((?:\d\d:)\d\d:\d\d)\.(\d\d\d) --> ((?:\d\d:)\d\d:\d\d)\.(\d\d\d)" @@ -35,11 +31,11 @@ def convert(input, output): block = [] if not blocks: - raise Error("INVALID_DATA") + raise WebVTTError("INVALID_DATA") header = blocks.pop(0) if not (len(header) == 1 and header[0].startswith("WEBVTT")): - raise Error("INVALID_HEADER") + raise WebVTTError("INVALID_HEADER") counter = 1 for block in blocks: @@ -55,4 +51,4 @@ def convert(input, output): counter += 1 if counter == 1: - raise Error("EMPTY_DATA") + raise WebVTTError("EMPTY_DATA") diff --git a/src/delarte/www.py b/src/delarte/www.py index a85f7e9..b667620 100644 --- a/src/delarte/www.py +++ b/src/delarte/www.py @@ -3,14 +3,10 @@ """Provide ArteTV website utilities.""" -from . import error +from .error import InvalidUrl BASE = "https://www.arte.tv/" -LANGUAGES = ["fr", "de", "en", "es", "pl", "it"] - - -class InvalidUrl(error.ModuleError): - """Invalid ArteTV URL.""" +SITES = ["fr", "de", "en", "es", "pl", "it"] def parse_url(url): @@ -20,14 +16,14 @@ def parse_url(url): path = url[len(BASE) :].split("/") - www_lang = path.pop(0) + site = path.pop(0) - if www_lang not in LANGUAGES: - raise InvalidUrl("WWW_LANG", url, www_lang) + if site not in SITES: + raise InvalidUrl("SITE", url, site) if (_ := path.pop(0)) != "videos": raise InvalidUrl("PATH", url, _) target_id = path.pop(0) - return target_id, www_lang + return site, target_id