# Source code for visionsim.cli.ffmpeg

from __future__ import annotations

from pathlib import Path
from typing import cast


def animate(
    input_dir: Path,
    pattern: str | None = None,
    outfile: Path = Path("out.mp4"),
    fps: int = 25,
    crf: int = 22,
    vcodec: str = "libx264",
    step: int = 1,
    multiple: int | None = None,
    force: bool = False,
    bg_color: str = "black",
    strip_alpha: bool = False,
) -> None:
    """Combine generated frames into an MP4 using ffmpeg wizardry.

    This is roughly equivalent to running the "image2" demuxer in ffmpeg, with the
    added benefit of being able to skip frames using a step size, strip alpha
    channels from PNGs, and automatically handling the case where the input
    frames are numpy arrays.

    Args:
        input_dir: directory in which to look for frames,
        pattern: If provided search for files matching this pattern. Otherwise,
            look for a valid dataset in the input directory.
        outfile: where to save generated mp4
        fps: frames per second in video
        crf: constant rate factor for video encoding (0-51), lower is better
            quality but more memory
        vcodec: video codec to use (either libx264 or libx265)
        step: drop some frames when making video, use frames 0+step*n
        multiple: some codecs require size to be a multiple of n
        force: if true, overwrite output file if present
        bg_color: for images with transparencies, namely PNGs, use this color
            as a background
        strip_alpha: if true, pre-process PNGs with an ffmpeg filter that
            composites them onto ``bg_color``, removing the alpha channel

    Raises:
        RuntimeError: if no ffmpeg executable is found on the path.
        FileExistsError: if ``outfile`` exists and ``force`` is false.
        ValueError: if the input frames have more than one file extension.
    """
    import tempfile  # Lazy import

    import imageio.v3 as iio
    import numpy as np
    from rich.progress import track

    from visionsim.cli import _run
    from visionsim.dataset import Dataset

    # Fail fast if ffmpeg is missing or we would clobber an existing output.
    if _run("ffmpeg -version", hide=True).returncode != 0:
        raise RuntimeError("No ffmpeg installation found on path!")
    if not force and outfile.exists():
        raise FileExistsError(f"Output file {outfile} already exists. Use `force` to overwrite.")

    outfile.parent.mkdir(parents=True, exist_ok=True)

    if pattern:
        dataset = Dataset.from_pattern(input_dir, pattern)
    else:
        dataset = Dataset.from_path(input_dir)

    # All frames must share one extension so a single ffmpeg input pattern works.
    exts = {p.suffix for p in dataset.paths}
    if len(exts) != 1:
        raise ValueError(f"Input directory must contain files of a single extension. Found: {exts}")
    ext = next(iter(exts))

    # Composite PNGs onto a solid background to drop the alpha channel.
    # See: https://stackoverflow.com/questions/52804749
    strip_alpha_filter = (
        (
            f'-filter_complex "color={bg_color},format=rgb24[c];[c][0]scale2ref[c][i];'
            f'[c][i]overlay=format=auto:shortest=1,setsar=1" '
        )
        if ext.lower() == ".png" and strip_alpha
        else ""
    )

    with tempfile.TemporaryDirectory() as tmpdir:
        # There's no easy way to select out a subset of frames to use.
        # The select filter (-vf "select=not(mod(n\,step))") interferes with
        # the PNG alpha channel removal, and the concat muxer doesn't work
        # with images or leads to errors.
        # As a quick fix, we create a tmpdir with symlinks to the frames we
        # want to include and point ffmpeg to those.
        tmpdirname = Path(tmpdir)

        # Iterate over dataset with step and extract frame from npy if necessary
        if ext.lower() == ".npy":
            for i, idx in enumerate(track(range(0, len(dataset), step), description="Extracting frames")):
                data, transform = dataset[idx]
                # Bit-packed (binary) frames are scaled to the full 8-bit range.
                if cast(dict, transform).get("bitpack_dim"):
                    data = np.array(data * 255).astype(np.uint8)
                iio.imwrite(tmpdirname / f"{i:09}.png", data)
        else:
            # Symlink every step-th frame with a dense, zero-padded index so
            # ffmpeg's image2 demuxer can consume them as %09d.
            for i, p in enumerate(dataset.paths[::step]):
                (tmpdirname / f"{i:09}{ext}").symlink_to(p, target_is_directory=False)

        # npy inputs were converted to PNGs above, so point ffmpeg at .png then.
        cmd = (
            f"ffmpeg -framerate {fps} -f image2 -i {tmpdirname / ('%09d' + (ext if ext.lower() != '.npy' else '.png'))} {strip_alpha_filter}"
            f"{'-y' if force else ''} -vcodec {vcodec} -crf {crf} -pix_fmt yuv420p "
        )

        if multiple:
            # NOTE(review): output height is hard-coded to 2048 here while the
            # width is rounded to a multiple of `multiple` — confirm intended.
            cmd += f"-vf scale=-{multiple}:2048 "
        cmd += f"{outfile} "
        _run(cmd)
