yt_common: chapters and audio mixins
This commit is contained in:
parent
80a0580608
commit
ffcdf94fc4
@ -1 +1 @@
|
|||||||
from . import antialiasing, automation, config, data, deband, logging, source # noqa: F401
|
from . import antialiasing, automation, chapters, config, data, deband, logging, scale, source # noqa: F401
|
||||||
|
@ -10,8 +10,9 @@ import subprocess
|
|||||||
|
|
||||||
from lvsfunc.render import clip_async_render
|
from lvsfunc.render import clip_async_render
|
||||||
|
|
||||||
from typing import Any, BinaryIO, Callable, List, Optional, Sequence, Tuple, cast
|
from typing import Any, BinaryIO, Callable, List, Optional, Sequence, Set, Tuple, cast
|
||||||
|
|
||||||
|
from .chapters import Chapter, make_chapters
|
||||||
from .config import Config
|
from .config import Config
|
||||||
from .logging import log
|
from .logging import log
|
||||||
from .source import FileSource
|
from .source import FileSource
|
||||||
@ -115,45 +116,33 @@ class Encoder():
|
|||||||
class AudioGetter():
|
class AudioGetter():
|
||||||
config: Config
|
config: Config
|
||||||
src: FileSource
|
src: FileSource
|
||||||
cleanup: List[str]
|
cleanup: Set[str]
|
||||||
|
|
||||||
def __init__(self, config: Config, src: FileSource) -> None:
|
def __init__(self, config: Config, src: FileSource) -> None:
|
||||||
self.config = config
|
self.config = config
|
||||||
self.src = src
|
self.src = src
|
||||||
self.cleanup = []
|
self.cleanup = set()
|
||||||
|
|
||||||
def trim_audio(self, ftrim: Optional[acsuite.types.Trim] = None) -> str:
|
def trim_audio(self, ftrim: Optional[acsuite.types.Trim] = None) -> str:
|
||||||
trims = self.src.get_audio()
|
trims = self.src.get_audio()
|
||||||
if not trims or len(trims) > 1:
|
if not trims or len(trims) > 1:
|
||||||
raise NotImplementedError("Please implement multifile trimming")
|
raise NotImplementedError("Please implement multifile trimming")
|
||||||
audio_cut = acsuite.eztrim(trims[0].path, trims[0].trim or (0, None), streams=0)[0]
|
audio_cut = acsuite.eztrim(trims[0].path, trims[0].trim or (0, None), streams=0)[0]
|
||||||
self.cleanup.append(audio_cut)
|
self.cleanup.add(audio_cut)
|
||||||
|
|
||||||
if ftrim:
|
if ftrim:
|
||||||
audio_cut = acsuite.eztrim(audio_cut, ftrim, ref_clip=self.src.source())[0]
|
audio_cut = acsuite.eztrim(audio_cut, ftrim, ref_clip=self.src.source())[0]
|
||||||
self.cleanup.append(audio_cut)
|
self.cleanup.add(audio_cut)
|
||||||
|
|
||||||
|
audio_cut = self.config.encode_audio(audio_cut)
|
||||||
|
self.cleanup.add(audio_cut)
|
||||||
|
|
||||||
return audio_cut
|
return audio_cut
|
||||||
|
|
||||||
def encode_audio(self, path: str, codec_args: List[str]) -> str:
|
|
||||||
ffmpeg_args = [
|
|
||||||
"ffmpeg",
|
|
||||||
"-hide_banner", "-loglevel", "panic",
|
|
||||||
"-i", path,
|
|
||||||
"-y",
|
|
||||||
"-map", "0:a",
|
|
||||||
] + codec_args + [AUDIO_ENCODE]
|
|
||||||
print("+ " + " ".join(ffmpeg_args))
|
|
||||||
subprocess.call(ffmpeg_args)
|
|
||||||
|
|
||||||
self.cleanup.append(AUDIO_ENCODE)
|
|
||||||
|
|
||||||
return AUDIO_ENCODE
|
|
||||||
|
|
||||||
def do_cleanup(self) -> None:
|
def do_cleanup(self) -> None:
|
||||||
for f in self.cleanup:
|
for f in self.cleanup:
|
||||||
os.remove(f)
|
os.remove(f)
|
||||||
self.cleanup = []
|
self.cleanup.clear()
|
||||||
|
|
||||||
|
|
||||||
class SelfRunner():
|
class SelfRunner():
|
||||||
@ -175,7 +164,7 @@ class SelfRunner():
|
|||||||
|
|
||||||
def __init__(self, config: Config, source: FileSource, final_filter: Callable[[], vs.VideoNode],
|
def __init__(self, config: Config, source: FileSource, final_filter: Callable[[], vs.VideoNode],
|
||||||
workraw_filter: Optional[Callable[[], vs.VideoNode]] = None,
|
workraw_filter: Optional[Callable[[], vs.VideoNode]] = None,
|
||||||
audio_codec: Optional[List[str]] = None) -> None:
|
chapters: Optional[List[Chapter]] = None) -> None:
|
||||||
self.config = config
|
self.config = config
|
||||||
self.src = source
|
self.src = source
|
||||||
self.video_clean = False
|
self.video_clean = False
|
||||||
@ -230,11 +219,11 @@ class SelfRunner():
|
|||||||
if start >= end:
|
if start >= end:
|
||||||
raise ValueError("Start frame must be before end frame!")
|
raise ValueError("Start frame must be before end frame!")
|
||||||
|
|
||||||
out_name = f"{self.config.title.lower()}_{self.config.desc}_{self.suffix}"
|
out_name = f"{self.config.title.lower().replace(' ', '_')}_{self.config.desc}_{self.suffix}"
|
||||||
|
|
||||||
if args.audio_only:
|
if args.audio_only:
|
||||||
out_name += ".mka"
|
out_name += ".mka"
|
||||||
self._do_audio(start, audio_end, audio_codec, out_name=out_name)
|
self._do_audio(start, audio_end, out_name=out_name)
|
||||||
self.audio.do_cleanup()
|
self.audio.do_cleanup()
|
||||||
log.success("--- AUDIO ENCODE COMPLETE ---")
|
log.success("--- AUDIO ENCODE COMPLETE ---")
|
||||||
return
|
return
|
||||||
@ -269,7 +258,11 @@ class SelfRunner():
|
|||||||
self.timecodes = [round(float(1e9*f*(1/self.clip.fps)))/1e9 for f in range(0, self.clip.num_frames + 1)] \
|
self.timecodes = [round(float(1e9*f*(1/self.clip.fps)))/1e9 for f in range(0, self.clip.num_frames + 1)] \
|
||||||
if self.clip.fps_den != 0 and len(self.timecodes) == 0 else self.timecodes
|
if self.clip.fps_den != 0 and len(self.timecodes) == 0 else self.timecodes
|
||||||
|
|
||||||
self._do_audio(start, audio_end, audio_codec)
|
self._do_audio(start, audio_end)
|
||||||
|
|
||||||
|
if chapters:
|
||||||
|
log.status("--- GENERATING CHAPTERS ---")
|
||||||
|
make_chapters(chapters, self.timecodes, f"{self.config.desc}.xml")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.status("--- MUXING FILE ---")
|
log.status("--- MUXING FILE ---")
|
||||||
@ -289,15 +282,12 @@ class SelfRunner():
|
|||||||
|
|
||||||
log.success("--- ENCODE COMPLETE ---")
|
log.success("--- ENCODE COMPLETE ---")
|
||||||
|
|
||||||
def _do_audio(self, start: int, end: int, codec: Optional[List[str]], out_name: Optional[str] = None) -> None:
|
def _do_audio(self, start: int, end: int, out_name: Optional[str] = None) -> None:
|
||||||
log.status("--- LOOKING FOR AUDIO ---")
|
log.status("--- LOOKING FOR AUDIO ---")
|
||||||
self.audio = AudioGetter(self.config, self.src)
|
self.audio = AudioGetter(self.config, self.src)
|
||||||
|
|
||||||
log.status("--- TRIMMING AUDIO ---")
|
log.status("--- TRIMMING AUDIO ---")
|
||||||
self.audio_file = self.audio.trim_audio((start, end))
|
self.audio_file = self.audio.trim_audio((start, end))
|
||||||
if codec:
|
|
||||||
log.status("--- TRANSCODING AUDIO ---")
|
|
||||||
self.audio_file = self.audio.encode_audio(self.audio_file, codec)
|
|
||||||
if out_name:
|
if out_name:
|
||||||
shutil.copy(self.audio_file, out_name)
|
shutil.copy(self.audio_file, out_name)
|
||||||
self.audio_file = out_name
|
self.audio_file = out_name
|
||||||
|
63
yt_common/yt_common/chapters.py
Normal file
63
yt_common/yt_common/chapters.py
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
from lxml import etree
|
||||||
|
from random import getrandbits
|
||||||
|
|
||||||
|
from typing import Dict, List, NamedTuple, Set
|
||||||
|
|
||||||
|
LANGMAP: Dict[str, str] = {
|
||||||
|
"eng": "en",
|
||||||
|
"und": "und",
|
||||||
|
"jpn": "ja",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class Chapter(NamedTuple):
|
||||||
|
title: str
|
||||||
|
frame: int
|
||||||
|
lang: str = "eng"
|
||||||
|
|
||||||
|
|
||||||
|
class RandMan:
|
||||||
|
used: Set[int]
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.used = set()
|
||||||
|
|
||||||
|
def get_rand(self, bits: int = 64) -> str:
|
||||||
|
r = getrandbits(bits)
|
||||||
|
while r in self.used:
|
||||||
|
r = getrandbits(bits)
|
||||||
|
self.used.add(r)
|
||||||
|
return str(r)
|
||||||
|
|
||||||
|
|
||||||
|
def timecode_to_timestamp(stamp: float) -> str:
|
||||||
|
m = int(stamp // 60)
|
||||||
|
stamp %= 60
|
||||||
|
h = int(m // 60)
|
||||||
|
m %= 60
|
||||||
|
return f"{h:02d}:{m:02d}:{stamp:06.3f}000000"
|
||||||
|
|
||||||
|
|
||||||
|
def make_chapters(chapters: List[Chapter], timecodes: List[float], outfile: str) -> None:
|
||||||
|
chapters.sort(key=lambda c: c.frame)
|
||||||
|
rand = RandMan()
|
||||||
|
|
||||||
|
root = etree.Element("Chapters")
|
||||||
|
ed = etree.SubElement(root, "EditionEntry")
|
||||||
|
etree.SubElement(ed, "EditionUID").text = rand.get_rand()
|
||||||
|
|
||||||
|
for i, c in enumerate(chapters):
|
||||||
|
start = timecode_to_timestamp(timecodes[c.frame])
|
||||||
|
end = timecode_to_timestamp(timecodes[chapters[i+1].frame]) if i < len(chapters) - 1 else None
|
||||||
|
atom = etree.SubElement(ed, "ChapterAtom")
|
||||||
|
etree.SubElement(atom, "ChapterTimeStart").text = start
|
||||||
|
if end is not None:
|
||||||
|
etree.SubElement(atom, "ChapterTimeEnd").text = end
|
||||||
|
disp = etree.SubElement(atom, "ChapterDisplay")
|
||||||
|
etree.SubElement(disp, "ChapterString").text = c.title
|
||||||
|
etree.SubElement(disp, "ChapLanguageIETF").text = LANGMAP[c.lang]
|
||||||
|
etree.SubElement(disp, "ChapterLanguage").text = c.lang
|
||||||
|
etree.SubElement(atom, "ChapterUID").text = rand.get_rand()
|
||||||
|
|
||||||
|
with open(outfile, "wb") as o:
|
||||||
|
o.write(etree.tostring(root, encoding="utf-8", xml_declaration=True, pretty_print=True))
|
@ -1,4 +1,8 @@
|
|||||||
from typing import Union
|
import os
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from typing import List, Union
|
||||||
|
|
||||||
|
|
||||||
class Config():
|
class Config():
|
||||||
@ -16,5 +20,48 @@ class Config():
|
|||||||
self.datapath = datapath
|
self.datapath = datapath
|
||||||
|
|
||||||
def format_filename(self, filename: str) -> str:
|
def format_filename(self, filename: str) -> str:
|
||||||
return filename.format(epnum=self.desc, title=self.title,
|
fname = filename.format(epnum=self.desc, title=self.title,
|
||||||
title_long=self.title_long, resolution=self.resolution)
|
title_long=self.title_long, resolution=self.resolution)
|
||||||
|
return os.path.join(f"../{self.desc}/", fname)
|
||||||
|
|
||||||
|
def encode_audio(self, afile: str) -> str:
|
||||||
|
return afile # default: passthrough
|
||||||
|
|
||||||
|
|
||||||
|
class AudioEncoder(ABC):
|
||||||
|
@abstractmethod
|
||||||
|
def encode_audio(self, afile: str) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class FFAudio(AudioEncoder):
|
||||||
|
def encode_audio(self, afile: str) -> str:
|
||||||
|
ffmpeg_args = [
|
||||||
|
"ffmpeg",
|
||||||
|
"-hide_banner", "-loglevel", "panic",
|
||||||
|
"-i", afile,
|
||||||
|
"-y",
|
||||||
|
"-map", "0:a",
|
||||||
|
] + self.codec_args() + ["_ffaudio_encode.mka"]
|
||||||
|
print("+ " + " ".join(ffmpeg_args))
|
||||||
|
subprocess.call(ffmpeg_args)
|
||||||
|
return "_ffaudio_encode.mka"
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def codec_args(self) -> List[str]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class OpusMixin(FFAudio):
|
||||||
|
def codec_args(self) -> List[str]:
|
||||||
|
return ["-c:a", "libopus", "-b:a", "192k", "-sample_fmt", "s16"]
|
||||||
|
|
||||||
|
|
||||||
|
class FdkAacMixin(FFAudio):
|
||||||
|
def codec_args(self) -> List[str]:
|
||||||
|
return ["-c:a", "libfdk_aac", "-b:a", "256k", "-sample_fmt", "s16"]
|
||||||
|
|
||||||
|
|
||||||
|
class FlacMixin(FFAudio):
|
||||||
|
def codec_args(self) -> List[str]:
|
||||||
|
return ["-c:a", "flac"]
|
||||||
|
@ -78,8 +78,9 @@ class FileSource(ABC):
|
|||||||
class SimpleSource(FileSource):
|
class SimpleSource(FileSource):
|
||||||
src: List[FileTrim]
|
src: List[FileTrim]
|
||||||
|
|
||||||
def __init__(self, src: Union[FileTrim, List[FileTrim]]) -> None:
|
def __init__(self, src: Union[str, List[str], FileTrim, List[FileTrim]]) -> None:
|
||||||
self.src = src if isinstance(src, list) else [src]
|
srcl = src if isinstance(src, list) else [src]
|
||||||
|
self.src = [FileTrim(s, (None, None)) if isinstance(s, str) else s for s in srcl]
|
||||||
|
|
||||||
def get_audio(self) -> List[FileTrim]:
|
def get_audio(self) -> List[FileTrim]:
|
||||||
return self.src
|
return self.src
|
||||||
|
Loading…
x
Reference in New Issue
Block a user