# License: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of `delarte` (https://git.afpy.org/fcode/delarte.git)

"""Provide ArteTV JSON API utilities."""

import json

from .error import UnexpectedAPIResponse, HTTPError
from .model import Rendition

MIME_TYPE = "application/vnd.api+json; charset=utf-8"

# Exceptions that indicate the decoded payload does not have the expected
# shape. `json.JSONDecodeError` and `UnicodeDecodeError` are both subclasses
# of `ValueError`; `TypeError` covers subscripting/iterating a value of the
# wrong JSON type (e.g. `null` where an object was expected).
_SCHEMA_ERRORS = (KeyError, IndexError, TypeError, ValueError)


def _fetch_api_object(http, url, object_type):
    """Fetch an API object and return its `attributes` member.

    Args:
        http: object exposing `request("GET", url)`; the returned response
            must provide `getheader(name)` and a bytes `data` attribute
            (NOTE(review): looks like a urllib3 pool manager - confirm).
        url: address of the API object to fetch.
        object_type: value expected in the payload's `data.type` field.

    Returns:
        The value stored under `data.attributes` in the decoded payload.

    Raises:
        UnexpectedAPIResponse: with code "MIME_TYPE" if the content type is
            not exactly `MIME_TYPE`, "OBJECT_TYPE" if `data.type` differs
            from `object_type`, or "SCHEMA" if the body cannot be decoded
            or lacks the expected structure.

    `HTTPError.raise_for_status` is also invoked on the response; its
    behaviour is defined in the `error` module.
    """
    r = http.request("GET", url)
    HTTPError.raise_for_status(r)

    mime_type = r.getheader("content-type")
    if mime_type != MIME_TYPE:
        raise UnexpectedAPIResponse("MIME_TYPE", url, MIME_TYPE, mime_type)

    # Decoding and parsing are guarded too, so that a malformed body is
    # reported as a schema problem rather than leaking a raw ValueError.
    try:
        data = json.loads(r.data.decode("utf-8"))["data"]
        data_type = data["type"]
    except _SCHEMA_ERRORS as e:
        raise UnexpectedAPIResponse("SCHEMA", url) from e

    if data_type != object_type:
        raise UnexpectedAPIResponse("OBJECT_TYPE", url, object_type, data_type)

    try:
        return data["attributes"]
    except _SCHEMA_ERRORS as e:
        raise UnexpectedAPIResponse("SCHEMA", url) from e


def iter_renditions(program_id, player_config_url, http):
    """Iterate over renditions for the given program.

    Fetches the "ConfigPlayer" object at `player_config_url` and yields one
    `(Rendition, protocol, url)` tuple per entry of its `streams` list. The
    rendition is built from the `eStat.ml5` code and the `label` of the
    stream's first version.

    Args:
        program_id: identifier that must equal the payload's
            `metadata.providerId`.
        player_config_url: address of the player configuration object.
        http: HTTP client handed to `_fetch_api_object`.

    Raises:
        UnexpectedAPIResponse: with code "PROVIDER_ID_MISMATCH" if the
            provider id differs from `program_id`, "DUPLICATE_RENDITION_CODE"
            if two streams share a code, "NO_RENDITIONS" if no stream was
            yielded, or "SCHEMA" if the payload lacks the expected structure.

    This is a generator: nothing is fetched or validated until iteration
    starts, and "NO_RENDITIONS" is only raised once the stream list has been
    exhausted.
    """
    obj = _fetch_api_object(http, player_config_url, "ConfigPlayer")

    try:
        provider_id = obj["metadata"]["providerId"]
    except _SCHEMA_ERRORS as e:
        raise UnexpectedAPIResponse("SCHEMA", player_config_url) from e

    if provider_id != program_id:
        raise UnexpectedAPIResponse(
            "PROVIDER_ID_MISMATCH", player_config_url, provider_id
        )

    try:
        streams = iter(obj["streams"])
    except _SCHEMA_ERRORS as e:
        raise UnexpectedAPIResponse("SCHEMA", player_config_url) from e

    codes = set()
    for s in streams:
        # Only the field extraction is guarded: the `yield` stays outside
        # the `try` so exceptions raised by the consumer are never
        # misreported as schema errors.
        try:
            version = s["versions"][0]
            code = version["eStat"]["ml5"]
            label = version["label"]
            protocol = s["protocol"]
            url = s["url"]
            # Membership test is guarded because an unhashable code
            # (e.g. a JSON list) raises TypeError here.
            is_duplicate = code in codes
        except _SCHEMA_ERRORS as e:
            raise UnexpectedAPIResponse("SCHEMA", player_config_url) from e

        if is_duplicate:
            raise UnexpectedAPIResponse(
                "DUPLICATE_RENDITION_CODE", player_config_url, code
            )
        codes.add(code)

        yield (Rendition(code, label), protocol, url)

    if not codes:
        raise UnexpectedAPIResponse("NO_RENDITIONS", player_config_url)