205 lines
5.6 KiB
Python
205 lines
5.6 KiB
Python
# License: GNU AGPL v3: http://www.gnu.org/licenses/
|
|
# This file is part of `delarte` (https://git.afpy.org/fcode/delarte.git)
|
|
|
|
"""delarte - ArteTV downloader.
|
|
|
|
usage: delarte [-h|--help] - print this message
|
|
or: delarte program_page_url - show available versions
|
|
or: delarte program_page_url version - show available resolutions
|
|
or: delarte program_page_url version resolution - download the given video
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
|
|
import requests
|
|
|
|
from . import cli, download_source, error, fetch_sources, naming, www
|
|
|
|
|
|
class Abort(error.ModuleError):
|
|
"""Aborted."""
|
|
|
|
|
|
class Fail(error.UnexpectedError):
|
|
"""Unexpected error."""
|
|
|
|
|
|
def _lookup_language_name(code):
|
|
# TODO: actually implement this
|
|
return f"[{code}]"
|
|
|
|
|
|
def _build_rendition_label(rendition):
|
|
# Build a human readable description for a given rendition
|
|
label = ""
|
|
if rendition.audio.is_original:
|
|
label += "original "
|
|
elif not rendition.audio.is_accessible:
|
|
label += "dubbed "
|
|
|
|
label += _lookup_language_name(rendition.audio.lang)
|
|
|
|
if rendition.audio.is_accessible:
|
|
label += " audio description"
|
|
|
|
if rendition.subtitles:
|
|
|
|
if rendition.subtitles.is_accessible:
|
|
if rendition.subtitles.lang != rendition.audio.lang:
|
|
label += f" with {_lookup_language_name(rendition.subtitles.lang)} text description"
|
|
else:
|
|
label += " with text description"
|
|
elif rendition.subtitles.lang != rendition.audio.lang:
|
|
label += (
|
|
f" with {_lookup_language_name(rendition.subtitles.lang)} subtitles"
|
|
)
|
|
|
|
return label
|
|
|
|
|
|
def _print_renditions(sources):
|
|
items = [
|
|
(r.code, _build_rendition_label(r)) for r in set(s.rendition for s in sources)
|
|
]
|
|
|
|
items.sort(key=lambda t: t[1])
|
|
|
|
for code, label in items:
|
|
print(f"\t{code:>6} - {label}")
|
|
|
|
|
|
def _print_variants(sources):
|
|
items = [(v.code, v.height) for v in set(s.variant for s in sources)]
|
|
|
|
items.sort(key=lambda t: t[1], reverse=True)
|
|
|
|
for code, _ in items:
|
|
print(f"\t{code}")
|
|
|
|
|
|
def _select_rendition(sources, rendition_code):
|
|
filtered = [s for s in sources if s.rendition.code == rendition_code]
|
|
|
|
if not filtered:
|
|
print(f"{rendition_code!r} is not a valid version, possible values are:")
|
|
_print_renditions(sources)
|
|
raise Abort()
|
|
|
|
if len(_ := set(s.rendition for s in filtered)) > 1:
|
|
raise Fail("DUPLICATE_RENDITION_CODE", _)
|
|
|
|
return filtered
|
|
|
|
|
|
def _select_variant(sources, variant_code):
|
|
filtered = [s for s in sources if s.variant.code == variant_code]
|
|
|
|
if not filtered:
|
|
print(f"{variant_code!r} is not a valid resolution, possible values are:")
|
|
_print_variants(sources)
|
|
raise Abort()
|
|
|
|
if len(_ := set(s.variant for s in filtered)) > 1:
|
|
raise Fail("DUPLICATE_VARIANT_CODE", _)
|
|
|
|
return filtered
|
|
|
|
|
|
def create_progress():
|
|
"""Create a progress handler for input downloads."""
|
|
state = {
|
|
"last_update_time": 0,
|
|
"last_channel": None,
|
|
}
|
|
|
|
def progress(channel, current, total):
|
|
now = time.time()
|
|
|
|
if current == total:
|
|
print(f"\rDownloading {channel}: 100.0%")
|
|
state["last_update_time"] = now
|
|
elif channel != state["last_channel"]:
|
|
print(f"Downloading {channel}: 0.0%", end="")
|
|
state["last_update_time"] = now
|
|
state["last_channel"] = channel
|
|
elif now - state["last_update_time"] > 1:
|
|
print(
|
|
f"\rDownloading {channel}: {int(1000.0 * current / total) / 10.0}%",
|
|
end="",
|
|
)
|
|
state["last_update_time"] = now
|
|
|
|
return progress
|
|
|
|
|
|
def main():
|
|
"""CLI command."""
|
|
parser = cli.Parser()
|
|
args = parser.get_args_as_list()
|
|
|
|
if not args or args[0] == "-h" or args[0] == "--help":
|
|
print(__doc__)
|
|
return 0
|
|
|
|
try:
|
|
target_id, www_lang = www.parse_url(args.pop(0))
|
|
|
|
http_session = requests.sessions.Session()
|
|
|
|
sources = fetch_sources(http_session, target_id, www_lang)
|
|
|
|
if not args:
|
|
print(f"Available versions:")
|
|
_print_renditions(sources)
|
|
return 0
|
|
|
|
sources = _select_rendition(sources, args.pop(0))
|
|
|
|
if not args:
|
|
print(f"Available resolutions:")
|
|
_print_variants(sources)
|
|
return 0
|
|
|
|
sources = _select_variant(sources, args.pop(0))
|
|
|
|
file_names = [
|
|
naming.build_file_name(s, i, len(sources)) for i, s in enumerate(sources, 1)
|
|
]
|
|
|
|
progress = create_progress()
|
|
|
|
for source, file_name in zip(sources, file_names):
|
|
download_source(http_session, source, file_name, progress)
|
|
|
|
except error.UnexpectedError as e:
|
|
print(str(e))
|
|
print()
|
|
print(
|
|
"This program is the result of browser/server traffic analysis and involves\n"
|
|
"some level of trying and guessing. This error might mean that we did not try\n"
|
|
"enough or that we guessed poorly."
|
|
)
|
|
print("")
|
|
print("Please consider submitting the issue to us so we may fix it.")
|
|
print("")
|
|
print("Issue tracker: https://git.afpy.org/fcode/delarte/issues")
|
|
print(f"Title: {e.args[0]}")
|
|
print("Body:")
|
|
print(f" {repr(e)}")
|
|
return 1
|
|
|
|
except error.ModuleError as e:
|
|
print(str(e))
|
|
# print(repr(e))
|
|
return 1
|
|
|
|
except requests.HTTPError as e:
|
|
print("Network error.")
|
|
# print(str(e))
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|