delarte_test/src/delarte/__main__.py
Barbagus be4363a339 Fix issue #6 on FFMPEG header error
Handle the audio and video channel downloading to temporary files prior
to calling ffmpeg.

Although it might not be necessary, the download is made by "chunks" as
it would be by a client/player.

Downloading progress feedback is printed to the terminal.
2022-12-11 16:09:11 +01:00

137 lines
3.8 KiB
Python

"""ArteTV dowloader.
usage: delarte [-h|--help] - print this message
or: delarte program_page_url - show available versions
or: delarte program_page_url version - show available resolutions
or: delarte program_page_url version resolution - download the given video
"""
import subprocess
import sys
import time
from urllib.parse import urlparse
from . import (
build_ffmpeg_cmd,
build_file_base_name,
download_inputs,
select_resolution,
select_version,
iter_resolutions,
iter_versions,
load_config_api,
load_version_index,
)
def fail(message, code=1):
"""Print a message to STDERR and return a given exit code."""
print(message, file=sys.stderr)
return code
def print_available_versions(config, f):
"""Print available program versions."""
print(f"Available versions:", file=f)
for code, label in iter_versions(config):
print(f"\t{code} - {label}", file=f)
def print_available_resolutions(version_index, f):
"""Print available version resolutions."""
print(f"Available resolutions:", file=f)
for code, label in iter_resolutions(version_index):
print(f"\t{code} - {label}", file=f)
def create_progress():
"""Create a progress handler for input downloads."""
state = {
"last_update_time": 0,
"last_channel": None,
}
def progress(channel, current, total):
now = time.time()
if current == total:
print(f"\rDownloading {channel}: 100.0%")
state["last_update_time"] = now
elif channel != state["last_channel"]:
print(f"Dowloading {channel}: 0.0%", end="")
state["last_update_time"] = now
state["last_channel"] = channel
elif now - state["last_update_time"] > 1:
print(
f"\rDownloading {channel}: {int(1000.0 * current / total) / 10.0}%",
end="",
)
state["last_update_time"] = now
return progress
def main():
"""CLI command."""
args = sys.argv[1:]
if not args or args[0] == "-h" or args[0] == "--help":
print(__doc__)
return 0
try:
program_page_url = urlparse(args.pop(0))
if program_page_url.hostname != "www.arte.tv":
return fail("Not an ArteTV url")
program_page_path = program_page_url.path.split("/")[1:]
ui_language = program_page_path.pop(0)
if ui_language not in ("fr", "de", "en", "es", "pl", "it"):
return fail(f"Invalid url language code: {ui_language}")
if program_page_path.pop(0) != "videos":
return fail("Invalid ArteTV url")
program_id = program_page_path.pop(0)
except ValueError:
return fail("Invalid url")
try:
config = load_config_api(ui_language, program_id)
except ValueError:
return fail("Invalid program")
if not args:
print_available_versions(config, sys.stdout)
return 0
version_index_url = select_version(config, args.pop(0))
if version_index_url is None:
fail("Invalid version")
print_available_versions(config, sys.stderr)
return 1
version_index = load_version_index(version_index_url)
if not args:
print_available_resolutions(version_index, sys.stdout)
return 0
remote_inputs = select_resolution(version_index, args.pop(0))
if remote_inputs is None:
fail("Invalid resolution")
print_available_resolutions(version_index, sys.stderr)
return 0
file_base_name = build_file_base_name(config)
with download_inputs(remote_inputs, create_progress()) as temp_inputs:
args = build_ffmpeg_cmd(temp_inputs, file_base_name)
subprocess.run(args)
if __name__ == "__main__":
sys.exit(main())