# delarte/src/delarte/__main__.py
#
# 218 lines
# 5.9 KiB
# Python

# License: GNU AGPL v3: http://www.gnu.org/licenses/
# This file is part of `delarte` (https://git.afpy.org/fcode/delarte.git)
"""delarte - ArteTV downloader.
Usage:
delarte (-h | --help)
delarte --version
delarte [options] URL
delarte [options] URL RENDITION
delarte [options] URL RENDITION VARIANT
Download a video from ArteTV streaming service. Omit RENDITION and/or
VARIANT to print the list of available values.
Arguments:
URL the URL from ArteTV website
RENDITION the rendition code [audio/subtitles language combination]
VARIANT the variant code [video quality version]
Options:
-h --help print this message
--version print current version of the program
--debug on error, print debugging information
"""
import sys
import time
import docopt
import requests
from . import __version__, download_source, error, fetch_sources, naming, www
# Raised by `_select_rendition` and `_select_variant` after they have printed
# the list of valid codes. `main` has an `except error.ModuleError` clause that
# prints `str(e)` and returns 1.
# NOTE(review): the base class (defined in the `error` module, not visible
# here) may use this docstring as the message returned by `str()` - confirm
# before rewording it.
class Abort(error.ModuleError):
    """Aborted."""
# Raised by `_select_rendition` and `_select_variant` with a short error code
# string followed by the offending set, when a single code matches more than
# one distinct rendition/variant. `main`'s `except error.UnexpectedError`
# clause prints `e.args[0]` as the suggested issue title and `repr(e)` as the
# suggested issue body.
# NOTE(review): the base class (defined in the `error` module, not visible
# here) may use this docstring as the message returned by `str()` - confirm
# before rewording it.
class Fail(error.UnexpectedError):
    """Unexpected error."""
def _lookup_language_name(code):
    """Return the text used to display the language identified by `code`.

    Placeholder implementation: the code itself is returned, wrapped in
    square brackets.
    """
    # TODO: actually implement this
    return "[{}]".format(code)
def _build_rendition_label(rendition):
    """Return a human readable description of `rendition`.

    The description starts with the audio track ("original"/"dubbed" prefix,
    language, optional " audio description" suffix) and, when subtitles are
    present, ends with a " with ..." clause. The subtitles language is only
    spelled out when it differs from the audio language.
    """
    audio = rendition.audio

    fragments = []
    if audio.is_original:
        fragments.append("original ")
    elif not audio.provides_accessibility:
        fragments.append("dubbed ")
    fragments.append(_lookup_language_name(audio.lang))
    if audio.provides_accessibility:
        fragments.append(" audio description")

    subtitles = rendition.subtitles
    if not subtitles:
        return "".join(fragments)

    other_language = subtitles.lang != audio.lang
    if other_language:
        intro = f" with {_lookup_language_name(subtitles.lang)}"
    else:
        intro = " with"

    if subtitles.provides_accessibility:
        fragments.append(intro + " text description")
    elif other_language:
        # Plain subtitles in the audio language are not mentioned at all.
        fragments.append(intro + " subtitles")

    return "".join(fragments)
def _print_renditions(sources):
    """Print one line per distinct rendition of `sources`, sorted by label.

    Each line is a tab, the rendition code right-aligned on 6 columns,
    " - " and the label built by `_build_rendition_label`.
    """
    distinct_renditions = {source.rendition for source in sources}
    rows = sorted(
        (
            (rendition.code, _build_rendition_label(rendition))
            for rendition in distinct_renditions
        ),
        key=lambda row: row[1],
    )
    for code, label in rows:
        print(f"\t{code:>6} - {label}")
def _print_variants(sources):
    """Print the code of each distinct variant of `sources`, tallest first.

    Each line is a tab followed by the variant code; variants are ordered by
    decreasing `height`.
    """
    distinct_variants = {source.variant for source in sources}
    tallest_first = sorted(
        distinct_variants,
        key=lambda variant: variant.height,
        reverse=True,
    )
    for variant in tallest_first:
        print(f"\t{variant.code}")
def _select_rendition(sources, rendition_code):
    """Return the sources whose rendition code equals `rendition_code`.

    Raises:
        Abort: no source matches; the valid rendition codes are printed first.
        Fail: the matching sources refer to more than one distinct rendition.
    """
    matches = [
        source for source in sources if source.rendition.code == rendition_code
    ]

    if matches:
        distinct = {source.rendition for source in matches}
        if len(distinct) > 1:
            raise Fail("DUPLICATE_RENDITION_CODE", distinct)
        return matches

    print(f"{rendition_code!r} is not a valid rendition code, possible values are:")
    _print_renditions(sources)
    raise Abort()
