ffmpeg utils as py module
This commit is contained in:
0
py-packages/ffmpeg-parser/ffmpegparser/__init__.py
Normal file
0
py-packages/ffmpeg-parser/ffmpegparser/__init__.py
Normal file
299
py-packages/ffmpeg-parser/ffmpegparser/ffmpegparser.py
Executable file
299
py-packages/ffmpeg-parser/ffmpegparser/ffmpegparser.py
Executable file
@@ -0,0 +1,299 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import parse
|
||||
from ansi import cursor
|
||||
from datetime import datetime
|
||||
|
||||
__version__=1.0
|
||||
|
||||
class Chopper:
|
||||
def __init__(self, buf):
|
||||
self.buffer = buf
|
||||
self.memory = bytearray()
|
||||
self.eol1 = b"\r"
|
||||
self.eol2 = b"\n"
|
||||
self.leneol = len(self.eol1)
|
||||
|
||||
def read(self):
|
||||
line = bytearray()
|
||||
while True:
|
||||
c = self.buffer.read(1)
|
||||
if c:
|
||||
line += c
|
||||
if line[-self.leneol :] == self.eol1:
|
||||
break
|
||||
if line[-self.leneol :] == self.eol2:
|
||||
break
|
||||
else:
|
||||
break
|
||||
return bytes(line).decode("utf-8")
|
||||
|
||||
|
||||
class Progress:
|
||||
def __init__(self):
|
||||
self.started = time.time()
|
||||
self.duration = None
|
||||
self.framedata = {}
|
||||
self.parsable = (
|
||||
"frame",
|
||||
"fps",
|
||||
"bitrate",
|
||||
"total_size",
|
||||
"speed",
|
||||
"out_time_ms",
|
||||
)
|
||||
self.inputs = []
|
||||
self.inputs_full = []
|
||||
self.input = "NA"
|
||||
self.input_size = "NA"
|
||||
self.outputs = []
|
||||
self.output = "NA"
|
||||
|
||||
def parse(self, line):
|
||||
|
||||
if not self.parse_frame(line):
|
||||
print(line.rstrip())
|
||||
self.parse_input(line)
|
||||
self.parse_output(line)
|
||||
self.parse_duration(line)
|
||||
|
||||
self.print()
|
||||
|
||||
def parse_input(self, line):
|
||||
|
||||
# Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'VID_20210804_141045.mp4':
|
||||
parsed = parse.parse("Input #{:d}, {}, from '{}':", line.strip())
|
||||
try:
|
||||
self.inputs.append(os.path.basename(parsed[2]))
|
||||
self.inputs_full.append(parsed[2])
|
||||
self.input = ", ".join(self.inputs)
|
||||
self.input_size = ", ".join(
|
||||
[self._mbstr(os.stat(i).st_size) for i in self.inputs_full]
|
||||
)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
def parse_output(self, line):
|
||||
|
||||
parsed = parse.parse("Output #{:d}, {}, to '{}':", line.strip())
|
||||
try:
|
||||
self.outputs.append(os.path.basename(parsed[2]))
|
||||
self.output = ", ".join(self.outputs)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
def parse_duration(self, line):
|
||||
|
||||
if self.duration:
|
||||
return
|
||||
parsed = parse.parse(
|
||||
"Duration: {:d}:{:d}:{:d}.{:d}, start: {}, bitrate: {} kb/s", line.strip()
|
||||
)
|
||||
# ~ Duration: 00:00:48.21, start: 0.000000, bitrate: 17780 kb/s
|
||||
try:
|
||||
self.duration = 3600 * parsed[0] + 60 * parsed[1] + parsed[2]
|
||||
except Exception:
|
||||
return
|
||||
|
||||
def parse_frame(self, line):
|
||||
|
||||
try:
|
||||
values = len(line.strip().split("="))
|
||||
if values > 2: # and line.startswith("frame="):
|
||||
return True
|
||||
if values != 2:
|
||||
return False
|
||||
parsed = parse.parse("{}={}", line.strip())
|
||||
if not parsed:
|
||||
return False
|
||||
if parsed[0] in self.parsable:
|
||||
if parsed[0] == "out_time_ms":
|
||||
self.framedata["out_time_s"] = float(parsed[1]) / 1000000
|
||||
else:
|
||||
self.framedata[parsed[0]] = parsed[1]
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def print(self):
|
||||
|
||||
if len(self.framedata) == 0:
|
||||
return
|
||||
|
||||
self.framedata["time_elapsed"] = int(time.time() - self.started)
|
||||
if "out_time_s" in self.framedata:
|
||||
self.framedata["percent_done"] = round(
|
||||
100 * self.framedata["out_time_s"] / self.duration, 1
|
||||
)
|
||||
try:
|
||||
if self.framedata["percent_done"] > 100:
|
||||
self.framedata["time_remaining"] = "NA"
|
||||
else:
|
||||
self.framedata["time_remaining"] = int(
|
||||
(
|
||||
100
|
||||
* float(time.time() - self.started)
|
||||
/ self.framedata["percent_done"]
|
||||
)
|
||||
- self.framedata["time_elapsed"]
|
||||
)
|
||||
except Exception:
|
||||
self.framedata["time_remaining"] = "NA"
|
||||
try:
|
||||
self.framedata["projected_size"] = int(
|
||||
(
|
||||
100
|
||||
* float(self.framedata["total_size"])
|
||||
/ self.framedata["percent_done"]
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
self.framedata["time_remaining"] = "NA"
|
||||
else:
|
||||
self.framedata["percent_done"] = "NA"
|
||||
self.framedata["time_remaining"] = "NA"
|
||||
self.framedata["projected_size"] = "NA"
|
||||
|
||||
try:
|
||||
msg = """{cl}==== Q to exit ===============
|
||||
{cl}Input: {input_file}
|
||||
{cl}Output: {output_file}
|
||||
{cl}Progress: {progress}% Elapsed: H{elapsed}
|
||||
{cl}Finished in: H{left}
|
||||
{cl}Frame: {frame} = {out_time}
|
||||
{cl}Source duration: {duration}
|
||||
{cl}Processing speed: FPS {fps} / {speed}
|
||||
{cl}Bitrate: {bitrate}
|
||||
{cl}File size: {total_size}Mb -> {projected_size}Mb (Input: {input_size}Mb)
|
||||
{cl}{progress_bar}\r{up}""".format(
|
||||
input_file=self.input,
|
||||
input_size=self.input_size,
|
||||
output_file=self.output,
|
||||
progress=self.framedata["percent_done"],
|
||||
progress_bar=self._progress_bar(self.framedata["percent_done"]),
|
||||
elapsed=self._timestr(self.framedata["time_elapsed"]),
|
||||
left=self._timestr(self.framedata["time_remaining"]),
|
||||
duration=self._timestr(self.duration),
|
||||
out_time=self._timestr(self.framedata["out_time_s"]),
|
||||
frame=self.framedata.get("frame", "NA"),
|
||||
fps=self.framedata.get("fps", "NA"),
|
||||
bitrate=self.framedata["bitrate"],
|
||||
speed=self.framedata["speed"],
|
||||
total_size=self._mbstr(self.framedata["total_size"]),
|
||||
projected_size=self._mbstr(self.framedata["projected_size"]),
|
||||
up=cursor.up(10),
|
||||
cl=cursor.erase_line(),
|
||||
)
|
||||
|
||||
sys.stdout.write(msg)
|
||||
sys.stdout.flush()
|
||||
except Exception as e:
|
||||
pass
|
||||
|
||||
def finish(self):
|
||||
|
||||
for i in range(len(self.framedata) + 3):
|
||||
sys.stdout.write("\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
def _timestr(self, sec):
|
||||
|
||||
try:
|
||||
hours = int(sec) // 3600 % 24
|
||||
minutes = int(sec) // 60 % 60
|
||||
seconds = int(sec) % 60
|
||||
return "{:02d}:{:02d}:{:02d}".format(hours, minutes, seconds)
|
||||
except Exception:
|
||||
return sec
|
||||
|
||||
def _mbstr(self, b):
|
||||
|
||||
try:
|
||||
return "{:.1f}".format((float(b) / (1024 ** 2)))
|
||||
except Exception:
|
||||
return b
|
||||
|
||||
def _progress_bar(self, p):
|
||||
|
||||
try:
|
||||
done_chars = int(float(p) * 30 / 100)
|
||||
todo_chars = int(30 - done_chars)
|
||||
return (">" * done_chars + "-" * todo_chars)[0:30]
|
||||
except Exception:
|
||||
return ">" * 30
|
||||
|
||||
|
||||
def parse_output(args):
|
||||
return args[-1]
|
||||
|
||||
|
||||
def parse_overwrite(args):
|
||||
return "-y" in args
|
||||
|
||||
|
||||
def ask_for_overwrite(commands, path):
|
||||
|
||||
print("File {} exists. Overwrite or break? ([y]/n)".format(path))
|
||||
answer = input()
|
||||
if answer == "n":
|
||||
sys.exit(1)
|
||||
|
||||
commands.insert(-1, "-y")
|
||||
return commands
|
||||
|
||||
|
||||
def main():
|
||||
commands = sys.argv[1:]
|
||||
if len(commands) == 1:
|
||||
if commands[0] == "-h":
|
||||
print(
|
||||
"This command passes all arguments to FFMPEG, and parses output to readable format with progress"
|
||||
)
|
||||
sys.exit(0)
|
||||
try:
|
||||
has_overwrite = parse_overwrite(commands)
|
||||
if not has_overwrite:
|
||||
output_file = parse_output(commands)
|
||||
output_exists = os.path.exists(output_file)
|
||||
if output_exists:
|
||||
commands = ask_for_overwrite(commands, output_file)
|
||||
|
||||
process = subprocess.Popen(
|
||||
["", "-progress", "pipe:2", "-hide_banner"] + commands,
|
||||
executable="ffmpeg",
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
chopper = Chopper(process.stdout)
|
||||
progress = Progress()
|
||||
|
||||
started = time.time()
|
||||
duration = None
|
||||
while True:
|
||||
output = chopper.read()
|
||||
progress.parse(output)
|
||||
if not output:
|
||||
if process.poll() != None:
|
||||
break
|
||||
|
||||
rc = process.poll()
|
||||
progress.finish()
|
||||
print("Exit code: {}".format(rc))
|
||||
sys.exit(rc)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
process.terminate()
|
||||
progress.finish()
|
||||
while process.poll() != None:
|
||||
time.sleep(1)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
172
py-packages/ffmpeg-parser/ffmpegparser/ffprobeparser.py
Executable file
172
py-packages/ffmpeg-parser/ffmpegparser/ffprobeparser.py
Executable file
@@ -0,0 +1,172 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import parse
|
||||
import argparse
|
||||
import json
|
||||
from ansi.colour import fg, fx
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class Probe:
|
||||
def __init__(self):
|
||||
parser = argparse.ArgumentParser(description="Readable version of ffprobe. Give filename as argument.")
|
||||
parser.add_argument("-c", dest="colorize", action="store_true", default=False, help="Colorize")
|
||||
parser.add_argument("filename", help="File to probe")
|
||||
parsed = parser.parse_args()
|
||||
|
||||
self.filename = parsed.filename
|
||||
self.colorize = parsed.colorize
|
||||
self.run_probe()
|
||||
self.print()
|
||||
|
||||
def run_probe(self):
|
||||
process = subprocess.Popen(
|
||||
[
|
||||
"",
|
||||
"-v",
|
||||
"error",
|
||||
"-hide_banner",
|
||||
"-of",
|
||||
"default=noprint_wrappers=0",
|
||||
"-print_format", "json",
|
||||
"-show_format",
|
||||
"-show_streams",
|
||||
self.filename,
|
||||
],
|
||||
executable="ffprobe",
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
self.raw_data, errors = process.communicate()
|
||||
if len(errors) > 0:
|
||||
print(errors)
|
||||
self.json_data = json.loads(self.raw_data)
|
||||
|
||||
|
||||
def print(self):
|
||||
# ~ print(self.json_data['format'])
|
||||
self.json_data['format']['time'] = self._timestr(self.json_data['format'].get('duration',"0"))
|
||||
self.json_data['format']['hrate'] = self._speedfmt(self.json_data['format'].get('bit_rate',"0"))
|
||||
self.json_data['format']['hsize'] = self._sizefmt(float(self.json_data['format'].get('size',0)))
|
||||
self.json_data['format'].update(self._get_colors())
|
||||
|
||||
msg = """{Y}==== Format ===={z}
|
||||
File: {filename}
|
||||
Format: {format_long_name} ({format_name})
|
||||
Length: {time}
|
||||
Size: {hsize}
|
||||
Bitrate: {hrate}
|
||||
Streams: {nb_streams}""".format(**self.json_data['format']
|
||||
)
|
||||
print(msg)
|
||||
for stream in self.json_data['streams']:
|
||||
original = stream.copy()
|
||||
stream.update(self._get_colors())
|
||||
|
||||
if stream['codec_type'] == "video":
|
||||
msg = self.format_video(stream)
|
||||
elif stream['codec_type'] == "audio":
|
||||
msg = self.format_audio(stream)
|
||||
elif stream['codec_type'] == "subtitle":
|
||||
msg = self.format_subtitle(stream)
|
||||
else:
|
||||
msg = "Unrecognized stream\n{}".format(json.dumps(original, indent=2))
|
||||
|
||||
print(msg)
|
||||
|
||||
|
||||
def format_subtitle(self, stream):
|
||||
|
||||
stream['tags-lang'] = stream.get('tags',{}).get('language',"NA")
|
||||
msg = """{Y}==== Stream:{index} ===={z}
|
||||
Type: {G}{codec_type}{z}
|
||||
Codec: {codec_long_name} ({codec_name})
|
||||
Language: {tags-lang}""".format(**stream)
|
||||
return msg
|
||||
|
||||
def format_video(self, stream):
|
||||
stream['hrate'] = self._speedfmt(stream.get("bit_rate",0))
|
||||
stream['fps'] = "{:.2f}".format(float(stream['r_frame_rate'].split('/')[0]) / float(stream['r_frame_rate'].split('/')[1]))
|
||||
msg = """{Y}==== Stream:{index} ===={z}
|
||||
Type: {G}{codec_type}{z}
|
||||
Codec: {codec_long_name} ({codec_name})
|
||||
Bitrate: {hrate}
|
||||
Resolution: {width}x{height}
|
||||
Aspect: {display_aspect_ratio}
|
||||
Profile: {profile}
|
||||
FPS: {r_frame_rate} ({fps})""".format(**stream)
|
||||
|
||||
return msg
|
||||
|
||||
def format_audio(self, stream):
|
||||
stream['hrate'] = self._speedfmt(stream.get("bit_rate",0))
|
||||
msg = """{Y}==== Stream:{index} ===={z}
|
||||
Type: {G}{codec_type}{z}
|
||||
Codec: {codec_long_name} ({codec_name})
|
||||
Bitrate: {hrate}
|
||||
Sample Rate: {sample_rate} Hz
|
||||
Channels: {channels} ({channel_layout})""".format(**stream)
|
||||
|
||||
return msg
|
||||
|
||||
|
||||
def _timestr(self, sec):
|
||||
msec = int((float(sec) - int(float(sec)))*1000)
|
||||
sec = int(float(sec))
|
||||
try:
|
||||
hours = int(sec) // 3600 % 24
|
||||
minutes = int(sec) // 60 % 60
|
||||
seconds = int(sec) % 60
|
||||
|
||||
return "{:02d}:{:02d}:{:02d}.{:03d}".format(hours, minutes, seconds, msec)
|
||||
except Exception:
|
||||
return sec
|
||||
|
||||
def _mbstr(self, b):
|
||||
|
||||
try:
|
||||
return "{:.1f}".format((float(b) / (1024 ** 2)))
|
||||
except Exception:
|
||||
return b
|
||||
|
||||
def _speedfmt(self, b):
|
||||
|
||||
return self._sizefmt(float(b), suffix="bit/s")
|
||||
try:
|
||||
return "{:.1f}".format((float(b) / (1024 ** 2)))
|
||||
except Exception:
|
||||
return b
|
||||
|
||||
def _sizefmt(self, num, suffix='B'):
|
||||
|
||||
num = float(num)
|
||||
for unit in ['','K','M','G','T','P','E','Z']:
|
||||
if abs(num) < 1024.0:
|
||||
return "%3.1f %s%s" % (num, unit, suffix)
|
||||
num /= 1024.0
|
||||
return "%.1f %s%s" % (num, 'Y', suffix)
|
||||
|
||||
def _get_colors(self):
|
||||
|
||||
if self.colorize:
|
||||
return {
|
||||
'Y': fg.boldyellow,
|
||||
'G': fg.boldgreen,
|
||||
'z': fx.reset,
|
||||
}
|
||||
else:
|
||||
return {
|
||||
'Y': "",
|
||||
'G': "",
|
||||
'z': "",
|
||||
}
|
||||
|
||||
def main():
|
||||
Probe()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user