1) Parse and locally temp/rewrite the main m3u8 file to enable future selection

of resultion. Picks the bigger one for now.
2) Revrite subtitles selection to limit to selected resolution.
This commit is contained in:
Etienne Zind 2022-12-06 13:19:57 +01:00
parent 2b9d8773bd
commit af465ad79e
1 changed files with 56 additions and 41 deletions

View File

@ -11,11 +11,13 @@ This file is part of [`delarte`](https://git.afpy.org/fcode/delarte)
"""
import json
import sys
import re
import io
import json
import os
import re
import subprocess
import sys
import tempfile
from http import HTTPStatus
from os import environ
@ -84,63 +86,76 @@ def api_playlist(lang: str, provider_id: str):
raise NotImplementedError
def write_subtitles(m3u8_url, base_name):
def write_subtitles(lang, m3u8_uri, file_base_name):
"""Convert distant vtt subtitles to local srt."""
main = m3u8.load(m3u8_url)
sub_m3u8 = m3u8.load(m3u8_uri)
sub_urls = [cast(str, sub_m3u8.base_uri) + "/" + f for f in sub_m3u8.files]
sub_m3u8_urls = [
(m.base_uri + "/" + m.uri, m.language)
for m in main.media
if m.type == "SUBTITLES"
]
if not sub_urls:
raise ValueError("No subtitle files")
for sub_m3u8_url, sub_lang in sub_m3u8_urls:
if len(sub_urls) > 1:
raise ValueError("Multiple subtitle files")
sub_m3u8 = m3u8.load(sub_m3u8_url)
sub_urls = [cast(str, sub_m3u8.base_uri) + "/" + f for f in sub_m3u8.files]
http_response = urlopen(sub_urls[0])
if http_response.status != HTTPStatus.OK:
raise RuntimeError("Subtitle request failed")
if not sub_urls:
raise ValueError("No subtitle files")
buffer = io.StringIO(http_response.read().decode("utf8"))
if len(sub_urls) > 1:
raise ValueError("Multiple subtitle files")
http_response = urlopen(sub_urls[0])
if http_response.status != HTTPStatus.OK:
raise RuntimeError("Subtitle request failed")
buffer = io.StringIO(http_response.read().decode("utf8"))
with open(f"{base_name}.{sub_lang}.srt", "w", encoding="utf8") as f:
for i, caption in enumerate(webvtt.read_buffer(buffer), 1):
print(i, file=f)
print(
re.sub(r"\.", ",", caption.start)
+ " --> "
+ re.sub(r"\.", ",", caption.end),
file=f,
)
print(caption.text + "\n", file=f)
return f.name
with open(f"{file_base_name}.{lang}.srt", "w", encoding="utf8") as f:
for i, caption in enumerate(webvtt.read_buffer(buffer), 1):
print(i, file=f)
print(
re.sub(r"\.", ",", caption.start)
+ " --> "
+ re.sub(r"\.", ",", caption.end),
file=f,
)
print(caption.text + "\n", file=f)
return f.name
def download_stream(m3u8_url: str, base_file: str):
def download_stream(m3u8_url: str, file_base_name: str):
"""Download and writes the video and subtitles files."""
write_subtitles(m3u8_url, base_file)
dst = m3u8.M3U8()
src = m3u8.load(m3u8_url)
# sort streams by resolution (descending) and pick the bigger one
src.playlists.sort(key=lambda pl: pl.stream_info.resolution, reverse=True)
src.playlists[0].uri = src.base_uri + src.playlists[0].uri
dst.add_playlist(src.playlists[0])
for media in src.playlists[0].media:
media.uri = src.base_uri + media.uri
if media.type == "SUBTITLES":
write_subtitles(media.language, media.uri, file_base_name)
else:
dst.add_media(media)
with tempfile.NamedTemporaryFile(
"w", delete=False, encoding="utf8", prefix="delarte.", suffix=".m3u8"
) as f:
f.write(dst.dumps())
dst_path = f.name
subprocess.run(
[
FFMPEG,
"-protocol_whitelist",
"https,file,tls,tcp",
"-i",
m3u8_url,
dst_path,
"-c",
"copy",
"-bsf:a",
"aac_adtstoasc",
f"{base_file}.mp4",
f"{file_base_name}.mp4",
]
)
os.unlink(dst_path)
def main():
"""CLI function, options passed as arguments."""
@ -152,7 +167,7 @@ def main():
config = api_config(ui_lang, stream_id)
base_file = config.title.replace("/", "-")
file_base_name = config.title.replace("/", "-")
if version not in config.versions:
print(f"{config.title} - {config.subtitle}")
@ -162,7 +177,7 @@ def main():
m3u8_url, _version_name = config.versions[version]
download_stream(m3u8_url, base_file)
download_stream(m3u8_url, file_base_name)
if __name__ == "__main__":