def _select_variant(sources, variant_code):
    """Return the sources whose variant code equals `variant_code`.

    Raises:
        Abort: no source matches; the valid variant codes are printed first.
        Fail: the matching sources refer to more than one distinct variant.
    """
    matches = [source for source in sources if source.variant.code == variant_code]

    if matches:
        distinct = {source.variant for source in matches}
        if len(distinct) > 1:
            raise Fail("DUPLICATE_VARIANT_CODE", distinct)
        return matches

    print(f"{variant_code!r} is not a valid variant code, possible values are:")
    _print_variants(sources)
    raise Abort()
def create_progress():
    """Create a progress handler for input downloads.

    Returns:
        A callable ``progress(channel, current, total)`` that prints the
        download percentage of `channel` on a single line which is rewritten
        in place (using a carriage return):

        - when ``current == total`` it prints the final "100.0%" and ends
          the line;
        - on the first call for a new `channel` it prints "0.0%";
        - otherwise it prints the current percentage, at most once per
          second.
    """
    state = {
        "last_update_time": 0,
        "last_channel": None,
    }

    def progress(channel, current, total):
        # Monotonic clock: the throttling interval must not be affected by
        # system clock adjustments.
        now = time.monotonic()

        # Every print is flushed explicitly: the partial lines written with
        # end="" would otherwise stay in the output buffer (the very first
        # one contains neither a newline nor a carriage return) and the
        # progress would show up late.
        if current == total:
            print(f"\rDownloading {channel}: 100.0%", flush=True)
            state["last_update_time"] = now
        elif channel != state["last_channel"]:
            print(f"Downloading {channel}: 0.0%", end="", flush=True)
            state["last_update_time"] = now
            state["last_channel"] = channel
        elif now - state["last_update_time"] > 1:
            # Percentage truncated (not rounded) to one decimal.
            print(
                f"\rDownloading {channel}: {int(1000.0 * current / total) / 10.0}%",
                end="",
                flush=True,
            )
            state["last_update_time"] = now

    return progress
def main():
    """CLI command.

    Parses the command line with docopt (using the module docstring), then:

    - without RENDITION: prints the available renditions;
    - without VARIANT: prints the variants available for the rendition;
    - otherwise: downloads every matching source, reporting progress.

    Returns:
        The process exit status: 0 on success, 1 when an error was reported.
    """
    args = docopt.docopt(__doc__, sys.argv[1:], version=__version__)

    try:
        target_id, www_lang = www.parse_url(args["URL"])

        # The context manager closes the HTTP session (and its pooled
        # connections) on every exit path, including errors.
        with requests.sessions.Session() as http_session:
            sources = fetch_sources(http_session, target_id, www_lang)

            if not args["RENDITION"]:
                print("Available renditions:")
                _print_renditions(sources)
                return 0

            sources = _select_rendition(sources, args["RENDITION"])

            if not args["VARIANT"]:
                print("Available variants:")
                _print_variants(sources)
                return 0

            sources = _select_variant(sources, args["VARIANT"])

            file_names = [
                naming.build_file_name(s, i, len(sources))
                for i, s in enumerate(sources, 1)
            ]

            progress = create_progress()

            for source, file_name in zip(sources, file_names):
                download_source(http_session, source, file_name, progress)

        return 0

    # This clause must stay ahead of the `error.ModuleError` one, as in the
    # original ordering.
    except error.UnexpectedError as e:
        print(str(e))
        print()
        print(
            "This program is the result of browser/server traffic analysis and involves\n"
            "some level of trying and guessing. This error might mean that we did not try\n"
            "enough or that we guessed poorly."
        )
        print("")
        print("Please consider submitting the issue to us so we may fix it.")
        print("")
        print("Issue tracker: https://git.afpy.org/fcode/delarte/issues")
        print(f"Title: {e.args[0]}")
        print("Body:")
        print(f"    {repr(e)}")
        return 1

    except error.ModuleError as e:
        print(str(e))
        if args["--debug"]:
            print(repr(e))
        return 1

    # RequestException is the base class of HTTPError, ConnectionError and
    # Timeout: connection failures are reported the same way as HTTP error
    # statuses instead of ending in a traceback.
    except requests.RequestException as e:
        print("Network error.")
        if args["--debug"]:
            print(str(e))
        return 1
# When this module is executed as the main program, run the CLI and use the
# value returned by `main` as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())