Cleanup package API #20

Move all error definitions to `error` module
In `__init__`
  - Remove imports from global scope
  - Import all from `model` module
  - Import all from `error` module
Refactor: `fetch_sources()` to take the URL as argument
Coding style: import definitions from `error` and `model`
This commit is contained in:
Barbagus 2023-01-08 20:04:18 +01:00
parent aa6a6e4a30
commit 81913a6f24
7 changed files with 91 additions and 96 deletions

View File

@ -5,32 +5,31 @@
__version__ = "0.1"
from . import api, hls, muxing
from .model import (
Metadata,
Rendition,
RenditionAudio,
RenditionSubtitles,
Source,
Variant,
)
from .www import parse_url as parse_web_url
from .error import *
from .model import *
def fetch_sources(http_session, target_id, www_lang):
"""Fetch a target's sources."""
def fetch_sources(http_session, url):
"""Fetch a url's sources."""
from .api import fetch_program_info
from .hls import fetch_program_sources
from .www import parse_url
site, target_id = parse_url(url)
return [
source
for metadata, master_playlist_url in api.fetch_program_info(
http_session, target_id, www_lang
)
for source in hls.fetch_program_sources(
http_session, metadata, master_playlist_url
for metadata, master_playlist_url in fetch_program_info(
http_session, site, target_id
)
for source in fetch_program_sources(http_session, metadata, master_playlist_url)
]
def download_source(http_session, source, file_name, progress):
"""Download the given source into given file."""
with hls.download_source(http_session, source, progress) as local_source:
muxing.mux_source(local_source, file_name, progress)
from .hls import download_source
from .muxing import mux_source
with download_source(http_session, source, progress) as local_source:
mux_source(local_source, file_name, progress)

View File

@ -30,14 +30,15 @@ import time
import docopt
import requests
from . import __version__, download_source, error, fetch_sources, naming, www
from . import __version__, download_source, fetch_sources, naming
from .error import ModuleError, UnexpectedError
class Abort(error.ModuleError):
class Abort(ModuleError):
"""Aborted."""
class Fail(error.UnexpectedError):
class Fail(UnexpectedError):
"""Unexpected error."""
@ -154,11 +155,10 @@ def main():
args = docopt.docopt(__doc__, sys.argv[1:], version=__version__)
try:
target_id, www_lang = www.parse_url(args["URL"])
http_session = requests.sessions.Session()
sources = fetch_sources(http_session, target_id, www_lang)
sources = fetch_sources(http_session, args["URL"])
if not args["RENDITION"]:
print(f"Available renditions:")
@ -183,7 +183,7 @@ def main():
for source, file_name in zip(sources, file_names):
download_source(http_session, source, file_name, progress)
except error.UnexpectedError as e:
except UnexpectedError as e:
print(str(e))
print()
print(
@ -200,7 +200,7 @@ def main():
print(f" {repr(e)}")
return 1
except error.ModuleError as e:
except ModuleError as e:
print(str(e))
if args["--debug"]:
print(repr(e))

View File

@ -3,51 +3,35 @@
"""Provide ArteTV JSON API utilities."""
from . import error, model
from .error import UnexpectedAPIResponse, UnsupportedHLSProtocol
from .model import Metadata
MIME_TYPE = "application/vnd.api+json; charset=utf-8"
class UnexpectedResponse(error.UnexpectedError):
"""Unexpected response from ArteTV."""
class NotFound(error.ModuleError):
"""Program not found on ArteTV."""
class UnsupportedProtocol(error.ModuleError):
"""Program type not supported."""
def _fetch_api_data(http_session, path, object_type):
# Fetch an API object.
url = "https://api.arte.tv/api/player/v2/" + path
r = http_session.get(url)
if r.status_code == 404:
raise NotFound(url)
r.raise_for_status()
if (_ := r.headers["content-type"]) != MIME_TYPE:
raise UnexpectedResponse("MIME_TYPE", path, MIME_TYPE, _)
raise UnexpectedAPIResponse("MIME_TYPE", path, MIME_TYPE, _)
obj = r.json()["data"]
if (_ := obj["type"]) != object_type:
raise UnexpectedResponse("OBJECT_TYPE", path, object_type, _)
raise UnexpectedAPIResponse("OBJECT_TYPE", path, object_type, _)
return obj["attributes"]
def fetch_program_info(http_session, target_id, www_lang):
def fetch_program_info(http_session, site, target_id):
"""Fetch the given target's associated program information."""
obj = _fetch_api_data(
http_session, f"config/{www_lang}/{target_id}", "ConfigPlayer"
)
obj = _fetch_api_data(http_session, f"config/{site}/{target_id}", "ConfigPlayer")
metadata = model.Metadata(
metadata = Metadata(
obj["metadata"]["providerId"],
obj["metadata"]["title"],
obj["metadata"]["subtitle"],
@ -59,13 +43,13 @@ def fetch_program_info(http_session, target_id, www_lang):
for s in obj["streams"]:
if (_ := s["protocol"]) != "HLS_NG":
raise UnsupportedProtocol(target_id, www_lang, _)
raise UnsupportedHLSProtocol(site, target_id, _)
if (master_playlist_url := s["url"]) in cache:
raise UnexpectedResponse(
raise UnexpectedAPIResponse(
"DUPLICATE_MASTER_PLAYLIST_URL",
site,
target_id,
www_lang,
master_playlist_url,
)

View File

@ -13,8 +13,28 @@ class ModuleError(Exception):
def __repr__(self):
"""Use the class qualified name and constructor arguments."""
return f"{self.__class__.__qualname__}{self.args!r}"
return f"{self.__class__}{self.args!r}"
class UnexpectedError(ModuleError):
"""An error to report to developers."""
class InvalidUrl(ModuleError):
"""Invalid ArteTV URL."""
class UnexpectedAPIResponse(UnexpectedError):
"""Unexpected response from ArteTV."""
class UnexpectedHLSResponse(UnexpectedError):
"""Unexpected response from ArteTV."""
class UnsupportedHLSProtocol(ModuleError):
"""Program type not supported."""
class WebVTTError(UnexpectedError):
"""Unexpected WebVTT data."""

View File

@ -63,7 +63,9 @@ from tempfile import NamedTemporaryFile
import m3u8
from . import error, model, subtitles
from . import subtitles
from .error import UnexpectedHLSResponse
from .model import Rendition, RenditionAudio, RenditionSubtitles, Source, Variant
#
# WARNING !
@ -81,10 +83,6 @@ from . import error, model, subtitles
# - Subtitles media playlists have only one segment
class UnexpectedResponse(error.UnexpectedError):
"""Unexpected response from ArteTV."""
def _fetch_playlist(http_session, url):
# Fetch a M3U8 playlist
r = http_session.get(url)
@ -103,22 +101,22 @@ def fetch_program_sources(http_session, metadata, master_playlist_url):
match media.type:
case "AUDIO":
if audio_media:
raise UnexpectedResponse(
raise UnexpectedHLSResponse(
"MULTIPLE_AUDIO_MEDIA", master_playlist_url
)
audio_media = media
case "SUBTITLES":
if subtitles_media:
raise UnexpectedResponse(
raise UnexpectedHLSResponse(
"MULTIPLE_SUBTITLES_MEDIA", master_playlist_url
)
subtitles_media = media
if not audio_media:
raise UnexpectedResponse("NO_AUDIO_MEDIA", master_playlist_url)
raise UnexpectedHLSResponse("NO_AUDIO_MEDIA", master_playlist_url)
rendition = model.Rendition(
model.RenditionAudio(
rendition = Rendition(
RenditionAudio(
audio_media.language,
audio_media.name.startswith("VO"),
(
@ -126,7 +124,7 @@ def fetch_program_sources(http_session, metadata, master_playlist_url):
and ("public.accessibility" in audio_media.characteristics)
),
),
model.RenditionSubtitles(
RenditionSubtitles(
subtitles_media.language,
(
subtitles_media.characteristics is not None
@ -142,35 +140,37 @@ def fetch_program_sources(http_session, metadata, master_playlist_url):
for video_media in master_playlist.playlists:
stream_info = video_media.stream_info
if stream_info.audio != audio_media.group_id:
raise UnexpectedResponse(
raise UnexpectedHLSResponse(
"INVALID_VARIANT_AUDIO_MEDIA", master_playlist_url, stream_info.audio
)
if subtitles_media:
if stream_info.subtitles != subtitles_media.group_id:
raise UnexpectedResponse(
raise UnexpectedHLSResponse(
"INVALID_VARIANT_SUBTITLES_MEDIA",
master_playlist_url,
stream_info.subtitles,
)
elif stream_info.subtitles:
raise UnexpectedResponse(
raise UnexpectedHLSResponse(
"INVALID_VARIANT_SUBTITLES_MEDIA",
master_playlist_url,
stream_info.subtitles,
)
variant = model.Variant(
variant = Variant(
stream_info.resolution[0],
stream_info.resolution[1],
stream_info.frame_rate,
)
if variant in cache:
raise UnexpectedResponse("DUPLICATE_VARIANT", master_playlist_url, variant)
raise UnexpectedHLSResponse(
"DUPLICATE_VARIANT", master_playlist_url, variant
)
cache.add(variant)
yield model.Source(
yield Source(
metadata,
rendition,
variant,
@ -198,17 +198,17 @@ def _fetch_av_media_playlist(http_session, url):
file_name = media_playlist.segment_map[0].uri
start, end = _convert_byterange(media_playlist.segment_map[0])
if start != 0:
raise UnexpectedResponse("INVALID_AV_MEDIA_FRAGMENT_START", url)
raise UnexpectedHLSResponse("INVALID_AV_MEDIA_FRAGMENT_START", url)
ranges = [(start, end)]
next_start = end + 1
for segment in media_playlist.segments:
if segment.uri != file_name:
raise UnexpectedResponse("MULTIPLE_AV_MEDIA_FILES", url)
raise UnexpectedHLSResponse("MULTIPLE_AV_MEDIA_FILES", url)
start, end = _convert_byterange(segment)
if start != next_start:
raise UnexpectedResponse("DISCONTINUOUS_AV_MEDIA_FRAGMENT", url)
raise UnexpectedHLSResponse("DISCONTINUOUS_AV_MEDIA_FRAGMENT", url)
ranges.append((start, end))
next_start = end + 1
@ -223,10 +223,10 @@ def _fetch_subtitles_media_playlist(http_session, url):
urls = [s.absolute_uri for s in subtitles_index.segments]
if not urls:
raise UnexpectedResponse("SUBTITLES_MEDIA_NO_FILES", url)
raise UnexpectedHLSResponse("SUBTITLES_MEDIA_NO_FILES", url)
if len(urls) > 1:
raise UnexpectedResponse("SUBTITLES_MEDIA_MULTIPLE_FILES", url)
raise UnexpectedHLSResponse("SUBTITLES_MEDIA_MULTIPLE_FILES", url)
return urls[0]
@ -254,7 +254,7 @@ def _download_av_media(http_session, media_playlist_url, progress):
r.raise_for_status()
if r.status_code != 206:
raise UnexpectedResponse(
raise UnexpectedHLSResponse(
"UNEXPECTED_AV_MEDIA_HTTP_STATUS",
media_playlist_url,
r.request.headers,
@ -262,7 +262,7 @@ def _download_av_media(http_session, media_playlist_url, progress):
)
if len(r.content) != range_end - range_start + 1:
raise UnexpectedResponse(
raise UnexpectedHLSResponse(
"INVALID_AV_MEDIA_FRAGMENT_LENGTH", media_playlist_url
)
f.write(r.content)
@ -321,7 +321,7 @@ def download_source(http_session, source, progress):
http_session, source.audio, lambda i, n: progress("audio", i, n)
)
yield model.Source(
yield Source(
source.metadata,
source.rendition,
source.variant,

View File

@ -5,11 +5,7 @@
import re
from . import error
class Error(error.UnexpectedError):
"""Unexpected WebVTT data."""
from .error import WebVTTError
RE_CUE_START = r"^((?:\d\d:)\d\d:\d\d)\.(\d\d\d) --> ((?:\d\d:)\d\d:\d\d)\.(\d\d\d)"
@ -35,11 +31,11 @@ def convert(input, output):
block = []
if not blocks:
raise Error("INVALID_DATA")
raise WebVTTError("INVALID_DATA")
header = blocks.pop(0)
if not (len(header) == 1 and header[0].startswith("WEBVTT")):
raise Error("INVALID_HEADER")
raise WebVTTError("INVALID_HEADER")
counter = 1
for block in blocks:
@ -55,4 +51,4 @@ def convert(input, output):
counter += 1
if counter == 1:
raise Error("EMPTY_DATA")
raise WebVTTError("EMPTY_DATA")

View File

@ -3,14 +3,10 @@
"""Provide ArteTV website utilities."""
from . import error
from .error import InvalidUrl
BASE = "https://www.arte.tv/"
LANGUAGES = ["fr", "de", "en", "es", "pl", "it"]
class InvalidUrl(error.ModuleError):
"""Invalid ArteTV URL."""
SITES = ["fr", "de", "en", "es", "pl", "it"]
def parse_url(url):
@ -20,14 +16,14 @@ def parse_url(url):
path = url[len(BASE) :].split("/")
www_lang = path.pop(0)
site = path.pop(0)
if www_lang not in LANGUAGES:
raise InvalidUrl("WWW_LANG", url, www_lang)
if site not in SITES:
raise InvalidUrl("SITE", url, site)
if (_ := path.pop(0)) != "videos":
raise InvalidUrl("PATH", url, _)
target_id = path.pop(0)
return target_id, www_lang
return site, target_id