#!/usr/bin/env python3
"""Run ffmpeg with the given arguments and show its progress as a live panel.

All command-line arguments are passed through to ffmpeg.  The wrapper adds
``-progress pipe:2 -hide_banner``, reads ffmpeg's merged stdout/stderr line by
line, prints ordinary log lines as-is and folds the ``key=value`` progress
lines into a status panel that is redrawn in place.
"""

import os
import subprocess
import sys
import time
from datetime import datetime

import parse
from ansi import cursor

__version__ = 1.1


class Chopper:
    """Split a binary stream into lines terminated by either CR or LF."""

    def __init__(self, buf):
        """Wrap *buf*, a binary file-like object exposing ``read(n)``."""
        self.buffer = buf
        # Not used anywhere in this file; kept so the attribute stays available.
        self.memory = bytearray()
        self.eol1 = b"\r"
        self.eol2 = b"\n"
        self.leneol = len(self.eol1)

    def read(self):
        """Return the next line, including its terminator, as ``str``.

        Reading stops after the first CR or LF, or when the stream is
        exhausted.  An empty string therefore means end of stream.  Bytes
        that are not valid UTF-8 are replaced instead of raising, so a
        strangely encoded file name cannot abort the whole run.
        """
        line = bytearray()
        while True:
            # One byte at a time so a line is returned as soon as it ends.
            c = self.buffer.read(1)
            if not c:
                break
            line += c
            if line.endswith((self.eol1, self.eol2)):
                break
        return bytes(line).decode("utf-8", errors="replace")


class Progress:
    """Collect ffmpeg progress data and render it as an in-place panel."""

    # One entry per terminal row.  The trailing "\r{up}" returns the cursor
    # to the first row so that the next render overwrites this one.
    _PANEL = "\n".join(
        (
            "{cl}==== Q to exit ===============",
            "{cl}Input: {input_file}",
            "{cl}Output: {output_file}",
            "{cl}Progress: {progress}% Elapsed: H{elapsed}",
            "{cl}Finished in: H{left}",
            "{cl}Frame: {frame} = {out_time}",
            "{cl}Source duration: {duration}",
            "{cl}Processing speed: FPS {fps} / {speed}",
            "{cl}Bitrate: {bitrate}",
            "{cl}File size: {total_size}Mb -> {projected_size}Mb"
            " (Input: {input_size}Mb)",
            "{cl}{progress_bar}\r{up}",
        )
    )

    # Fields the panel cannot be drawn without.
    _REQUIRED = ("out_time_s", "bitrate", "speed", "total_size")

    def __init__(self):
        self.started = time.time()
        self.duration = None  # source duration in whole seconds, once known
        self.framedata = {}  # latest value of every tracked progress field
        # Keys of ``key=value`` progress lines that are stored in framedata.
        self.parsable = (
            "frame",
            "fps",
            "bitrate",
            "total_size",
            "speed",
            "out_time_ms",
        )
        self.inputs = []  # base names of the input files seen so far
        self.inputs_full = []  # the same inputs exactly as ffmpeg printed them
        self.input = "NA"
        self.input_size = "NA"
        self.outputs = []
        self.output = "NA"

    def parse(self, line):
        """Process one line of ffmpeg output and refresh the panel.

        Lines that are not progress data are echoed unchanged; every line is
        also checked for input, output and duration announcements.
        """
        if not self.parse_frame(line):
            print(line.rstrip())
        self.parse_input(line)
        self.parse_output(line)
        self.parse_duration(line)
        self.print()

    def parse_input(self, line):
        """Record an input file announced by a line such as::

            Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'VID_20210804_141045.mp4':
        """
        parsed = parse.parse("Input #{:d}, {}, from '{}':", line.strip())
        if parsed is None:
            return
        path = parsed[2]
        self.inputs.append(os.path.basename(path))
        self.inputs_full.append(path)
        self.input = ", ".join(self.inputs)
        self.input_size = ", ".join(
            self._input_size_str(p) for p in self.inputs_full
        )

    def parse_output(self, line):
        """Record an output file announced by an ``Output #N, ..., to '...':`` line."""
        parsed = parse.parse("Output #{:d}, {}, to '{}':", line.strip())
        if parsed is None:
            return
        self.outputs.append(os.path.basename(parsed[2]))
        self.output = ", ".join(self.outputs)

    def parse_duration(self, line):
        """Store the source duration from the first matching line, e.g.::

            Duration: 00:00:48.21, start: 0.000000, bitrate: 17780 kb/s

        Only hours, minutes and whole seconds are used; the fractional part
        is ignored.  Once a non-zero duration is known, later lines are
        skipped.
        """
        if self.duration:
            return
        parsed = parse.parse(
            "Duration: {:d}:{:d}:{:d}.{:d}, start: {}, bitrate: {} kb/s",
            line.strip(),
        )
        if parsed is None:
            return
        self.duration = 3600 * parsed[0] + 60 * parsed[1] + parsed[2]

    def parse_frame(self, line):
        """Consume a progress line; return True when *line* was one.

        A line holding exactly one ``=`` is treated as ``key=value``.  Keys
        listed in ``self.parsable`` are stored in ``self.framedata``, with
        ``out_time_ms`` divided by 1,000,000 and stored as ``out_time_s``.
        Lines holding several ``=`` are swallowed without being stored.
        Returning False tells the caller to echo the line.
        """
        try:
            stripped = line.strip()
            values = len(stripped.split("="))
            if values > 2:
                # Presumably ffmpeg's one-line "frame=... fps=..." statistics;
                # they are suppressed because the panel shows the same data.
                return True
            if values != 2:
                return False
            parsed = parse.parse("{}={}", stripped)
            if not parsed:
                return False
            key, value = parsed[0], parsed[1]
            if key == "out_time_ms":
                try:
                    self.framedata["out_time_s"] = float(value) / 1000000
                except ValueError:
                    # A non-numeric value (e.g. "N/A") carries no position
                    # yet; keep the previous one.
                    pass
            elif key in self.parsable:
                self.framedata[key] = value
        except Exception as e:
            print(e)
            return False
        return True

    def print(self):
        """Recompute the derived fields and redraw the panel.

        Nothing is drawn until the position, bitrate, speed and size fields
        have all been received at least once.
        """
        data = self.framedata
        if not data:
            return

        elapsed = time.time() - self.started
        data["time_elapsed"] = int(elapsed)

        percent = "NA"
        if "out_time_s" in data:
            try:
                percent = round(100 * data["out_time_s"] / self.duration, 1)
            except (TypeError, ZeroDivisionError):
                # Duration not known yet, or shorter than one second.
                percent = "NA"
        data["percent_done"] = percent
        data["time_remaining"] = "NA"
        data["projected_size"] = "NA"

        # Extrapolation only makes sense for a positive, known percentage.
        if percent != "NA" and percent > 0:
            if percent <= 100:
                data["time_remaining"] = int(
                    100 * elapsed / percent - data["time_elapsed"]
                )
            try:
                data["projected_size"] = int(
                    100 * float(data["total_size"]) / percent
                )
            except (KeyError, TypeError, ValueError):
                pass  # size missing or not numeric: leave it as "NA"

        if any(key not in data for key in self._REQUIRED):
            return

        msg = self._PANEL.format(
            input_file=self.input,
            input_size=self.input_size,
            output_file=self.output,
            progress=data["percent_done"],
            progress_bar=self._progress_bar(data["percent_done"]),
            elapsed=self._timestr(data["time_elapsed"]),
            left=self._timestr(data["time_remaining"]),
            duration=self._timestr(self.duration),
            out_time=self._timestr(data["out_time_s"]),
            frame=data.get("frame", "NA"),
            fps=data.get("fps", "NA"),
            bitrate=data["bitrate"],
            speed=data["speed"],
            total_size=self._mbstr(data["total_size"]),
            projected_size=self._mbstr(data["projected_size"]),
            up=cursor.up(10),
            cl=cursor.erase_line(),
        )
        sys.stdout.write(msg)
        sys.stdout.flush()

    def finish(self):
        """Emit enough newlines to move the cursor below the panel."""
        sys.stdout.write("\n" * (len(self.framedata) + 3))
        sys.stdout.flush()

    def _input_size_str(self, path):
        """Return the size of *path* in MiB as text, or "NA" if it cannot be read."""
        try:
            return self._mbstr(os.stat(path).st_size)
        except (OSError, ValueError):
            return "NA"

    def _timestr(self, sec):
        """Format *sec* seconds as ``HH:MM:SS`` (hours wrap at 24).

        Values that cannot be converted to an integer are returned unchanged.
        """
        try:
            total = int(sec)
        except (TypeError, ValueError, OverflowError):
            return sec
        return "{:02d}:{:02d}:{:02d}".format(
            total // 3600 % 24, total // 60 % 60, total % 60
        )

    def _mbstr(self, b):
        """Format *b* bytes as MiB with one decimal; return *b* as-is if not numeric."""
        try:
            return "{:.1f}".format(float(b) / (1024 ** 2))
        except (TypeError, ValueError):
            return b

    def _progress_bar(self, p):
        """Return a 30-character bar for percentage *p*.

        A non-numeric *p* yields a completely filled bar.
        """
        try:
            done_chars = int(float(p) * 30 / 100)
        except (TypeError, ValueError, OverflowError):
            return ">" * 30
        todo_chars = 30 - done_chars
        return (">" * done_chars + "-" * todo_chars)[0:30]