def combine(
    matrix: str,
    outfile: Path = Path("combined.mp4"),
    mode: str = "shortest",
    color: str = "white",
    multiple: int = 2,
    force: bool = False,
) -> None:
    """Combine multiple videos into one by stacking, padding and resizing them using ffmpeg.

    Internally this task will first optionally pad all videos to length using ffmpeg's
    ``tpad`` filter, then ``scale`` all videos in a row to have the same height, combine
    rows together using the ``hstack`` filter before finally ``scale``\\ing row-videos
    to have same width and ``vstack``\\ing them together. Non-jagged matrices are
    handled in a single pass with the ``xstack`` filter instead.

    Args:
        matrix: Way to specify videos to combine as a 2D matrix of file paths
        outfile: where to save generated mp4
        mode: if 'shortest' combined video will last as long as the shortest input
            video. If 'static', the last frame of videos that are shorter than the
            longest input video will be repeated. If 'pad', all videos are padded
            with frames of ``color`` to last the same duration.
        color: color to pad videos with, only used if mode is 'pad'
        multiple: some codecs require size to be a multiple of n
        force: if true, overwrite output file if present

    Example:
        The input videos can also be specified in a 2D array using the
        ``--matrix`` argument like so:

        .. code-block:: bash

            $ visionsim ffmpeg.combine --matrix='[["a.mp4", "b.mp4"]]' --outfile="output.mp4"
    """
    # TODO: Allow borders and use xstack for better performance
    # See: https://stackoverflow.com/questions/11552565/vertically-or-horizontally-stack-mosaic-several-videos-using-ffmpeg/33764934#33764934
    import ast
    import shutil
    import tempfile

    import numpy as np
    import numpy.typing as npt

    from visionsim.cli import _log, _run

    if outfile.is_file() and not force:
        raise RuntimeError("Output file already exists, either specify different output path or `--force` to override.")
    if _run("ffmpeg -version", hide=True).returncode != 0:
        raise RuntimeError("No ffmpeg installation found on path!")

    # The matrix may arrive as its string repr from the CLI; parse it back.
    matrix = ast.literal_eval(matrix) if isinstance(matrix, str) else matrix
    flat_mat = [Path(path) for row in matrix for path in row]

    try:
        if any(not Path(p).is_file() for p in flat_mat):
            raise FileNotFoundError(
                "Expected video matrix to contain valid file paths or newline "
                "delimiters such as '\\n'/'\\r' or 'newline'/'enter'"
            )
    except TypeError:
        # Iterating a non-2D structure above raises TypeError.
        raise RuntimeError("Expected video matrix to be 2D.")

    if mode.lower() not in ("shortest", "static", "pad"):
        raise ValueError(f"Expected `mode` to be one of 'shortest', 'static', 'pad' but got {mode}.")

    with tempfile.TemporaryDirectory() as tmpdir:
        # Keep track of new names of mp4s (original path str -> padded/resized temp file)
        mapping: dict[str, Path] = {}
        row_paths: list[Path] = []

        # Keep track of all original dimensions
        sizes = {str(path): dimensions(path) for path in flat_mat}

        # Find longest video and pad all to this length
        if mode.lower() == "pad":
            max_duration = max(duration(path) for path in flat_mat)

            for path in flat_mat:
                _log.info(f"Padding {path}...")
                out_path = Path(tmpdir) / Path(path).name
                out_path = out_path.with_name(f"{out_path.stem}_padded{out_path.suffix}")
                # tpad clones the last frame indefinitely; trim cuts at the target length.
                cmd = f"ffmpeg -i {path} -vf tpad=stop=-1=color={color},trim=end={max_duration} {out_path} -y"
                mapping[str(path)] = out_path
                _run(cmd)

        # If the matrix is not jagged, we can use ffmpeg's xstack instead
        if len(num_cols := set(len(row) for row in matrix)) == 1:
            in_paths = [mapping.get(p, p) for row in matrix for p in row]
            in_paths_str = "".join(f"-i {p} " for p in in_paths)
            # NOTE(review): every tile is scaled to qvga (320x240) in this path,
            # unlike the jagged fallback below — confirm intended.
            filter_inputs_str = "".join(
                f"[{i}:v] setpts=PTS-STARTPTS, scale=qvga [a{i}]; " for i, _ in enumerate(in_paths)
            )
            # Build the xstack layout grid: each cell's x offset is the sum of the
            # widths of the tiles to its left (w0+w1+...), likewise for heights.
            W, H = cast(
                tuple[npt.NDArray, ...],
                np.meshgrid(
                    ["+".join(f"w{i}" for i in range(j)) or "0" for j in range(num_cols.pop())],
                    ["+".join(f"h{i}" for i in range(j)) or "0" for j in range(len(matrix))],
                ),
            )
            layout_spec = "|".join(f"{i}_{j}" for i, j in zip(W.flatten(), H.flatten()))
            placement = (
                "".join(f"[a{i}]" for i, _ in enumerate(in_paths))
                + f"xstack=inputs={len(in_paths)}:layout={layout_spec}[out]"
            )
            cmd = f'ffmpeg {in_paths_str} -filter_complex "{filter_inputs_str} {placement}" -map "[out]" -c:v libx264 {outfile}'
            _run(cmd)
            return

        for i, row in enumerate(matrix):
            # Resize videos in each row to the tallest video's height
            max_height = max(sizes[path][1] for path in row)

            for p in row:
                if sizes[p][1] != max_height:
                    _log.info(f"Resizing {p}...")
                    in_path = mapping.get(p, p)
                    out_path = Path(tmpdir) / Path(p).name
                    out_path = out_path.with_name(f"{out_path.stem}_height_resize{out_path.suffix}")
                    # Width is derived automatically, rounded to a multiple of `multiple`.
                    _run(f"ffmpeg -i {in_path} -vf scale=-{multiple}:{max_height} {out_path} -y")
                    mapping[p] = out_path

            # Combine all videos in the row
            if len(row) >= 2:
                _log.info("Stacking rows...")
                paths = " -i ".join(str(mapping.get(p, p)) for p in row)
                out_file = Path(tmpdir) / f"row_{i:04}.mp4"
                row_paths.append(out_file)
                cmd = (
                    f"ffmpeg -i {paths} -filter_complex "
                    f"hstack=inputs={len(row)}:shortest={int(mode.lower() == 'shortest')} "
                    f"{out_file} -vsync vfr -y"
                )
                _run(cmd)
            else:
                row_paths.append(mapping.get(row[0], Path(row[0])))

        # Combine all rows
        if len(matrix) >= 2:
            # Resize row videos if needed so every row has the same width
            row_sizes: dict[Path, tuple] = {path: dimensions(path) for path in row_paths}
            max_width: int = max(row_sizes[path][0] for path in row_paths)
            new_row_paths = []

            for path in row_paths:
                if row_sizes[path][0] != max_width:
                    _log.info(f"Resizing {path}...")
                    out_path = Path(tmpdir) / Path(path).name
                    out_path = out_path.with_name(f"{out_path.stem}_width_resize{out_path.suffix}")
                    _run(f"ffmpeg -i {path} -vf scale={max_width}:-{multiple} {out_path} -y")
                    new_row_paths.append(out_path)
                else:
                    new_row_paths.append(Path(path))

            # Join all row videos
            paths = " -i ".join(str(p) for p in new_row_paths)
            cmd = (
                f"ffmpeg -i {paths} -filter_complex "
                f"vstack=inputs={len(matrix)}:shortest={int(mode.lower() == 'shortest')} "
                f"{outfile} -vsync vfr -y"
            )
            _run(cmd)
        else:
            # We already created the video, simply move/rename it to output file
            # NOTE(review): a single-row matrix is never jagged, so it takes the
            # xstack path above and returns early — this branch looks unreachable
            # and would move the ORIGINAL input if hit; confirm.
            shutil.move(row_paths[0], outfile)