def parse_output(args):
    """Return the output path, taken to be the last argument in *args*."""
    return args[-1]


def parse_overwrite(args):
    """Return True when *args* already contains the ``-y`` flag."""
    return "-y" in args


def ask_for_overwrite(commands, path):
    """Ask whether the existing *path* may be overwritten.

    Exits with status 1 when the answer is "n" (case and surrounding
    whitespace are ignored).  Any other answer inserts ``-y`` just before the
    last element of *commands*; the list is modified in place and returned.
    """
    print("File {} exists. Overwrite or break? ([y]/n)".format(path))
    answer = input()
    if answer.strip().lower() == "n":
        sys.exit(1)
    commands.insert(-1, "-y")
    return commands


def main():
    """Run ffmpeg with the command-line arguments and exit with its status."""
    commands = sys.argv[1:]
    if commands == ["-h"]:
        print(
            "This command passes all arguments to FFMPEG, and parses output to readable format with progress"
        )
        sys.exit(0)

    # With no arguments there is no output path to check.
    if commands and not parse_overwrite(commands):
        output_file = parse_output(commands)
        if os.path.exists(output_file):
            commands = ask_for_overwrite(commands, output_file)

    # argv[0] is left empty; the program to run is named by ``executable``.
    process = subprocess.Popen(
        ["", "-progress", "pipe:2", "-hide_banner"] + commands,
        executable="ffmpeg",
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
    progress = Progress()
    try:
        chopper = Chopper(process.stdout)
        while True:
            output = chopper.read()
            if not output:
                # End of stream: the child has closed its output.
                break
            progress.parse(output)
        rc = process.wait()
    except Exception as e:
        print(e)
        process.terminate()
        progress.finish()
        process.wait()
        raise

    progress.finish()
    print("Exit code: {}".format(rc))
    sys.exit(rc)


if __name__ == "__main__":
    main()