def grid(
    input_dir: Path,
    width: int = -1,
    height: int = -1,
    pattern: str = "*.mp4",
    outfile: Path = Path("combined.mp4"),
    force: bool = False,
) -> None:
    """Make a mosaic from videos in a folder, organizing them in a grid

    If neither ``width`` nor ``height`` is given, the user is prompted to pick
    one of the exact factorizations of the file count interactively.

    Args:
        input_dir: directory containing all video files (mp4's expected),
        width: width of video grid to produce
        height: height of video grid to produce
        pattern: use files that match this pattern as inputs
        outfile: where to save generated mp4
        force: if true, overwrite output file if present

    Raises:
        FileNotFoundError: if no files match ``pattern`` in ``input_dir``.
        ValueError: if the width/height are not integral, or if the grid size
            does not exactly fit the number of matched files.
    """
    import numpy as np
    from natsort import natsorted

    files = natsorted(input_dir.glob(pattern))

    # Fail early with a clear message instead of presenting an empty menu
    # or crashing later in the reshape below.
    if not files:
        raise FileNotFoundError(f"No files matching {pattern} found in {input_dir}.")

    if width <= 0 and height <= 0:
        # Neither dimension given: offer every exact (w, h) factorization.
        candidates = [(w, len(files) // w) for w in range(1, len(files) + 1) if len(files) % w == 0]
        print("Please select size (width x height):")
        for i, candidate in enumerate(candidates):
            print(f"{i}) {candidate}")
        selection = int(input("> "))
        width, height = candidates[selection]
    elif width <= 0:
        width = len(files) // height
    elif height <= 0:
        height = len(files) // width

    # Guard against non-integral values (e.g. floats passed through the CLI).
    if int(width) != width or int(height) != height:
        raise ValueError(f"Width and height should be integers, instead got {width}, {height}.")
    else:
        width, height = int(width), int(height)

    # Validate the grid actually fits the files — otherwise np.reshape below
    # would fail with an opaque error.
    if width * height != len(files):
        raise ValueError(
            f"A {width}x{height} grid cannot hold {len(files)} videos exactly."
        )

    # Arrange paths row-major into a 2D matrix and delegate to `combine`.
    matrix = np.array([str(p) for p in files]).reshape((height, width)).tolist()
    combine(str(matrix), outfile, force=force)
def count_frames(input_file: Path, /) -> int:
    """Count the number of frames a video file contains using ffprobe

    Args:
        input_file: video file input

    Returns:
        int: Number of frames in video.
    """
    from visionsim.cli import _log, _run

    # See: https://stackoverflow.com/questions/2017843
    probe_available = _run("ffprobe -version", hide=True)
    if probe_available.returncode != 0:
        raise RuntimeError("No ffprobe installation found on path!")

    # Counting packets of the first video stream is much faster than decoding
    # every frame, and for typical containers the counts agree.
    probe_cmd = (
        f"ffprobe -v error -select_streams v:0 -count_packets -show_entries "
        f"stream=nb_read_packets -of csv=p=0 {input_file}"
    )
    num_frames = int(_run(probe_cmd).stdout.strip())
    _log.info(f"Video contains {num_frames} frames.")
    return num_frames
def duration(input_file: Path, /) -> float:
    """Return duration (in seconds) of first video stream in file using ffprobe

    Args:
        input_file: video file input

    Returns:
        float: Video duration in seconds.
    """
    from visionsim.cli import _log, _run

    # See: http://trac.ffmpeg.org/wiki/FFprobeTips#Duration
    probe_available = _run("ffprobe -version", hide=True)
    if probe_available.returncode != 0:
        raise RuntimeError("No ffprobe installation found on path!")

    # Query only the duration field of the first video stream, unwrapped.
    probe_cmd = (
        f"ffprobe -v error -select_streams v:0 -show_entries stream=duration "
        f"-of default=noprint_wrappers=1:nokey=1 {input_file}"
    )
    seconds = float(_run(probe_cmd).stdout.strip())
    _log.info(f"Video lasts {seconds} seconds.")
    return seconds
def dimensions(input_file: Path) -> tuple[int, int]:
    """Return size (WxH in pixels) of first video stream in file using ffprobe

    Args:
        input_file: video file input

    Returns:
        tuple[int, int]: Video size as a (width, height) tuple.
    """
    from visionsim.cli import _log, _run

    # See: http://trac.ffmpeg.org/wiki/FFprobeTips#Duration
    probe_available = _run("ffprobe -version", hide=True)
    if probe_available.returncode != 0:
        raise RuntimeError("No ffprobe installation found on path!")

    # ffprobe prints "WIDTHxHEIGHT" thanks to the csv separator being 'x'.
    probe_cmd = f"ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of csv=s=x:p=0 {input_file}"
    result = _run(probe_cmd).stdout.strip()
    _log.info(f"Video has size {result}.")
    parsed = tuple(int(dim) for dim in result.split("x"))
    return cast(tuple[int, int], parsed)
def extract(input_file: Path, output_dir: Path, pattern: str = "frames_%06d.png") -> None:
    """Extract frames from video file

    Args:
        input_file: path to video file from which to extract frames,
        output_dir: directory in which to save extracted frames,
        pattern: filenames of frames will match this pattern
    """
    from visionsim.cli import _run

    # Verify ffmpeg is available before touching the filesystem.
    ffmpeg_check = _run("ffmpeg -version", hide=True)
    if ffmpeg_check.returncode != 0:
        raise RuntimeError("No ffmpeg installation found on path!")
    if not input_file.is_file():
        raise FileNotFoundError(f"File {input_file} not found.")

    output_dir.mkdir(parents=True, exist_ok=True)
    destination = output_dir / pattern
    _run(f"ffmpeg -i {input_file} {destination}")