Files
tsmark/tsmark/video_annotator.py

1554 lines
58 KiB
Python
Executable File

import importlib.util
import json
import os
import shlex
import subprocess
import sys
import threading
import time
import cv2
import numpy as np
from scipy.interpolate import PchipInterpolator
# Directory where plugin modules are looked up (and where the example
# "hello.py" plugin is written by Marker.load_plugin).
PLUGIN_FOLDER = os.path.expanduser("~/.config/tsmark/plugins")
# Drawing colors. NOTE(review): presumably in OpenCV's (B, G, R) channel
# order -- the on-screen help calls the pre/post color "yellow", which only
# matches (0, 128, 128) when read as BGR.
COLOR_PREPOST = (0, 128, 128)  # frames before the first / after the last key frame
COLOR_KEY = (60, 205, 60)  # key frame, visible
COLOR_KEY_OCCLUDED = (50, 128, 50)  # key frame, occluded
COLOR_INTERP = (192, 0, 192)  # interpolated frame, visible
COLOR_INTERP_OCCLUDED = (128, 0, 128)  # interpolated frame, occluded
COLOR_HIDDEN = (60, 60, 60)  # point marked hidden
COLOR_NONE = (255, 255, 255)  # fallback when no other rule matches
# Visibility states of a point; index 0 is the default for new points and the
# "o" key cycles through them in this order.
POINT_VISIBILITY = ("yes", "occluded", "hidden")
class Marker:
def __init__(self, opts):
self.opts = opts
if not os.path.exists(self.opts.video):
raise FileNotFoundError("Video file missing!")
self.paused = False
self.read_next = False
self.show_info = True
self.show_help = False
self.auto_step = True
self.font = cv2.FONT_HERSHEY_SIMPLEX
self.frame_visu = []
self.frame_raw = []
self.max_res = tuple([int(x) for x in self.opts.max_res.split("x")])
self.min_res = (512, None)
self.mouse_position = (0, 0)
self.crop = [(None, None), (None, None), None]
self.crop_click = 0
self.point_click = 0
self.point_tracking = 0
self.point_tracking_length = float(self.opts.max_track)
self.points = {}
self.points_interpolated = {}
self.point_index = None
self.points_interpolation_enabled = True
self.points_interpolation_required = False
self.message = None
self.message_timer = time.time()
self.autosave_interval = 60
self.autosave_timer = time.time()
self.forced_fps = opts.fps
try:
self.open()
self.calculate_res()
self.parse_timestamps()
self.load_plugin()
if self.opts.start_time:
try:
self.nr = int(self.opts.start_time)
except ValueError:
self.nr = self.parse_time(self.opts.start_time)
self.loop()
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno)
raise e
def open(self):
    """Open ``opts.video`` with OpenCV and cache frame count and timing.

    Sets ``video_reader``, ``frames``, ``fps``, ``spf`` (seconds per frame),
    ``viewer_fps`` / ``viewer_spf`` (playback rate; ``forced_fps`` wins when
    set) and ``video_length``.

    Raises:
        OSError: if OpenCV cannot open the file.
        ValueError: if the container reports a non-positive frame rate.
    """
    self.video_reader = cv2.VideoCapture(self.opts.video)
    # Fail with a clear message instead of a ZeroDivisionError further down.
    if not self.video_reader.isOpened():
        raise OSError("Cannot open video: {}".format(self.opts.video))
    self.frames = int(self.video_reader.get(cv2.CAP_PROP_FRAME_COUNT))
    self.fps = self.video_reader.get(cv2.CAP_PROP_FPS)
    if not self.fps > 0:
        raise ValueError("Video reports invalid frame rate: {}".format(self.fps))
    self.spf = 1 / self.fps
    self.viewer_fps = self.forced_fps if self.forced_fps else self.fps
    self.viewer_spf = 1 / self.viewer_fps
    # NOTE(review): frames * fps is neither a frame count nor a duration in
    # seconds (that would be frames / fps). Kept as-is because
    # calculate_step() clamps the seek step with this value -- confirm the
    # intended unit before changing it.
    self.video_length = self.frames * self.fps
def calculate_res(self):
    """Compute display resolution, default crop and seek-bar geometry.

    The native size is kept in ``video_res_original``. The display size
    ``video_res`` keeps the aspect ratio while being limited to ``max_res``
    and widened to at least ``min_res[0]``. ``opts.crop`` ("w:h:x:y", in
    native pixels) overrides the default full-frame crop.
    """
    width = int(self.video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(self.video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
    self.video_res_original = [width, height]
    aspect = width / height

    # Shrink to the maximum width, then height; finally enforce minimum width.
    if width > self.max_res[0]:
        width = int(self.max_res[0])
        height = int(width / aspect)
    if height > self.max_res[1]:
        height = int(self.max_res[1])
        width = int(height * aspect)
    if width < self.min_res[0]:
        width = int(self.min_res[0])
        height = int(width / aspect)
    self.video_res = (width, height)

    self.crop = [(0, 0), tuple(self.video_res), None]
    if self.opts.crop:
        w, h, x, y = [int(c) for c in self.opts.crop.split(":")]
        origin = self.original_to_visual((x, y))
        size = self.original_to_visual((w, h))
        self.crop = [origin, size, True]

    # Seek bar: 5%..95% of the width, 90%..95% of the height.
    self.bar_start = int(self.video_res[0] * 0.05)
    self.bar_end = int(self.video_res[0] * 0.95)
    self.bar_top = int(self.video_res[1] * 0.90)
    self.bar_bottom = int(self.video_res[1] * 0.95)
def calculate_step(self):
now = time.time()
self.last_move = [x for x in self.last_move if x[1] > now - 3]
if len(self.last_move) == 0:
self.step = 1
self.last_move = []
return
lefts = sum([1 for x in self.last_move if x[0] == "l"])
rights = sum([1 for x in self.last_move if x[0] == "r"])
if lefts > 0 and rights > 0:
self.step = 1
self.last_move = []
return
count = max(lefts, rights)
if count < 5:
self.step = 1
else:
# x2 poly from 5:5 -> 15:180
self.step = 45 - 16.5 * count + 1.7 * count * count
self.step = min(self.step, 0.1 * self.video_length)
self.step = int(self.step)
def draw_bar(self, frame):
    """Draw the seek bar onto *frame*.

    Draws the outline, one tick per time stamp, the key frames of the track
    being edited (point mode only), the current-position cursor and the
    start/end labels.
    """
    span = self.bar_end - self.bar_start
    white = (255, 255, 255)

    def bar_x(frame_nr):
        # Horizontal pixel on the bar for a frame number.
        return int(self.bar_start + frame_nr / self.frames * span)

    bar_position = bar_x(self.nr)
    cv2.rectangle(frame, (self.bar_start, self.bar_top), (self.bar_end, self.bar_bottom), white, 2)

    # Stamp ticks: a dark, wide line under a bright, thin one.
    for ts in self.stamps:
        ts_pos = bar_x(ts)
        top, bottom = (ts_pos, self.bar_top), (ts_pos, self.bar_bottom)
        cv2.line(frame, top, bottom, (32, 32, 32), 3)
        cv2.line(frame, top, bottom, (84, 255, 63), 1)

    if self.point_click == 1 and self.point_index in self.points:
        track = self.points[self.point_index]
        bar_middle = int((self.bar_top + self.bar_bottom) / 2)
        # All dark backgrounds first so they never cover a colored dot.
        for ts in track:
            cv2.circle(frame, (bar_x(ts), bar_middle), 3, (32, 32, 32), -1)
        for ts in track:
            cv2.circle(frame, (bar_x(ts), bar_middle), 1, self.get_point_color(track[ts]), -1)

    cv2.line(
        frame,
        (bar_position, self.bar_top),
        (bar_position, self.bar_bottom),
        (63, 84, 255),
        1,
    )

    label_y = self.bar_bottom + 20
    self.shadow_text(frame, "1", (self.bar_start - 7, label_y), 0.7, 2, white)
    end_frame = self.format_time(self.frames - 1)
    text_width, _text_height = cv2.getTextSize(end_frame, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)[0]
    self.shadow_text(frame, end_frame, (self.bar_end - text_width, label_y), 0.7, 2, white)
def draw_crop(self, frame):
    """Draw the crop rectangle, plus a coordinate label while it is edited.

    ``self.crop`` is ``[origin, size, flag]`` in display pixels; nothing is
    drawn while the flag is ``None``. While the offset is being set
    (``crop_click == 1``) the label shows the origin, while the size is
    being set (``crop_click == 2``) it shows the size, both in original
    video pixels.
    """
    origin, size, flag = self.crop[0], self.crop[1], self.crop[2]
    if flag is None:
        return

    teal = (0, 192, 192)
    far_corner = (origin[0] + size[0], origin[1] + size[1])
    cv2.rectangle(frame, origin, far_corner, teal, 1)

    label = None
    if self.crop_click == 1:
        x, y = self.visual_to_original((origin[0], origin[1]))
        label = f"{x},{y}"
    elif self.crop_click == 2:
        w, h = self.visual_to_original((size[0], size[1]))
        label = f"{w}x{h}"
    if label is not None:
        self.shadow_text(frame, label, origin, 0.5, 1, teal)
def draw_points(self, frame):
if self.opts.output_points is None:
return
for index in self.points:
if index == self.point_index and self.point_click == 1:
continue
current = self.get_interpolated_point(index=index)
if current["type"] in ("pre", "post"):
continue
if current["visible"] == "hidden":
continue
color = self.get_point_color(current)
cv2.circle(frame, (current["cx"], current["cy"]), 10, (0, 0, 0), 2)
cv2.circle(frame, (current["cx"], current["cy"]), 10, color, 1)
self.shadow_text(
frame,
index,
(current["cx"], current["cy"]),
0.5,
1,
color,
)
if self.point_click == 1:
# Draw crosshair
cv2.line(
frame, (self.mouse_position[0], 0), (self.mouse_position[0], self.video_res[1]), (128, 128, 128), 1
)
cv2.line(
frame, (0, self.mouse_position[1]), (self.video_res[0], self.mouse_position[1]), (128, 128, 128), 1
)
# Show current track
x, y = [20, 70]
self.shadow_text(
frame,
"P:" + str(self.point_index),
(x, y),
0.5,
1,
(255, 255, 255),
)
try:
current = self.get_interpolated_point()
if current["type"] is not None:
color = self.get_point_color(current)
cv2.rectangle(
frame,
(current["x0"], current["y0"]),
(current["x1"], current["y1"]),
color,
2,
)
cv2.circle(frame, (current["cx"], current["cy"]), 10, color, 1)
history = []
for p in range(max(1, int(self.nr - self.viewer_fps)), self.nr + 1):
po = self.get_interpolated_point(p)
history.append([po["cx"], po["cy"]])
history = np.array(history, np.int32).reshape((-1, 1, 2))
cv2.polylines(frame, [history], False, COLOR_INTERP, 1)
except KeyError:
print(self.get_interpolated_point(), self.nr)
pass
except IndexError:
print(self.get_interpolated_point(), self.nr)
pass
try:
current = self.get_point()
if current["x0"] is not None:
cv2.circle(frame, (current["cx"], current["cy"]), 13, COLOR_KEY, 2)
except KeyError:
pass
except IndexError:
print(self.points[self.point_index])
print(self.nr)
pass
def scan_point(self, direction):
def set_nr(ts):
self.nr = ts - 1
self.read_next = True
try:
if direction == "first":
return set_nr(min(list(self.points[self.point_index].keys())))
if direction == "last":
return set_nr(max(list(self.points[self.point_index].keys())))
if direction == "next":
for ts in sorted(list(self.points[self.point_index].keys())):
if ts > self.nr:
return set_nr(ts)
if direction == "previous":
for ts in reversed(sorted(list(self.points[self.point_index].keys()))):
if ts < self.nr - 1:
return set_nr(ts)
except Exception:
pass
def toggle_point(self, ts):
try:
if ts in self.points[self.point_index]:
# Remove point
del self.points[self.point_index][ts]
else:
# Introduce point from interpolated
ip = self.get_interpolated_point()
if ip["type"] is None:
return
self.points[self.point_index][self.nr] = {
"x0": ip["x0"],
"y0": ip["y0"],
"x1": ip["x1"],
"y1": ip["y1"],
"visible": POINT_VISIBILITY[0],
}
self.interpolate_points()
except Exception:
pass
def get_point(self, nr=None, index=None):
"""{x0,y0,x1,y1, cx, cy, w, h, visible}"""
if nr is None:
nr = self.nr
if index is None:
index = self.point_index
if index in self.points:
if nr in self.points[index]:
value = self.points[index][nr].copy()
value.update(
{
"cx": int((value["x0"] + value["x1"]) / 2),
"cy": int((value["y0"] + value["y1"]) / 2),
"w": int(abs(value["x0"] - value["x1"])),
"h": int(abs(value["y0"] - value["y1"])),
}
)
return value
return {
"x0": None,
"y0": None,
"x1": None,
"y1": None,
"cx": None,
"cy": None,
"w": None,
"h": None,
"visible": None,
}
def get_interpolated_point(self, nr=None, index=None):
"""{x0,y0,x1,y1, cx, cy, visible, type, age}"""
if nr is None:
nr = self.nr
if index is None:
index = self.point_index
if index in self.points_interpolated:
if nr in self.points_interpolated[index]:
value = self.points_interpolated[index][nr].copy()
value.update(
{
"cx": int((value["x0"] + value["x1"]) / 2),
"cy": int((value["y0"] + value["y1"]) / 2),
}
)
return value
return {
"x0": None,
"y0": None,
"x1": None,
"y1": None,
"cx": None,
"cy": None,
"visible": None,
"type": None,
"age": None,
}
def convert_interpolated_points(self):
if self.point_click == 1 and self.point_index in self.points:
for nr in range(self.frames):
ip = self.get_interpolated_point(nr=nr)
if ip["type"] == "interp" and ip["visible"] == POINT_VISIBILITY[0]:
self.points[self.point_index][nr] = {
"x0": ip["x0"],
"y0": ip["y0"],
"x1": ip["x1"],
"y1": ip["y1"],
"visible": POINT_VISIBILITY[0],
}
self.interpolate_points()
def modify_point(self, position, x, y):
    """Create or move the key frame of the edited track at the current frame.

    position: "tl" sets the top-left corner, "br" the bottom-right corner,
    "c" the center; (x, y) are display coordinates.

    A new key frame inherits its size from the nearest earlier key frame
    (or the first/last one when outside the track), or 50x50 for an empty
    track. An existing key frame is made visible, updated in place, and its
    corners are swapped if they end up inverted. The track is
    re-interpolated afterwards.
    """
    track = self.points.setdefault(self.point_index, {})
    visible = POINT_VISIBILITY[0]
    x_limit = self.video_res[0] - 1
    y_limit = self.video_res[1] - 1

    def box_around(w, h):
        # Box of size w x h centered on (x, y), clamped to the frame.
        return {
            "x0": max(0, int(x - w / 2)),
            "y0": max(0, int(y - h / 2)),
            "x1": min(x_limit, int(x + w / 2)),
            "y1": min(y_limit, int(y + h / 2)),
            "visible": visible,
        }

    if self.nr not in track:
        if len(track) > 0:
            keys = sorted(track)
            if self.nr > keys[-1]:  # last point if at end of track
                reference = track[keys[-1]]
            elif self.nr < keys[0]:  # first point if before track
                reference = track[keys[0]]
            else:  # previous point if in the middle of track
                reference = track[max(k for k in keys if k < self.nr)]
            w = abs(reference["x1"] - reference["x0"])
            h = abs(reference["y1"] - reference["y0"])
        else:
            w, h = 50, 50
        if position == "tl":
            track[self.nr] = {
                "x0": x,
                "y0": y,
                "x1": min(x_limit, x + w),
                "y1": min(y_limit, y + h),
                "visible": visible,
            }
        elif position == "br":
            track[self.nr] = {
                "x0": max(0, x - w),
                "y0": max(0, y - h),
                "x1": x,
                "y1": y,
                "visible": visible,
            }
        elif position == "c":
            track[self.nr] = box_around(w, h)
    else:
        # not a new point
        box = track[self.nr]
        box["visible"] = visible
        if position == "c":
            track[self.nr] = box_around(abs(box["x1"] - box["x0"]), abs(box["y1"] - box["y0"]))
        elif position == "tl":
            box["x0"], box["y0"] = x, y
        elif position == "br":
            box["x1"], box["y1"] = x, y
        # Keep (x0, y0) the top-left corner.
        box = track[self.nr]
        if box["x0"] > box["x1"]:
            box["x0"], box["x1"] = box["x1"], box["x0"]
        if box["y0"] > box["y1"]:
            box["y0"], box["y1"] = box["y1"], box["y0"]
    self.interpolate_points()
def modify_point_wh(self):
if self.point_click == 0:
self.add_message("Not in point clicking mode")
return
if self.opts.output_points is None:
return
curr_point = self.get_point()
if curr_point["x0"] is None:
self.add_message("Not in point frame (green)")
return
new_wh = abs(self.mouse_position[0] - curr_point["cx"])
new_hh = abs(self.mouse_position[1] - curr_point["cy"])
self.points[self.point_index][self.nr]["x0"] = int(curr_point["cx"] - new_wh)
self.points[self.point_index][self.nr]["y0"] = int(curr_point["cy"] - new_hh)
self.points[self.point_index][self.nr]["x1"] = int(curr_point["cx"] + new_wh)
self.points[self.point_index][self.nr]["y1"] = int(curr_point["cy"] + new_hh)
self.points[self.point_index][self.nr]["visible"] = POINT_VISIBILITY[0]
self.interpolate_points()
def toggle_point_visibility(self):
if self.point_click == 0:
self.add_message("Not in point clicking mode")
return
if self.opts.output_points is None:
return
curr_point = self.get_point()
if curr_point["x0"] is None:
self.add_message("Not in point frame (green)")
return
try:
new_index = (1 + POINT_VISIBILITY.index(self.points[self.point_index][self.nr]["visible"])) % len(
POINT_VISIBILITY
)
except (ValueError, KeyError):
new_index = 0
self.points[self.point_index][self.nr]["visible"] = POINT_VISIBILITY[new_index]
try:
self.points_interpolated[self.point_index][self.nr]["visible"] = POINT_VISIBILITY[new_index]
except Exception as e:
print(e)
pass
def track_point(self):
if self.point_click == 0:
self.add_message("Not in point clicking mode")
return
if self.opts.output_points is None:
return
tracker_gui = TrackerGUI(self)
if len(tracker_gui.points) > 0:
for nr in tracker_gui.points:
self.points[self.point_index][nr] = tracker_gui.points[nr]
self.interpolate_points()
self.nr = max(tracker_gui.points) - 1
self.read_next = True
def load_plugin(self):
self.plugin = None
if self.opts.plugin:
if not os.path.exists(os.path.join(PLUGIN_FOLDER, "hello.py")):
os.makedirs(PLUGIN_FOLDER, exist_ok=True)
with open(os.path.join(PLUGIN_FOLDER, "hello.py"), "wt") as fp:
fp.write(
"""import cv2
import numpy as np
class World:
def __init__(self, tsmark):
self.tsmark = tsmark
self.window_name = "tsmark - plugin"
print("plugin loaded")
def __call__(self):
print("plugin called")
self.tsmark.paused = True
cv2.namedWindow(self.window_name, flags=cv2.WINDOW_AUTOSIZE | cv2.WINDOW_KEEPRATIO | cv2.WINDOW_GUI_NORMAL)
frame = cv2.resize(np.zeros((16, 16, 3), dtype=np.uint8), self.tsmark.video_res)
self.tsmark.shadow_text(frame, "Hello World! press q to exit.", (100, 80), 0.75, 2, (255, 255, 255))
cv2.imshow(self.window_name, frame)
while True:
k = cv2.waitKey(10)
# break if ESC pressed, q, space or enter
if k & 0xFF == ord("q") or k & 0xFF == 32 or k & 0xFF == 27 or k & 0xFF == 13:
break
cv2.destroyWindow(self.window_name)
return
"""
)
plugin_file, plugin_class = self.opts.plugin.split(":", 1)
plugin_path = os.path.join(PLUGIN_FOLDER, plugin_file + ".py")
module_spec = importlib.util.spec_from_file_location("Plugin", plugin_path)
loaded_plugin = importlib.util.module_from_spec(module_spec)
module_spec.loader.exec_module(loaded_plugin)
plugin_class = getattr(loaded_plugin, plugin_class)
self.plugin = plugin_class(self)
def launch_plugin(self):
if self.plugin:
self.plugin()
def interpolate_points(self, point_index=None):
"""types:
key: user clicked / accepted frame
interp: interpolated frame
pre: before any keyframes
post: after any keyframes
"""
if self.points_interpolation_enabled:
process = threading.Thread(target=self.interpolate_points_in_thread, args=(point_index,))
process.start()
def interpolate_points_in_thread(self, point_index=None):
if point_index is None:
point_index = self.point_index
def i_point(x0=None, y0=None, x1=None, y1=None, t=None, visible=None, age=None):
return {"x0": x0, "y0": y0, "x1": x1, "y1": y1, "type": t, "visible": visible, "age": age}
def point2array(p):
return [p["x0"], p["y0"], p["x1"], p["y1"]]
if not point_index in self.points:
return
if not point_index in self.points_interpolated:
self.points_interpolated[point_index] = {key: {} for key in range(self.frames)}
# ~ self.points_interpolation_required = False
new_points = {k: v for k, v in self.points_interpolated[point_index].items()}
if len(self.points[point_index]) == 1: # only one point added
key = list(self.points[point_index].keys())[0]
vals = self.points[point_index][key]
for key in range(self.frames):
new_points[key] = i_point()
new_points[key].update(vals)
new_points[key]["type"] = "pre" if key < self.nr else "post"
new_points[self.nr]["type"] = "key"
self.points_interpolated[point_index] = new_points
else: # more points
point_keys = list(sorted(list(self.points[point_index].keys())))
point_values = [point2array(self.points[point_index][k]) for k in point_keys]
xyxy = np.array(point_values).T
spline = PchipInterpolator(point_keys, xyxy, axis=1)
start_key = min(point_keys)
end_key = max(point_keys) + 1
t2 = np.arange(start_key, end_key)
# Pre points
for key in range(0, start_key):
new_points[key]["type"] = "pre"
new_points[key].update(self.points[point_index][start_key])
# interpolated points
visible = self.points[point_index][start_key]["visible"]
for row in np.vstack((t2, spline(t2))).T:
if row[0] in point_keys:
visible = self.points[point_index][row[0]]["visible"]
new_points[row[0]] = {
"type": "interp",
"x0": int(row[1]),
"y0": int(row[2]),
"x1": int(row[3]),
"y1": int(row[4]),
"visible": visible,
}
# post points
for key in range(end_key, self.frames + 1):
new_points[key] = {
"type": "post",
"x0": int(row[1]),
"y0": int(row[2]),
"x1": int(row[3]),
"y1": int(row[4]),
"visible": visible,
}
# clicked points (not necessary, could determine at draw time!)
for key in point_keys:
new_points[key]["type"] = "key"
age = 0
for key in new_points:
if new_points[key]["type"] == "key":
age = 0
if new_points[key]["type"] == "interp":
age += 1
new_points[key]["age"] = age
self.points_interpolated[point_index] = new_points
def toggle_interpolation(self):
self.points_interpolation_enabled = not self.points_interpolation_enabled
if self.points_interpolation_enabled:
self.interpolate_points()
def draw_help(self, frame):
bottom = 80
left = 100
for row in self.get_help().split("\n"):
self.shadow_text(frame, row, (left, bottom), 0.6, 1, (255, 255, 255))
bottom += 18
def draw_label(self, frame):
if not self.nr in self.stamps:
return
text = "{} #{}".format(self.nr, self.stamps.index(self.nr) + 1)
bottom = 60
left = 10
self.shadow_text(frame, text, (left, bottom), 1, 2, (63, 84, 255))
def draw_time(self, frame):
left = 10
bottom = 30
formatted = "{} {}".format(
self.format_time(self.nr),
f"|| ({self.nr})" if self.paused else "",
)
self.shadow_text(frame, formatted, (left, bottom), 1.1, 2, (255, 255, 255))
def draw_message(self, frame):
if self.message is None:
return
if time.time() - 5 > self.message_timer:
self.message = None
return
left = 10
bottom = 90
self.shadow_text(frame, self.message, (left, bottom), 0.9, 2, (255, 255, 255))
def add_message(self, new):
self.message = new
self.message_timer = time.time()
def format_time(self, nframe):
seconds = int(nframe / self.fps)
frame = nframe % self.fps
parts = int(100 * (frame / self.fps))
return time.strftime("%H:%M:%S", time.gmtime(seconds)) + ".%02d" % (parts)
def get_point_color(self, point):
    """Return the drawing color for *point* from its type and visibility.

    Missing "type" / "visible" keys default to "key" / "yes". Hidden points
    are always COLOR_HIDDEN, pre/post points COLOR_PREPOST, and any
    combination without a dedicated color yields COLOR_NONE.
    """
    kind = point.get("type", "key")
    visibility = point.get("visible", "yes")
    if visibility == "hidden":
        return COLOR_HIDDEN
    if kind in ("post", "pre"):
        return COLOR_PREPOST
    palette = {
        ("key", "yes"): COLOR_KEY,
        ("key", "occluded"): COLOR_KEY_OCCLUDED,
        ("interp", "yes"): COLOR_INTERP,
        ("interp", "occluded"): COLOR_INTERP_OCCLUDED,
    }
    return palette.get((kind, visibility), COLOR_NONE)
def get_help(self):
    """Return the multi-line keyboard/mouse help text.

    print_help() prints this text and draw_help() draws it on the frame one
    row per line, so each line should stay short.
    """
    return """Keyboard help:
Arrows, PgUp, PgDn, Home, End or click mouse in position bar
j l i k [ ]
jump in video position
0-9 move to 0%,10%,20% .. position
, and . move one frame at a time
z and c move to previous or next mark
x or double click in the video
mark frame
space or click video
pause
a and s modify crop offset or size
f toggle 0.25x 1x or 4x FPS
v toggle HUD
h toggle help
q or esc quit
Bounding box editor:
p toggle bounding box drawing. enter any key as index.
o toggle object is visible/occluded/hidden
x toggle (delete) key frame
r convert interpolated points to points (no undo!)
u toggle automatic interpolation
mouse left: set top-left corner of box
mouse middle: set center of box
mouse right: set lower right corner of box
e set width/height of box symmetric around center
z c Home End move between key-frames
t start optical flow tracker
m start plugin (if defined)
Color codes:
green |keypoint
purple |interpolated
darker tone |occluded key/interpolated
yellow |post / pre points
gray |point is hidden
"""
def mouse_click(self, event, x, y, flags, param):
    """OpenCV mouse callback for the main window.

    Always records the mouse position. While the crop offset or size is
    being set, the crop follows the mouse and a left click ends that mode.
    In point mode the left / right / middle button sets the top-left corner
    / bottom-right corner / center of the box. Otherwise a left click seeks
    (inside the bar) or toggles pause, and a double click outside the bar
    toggles a time stamp.
    """
    in_bar = self.bar_start < x < self.bar_end and self.bar_top < y < self.bar_bottom
    self.mouse_position = (x, y)
    left_down = event == cv2.EVENT_LBUTTONDOWN

    if self.crop_click == 1:
        self.crop[0] = (x, y)
        if left_down:
            self.crop_click = 0
        return
    if self.crop_click == 2:
        self.crop[1] = (x - self.crop[0][0], y - self.crop[0][1])
        if left_down:
            self.crop_click = 0
        return

    if self.point_click == 1:
        anchors = {
            cv2.EVENT_LBUTTONDOWN: "tl",
            cv2.EVENT_RBUTTONDOWN: "br",
            cv2.EVENT_MBUTTONDOWN: "c",
        }
        anchor = anchors.get(event)
        if anchor is not None:
            self.modify_point(anchor, int(x), int(y))
        return

    if left_down:
        if in_bar:
            click_relative = (x - self.bar_start) / (self.bar_end - self.bar_start)
            self.nr = int(click_relative * self.frames)
            self.read_next = True
        else:
            self.paused = not self.paused
    if event == cv2.EVENT_LBUTTONDBLCLK and not in_bar:
        self.toggle_stamp()
def parse_time(self, timestr):
"""return frames"""
colon_count = len(timestr.split(":")) - 1
if colon_count == 0:
secs = float(timestr)
return int(secs * self.fps)
if colon_count == 1:
mins, secstr = timestr.split(":", 1)
sec = float(secstr)
return int(self.fps * (int(mins) * 60 + sec))
if colon_count == 2:
hours, mins, secstr = timestr.split(":", 2)
sec = float(secstr)
return int(self.fps * (int(hours, 10) * 3600 + int(mins, 10) * 60 + sec))
raise ValueError("Cannot parse time definition {}".format(timestr))
raise TypeError("Cannot parse time definition {}".format(timestr))
def parse_timestamps(self):
self.stamps = []
if self.opts.timestamps:
if os.path.exists(self.opts.timestamps):
with open(self.opts.timestamps, "rt") as fp:
for row in fp.readlines():
# if row has 3 cols, pick the frame number directly
splitted = row.split(",")
if len(splitted) == 3:
self.stamps.append(int(splitted[2], 10))
if len(splitted) < 3:
self.opts.timestamps.append(splitted[0])
else:
self.opts.timestamps = self.opts.timestamps.split(",")
if len(self.stamps) > 0:
self.stamps.sort()
else:
self.stamps = sorted([self.parse_time(ts.strip()) for ts in self.opts.timestamps if ts.strip() != ""])
self.stamps = [x for x in self.stamps if 0 <= x < self.frames]
self.nr = self.stamps[0]
else:
self.stamps = []
self.nr = 0
# Read bounding boxes from JSON
if self.opts.input_points:
if os.path.exists(self.opts.input_points):
with open(self.opts.input_points, "rt") as fp:
self.points = json.load(fp)
keys = list(self.points.keys())
for index in keys:
# Remove empty dicts
if len(self.points[index]) == 0:
del self.points[index]
continue
self.point_index = index
self.points[index] = {int(k): v for k, v in self.points[index].items()}
for key in self.points[index]:
self.points[index][key]["x0"], self.points[index][key]["y0"] = self.original_to_visual(
(self.points[index][key]["x0"], self.points[index][key]["y0"])
)
self.points[index][key]["x1"], self.points[index][key]["y1"] = self.original_to_visual(
(self.points[index][key]["x1"], self.points[index][key]["y1"])
)
if not self.points[index][key].get("visible", "NA") in POINT_VISIBILITY:
self.points[index][key]["visible"] = POINT_VISIBILITY[0]
self.interpolate_points()
print(f"Loaded points with index: {index}")
self.point_index = None
def print_help(self):
print(self.get_help())
def print_timestamps(self):
if self.crop[2] is None:
cropstr = []
else:
self.opts.ffmpeg_copy = False
x, y = self.visual_to_original((self.crop[0][0], self.crop[0][1]))
w, h = self.visual_to_original((self.crop[1][0], self.crop[1][1]))
if w < 0:
x = x + w
w = -w
if h < 0:
y = y + h
h = -h
cropstr = ["-vf", f"crop={w}:{h}:{x}:{y}"]
self.stamps.sort()
print("# Timestamps:")
for i, ts in enumerate(self.stamps):
print("# {}: {} / {}".format(i + 1, self.format_time(ts), ts))
if len(self.stamps) == 0:
self.stamps.append(0)
self.stamps.append(self.frames)
padlen = len(str(self.frames))
src_name_print = self.opts.video.replace('"', '\\"')
tgt_name_print = os.path.splitext(self.opts.video)[0].replace('"', '\\"')
for i in range(1, len(self.stamps), 2):
from_ts = self.stamps[i - 1]
to_ts = self.stamps[i]
from_ft = self.format_time(from_ts)
to_ft = self.format_time(to_ts)
from_str = str(from_ts).zfill(padlen)
to_str = str(to_ts).zfill(padlen)
ffmpeg_args_print = []
ffmpeg_args = []
for arg in shlex.split(self.opts.ffmpeg_args):
if arg == "{crop}":
ffmpeg_args_print.extend(cropstr)
ffmpeg_args.extend(cropstr)
else:
ffmpeg_args_print.append(
arg.format(
input=shlex.quote(src_name_print),
output=shlex.quote(f"{tgt_name_print}.trim.{from_str}-{to_str}"),
start_time=from_ft,
end_time=to_ft,
)
)
ffmpeg_args.append(
arg.format(
input=self.opts.video,
output=f"{os.path.splitext(self.opts.video)[0]}.trim.{from_str}-{to_str}",
start_time=from_ft,
end_time=to_ft,
)
)
print(" ".join(["ffmpeg", "-hide_banner", *ffmpeg_args_print]))
if self.opts.ffmpeg_run:
subprocess.run(["ffmpeg", "-hide_banner", *ffmpeg_args])
def save_timestamps(self):
if self.opts.output is not None:
with open(self.opts.output, "wt") as fp:
for i, ts in enumerate(self.stamps):
fp.write("{},{},{}\n".format(self.format_time(ts), i + 1, ts))
print("Saved timestamps")
if self.opts.output_points is not None:
points = {}
for index in self.points.keys():
points[index] = {}
for key in sorted(self.points[index].keys()):
points[index][key] = self.points[index][key].copy()
points[index][key]["x0"], points[index][key]["y0"] = self.visual_to_original(
(self.points[index][key]["x0"], self.points[index][key]["y0"])
)
points[index][key]["x1"], points[index][key]["y1"] = self.visual_to_original(
(self.points[index][key]["x1"], self.points[index][key]["y1"])
)
if len(points[index]) == 0:
del points[index]
with open(self.opts.output_points, "wt") as fp:
json.dump(points, fp, indent=2)
print("Saved points")
def shadow_text(self, frame, text, pos, size, thicc, color):
    """Draw *text* at *pos* in *color* over a black outline twice as thick."""
    layers = (
        ((0, 0, 0), 2 * thicc),  # outline first
        (color, thicc),  # actual text on top
    )
    for ink, weight in layers:
        cv2.putText(frame, text, pos, self.font, size, ink, weight, cv2.LINE_AA)
def toggle_stamp(self):
if self.nr in self.stamps:
self.stamps.remove(self.nr)
else:
self.stamps.append(self.nr)
self.stamps.sort()
def original_to_visual(self, t):
"""display (x,y) to video resolution (x,y)"""
return (
int(self.video_res[0] * t[0] / self.video_res_original[0]),
int(self.video_res[1] * t[1] / self.video_res_original[1]),
)
def visual_to_original(self, t):
"""video resolution (x,y) to display (x,y)"""
return (
int(self.video_res_original[0] * t[0] / self.video_res[0]),
int(self.video_res_original[1] * t[1] / self.video_res[1]),
)
def loop(self):
# Main interactive loop: reads frames from self.video_reader, draws the
# overlays, shows the result in the "tsmark" window and dispatches on the key
# returned by cv2.waitKey. Runs until the user quits, the window is no longer
# visible or the reader is closed; afterwards it releases the reader, destroys
# the windows and prints and saves the timestamps.
# Seek step sizes. The key handlers add them to nr_time (self.nr / self.fps),
# so they are expressed in seconds.
self.step = 1
self.bigstep = 30
self.hugestep = 300
# NOTE(review): __init__ sets auto_step to True; it is reset to False here.
self.auto_step = False
self.last_move = []
self.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.nr)
self.print_help()
cv2.namedWindow("tsmark", flags=cv2.WINDOW_AUTOSIZE | cv2.WINDOW_KEEPRATIO | cv2.WINDOW_GUI_NORMAL)
cv2.setMouseCallback("tsmark", self.mouse_click)
# Key codes of the characters "0".."9", indexed by digit.
digits_ords = [ord(str(x)) for x in range(10)]
# FPS_modifier is an index into FPS_modifiers; the selected value multiplies
# the per-frame wait time computed at the end of each iteration.
FPS_modifier = 1
FPS_modifiers = [0.25, 1, 4]
# Number of consecutive failed reads, used by the recovery branch below.
read_fails = 0
while self.video_reader.isOpened():
show_time = time.time()
# Fetch a new frame only when playing or when a handler requested a re-read.
if (not self.paused) or self.read_next:
ret, frame = self.video_reader.read()
self.frame_raw = frame
if ret == True:
read_fails = 0
# NOTE(review): this condition reduces to just `self.paused`; the second
# operand can never change the result.
draw_wait = 200 if self.paused or (self.paused and self.point_click == 0) else 1
if (not self.paused) or self.read_next:
self.read_next = False
# Overlays are drawn on a resized copy of the frame.
frame_visu = cv2.resize(frame.copy(), self.video_res)
self.frame_visu = frame_visu
self.draw_crop(frame_visu)
self.draw_points(frame_visu)
# Current position in seconds.
nr_time = self.nr / self.fps
if self.show_info:
self.draw_time(frame_visu)
self.draw_bar(frame_visu)
self.draw_label(frame_visu)
self.draw_message(frame_visu)
if self.show_help:
self.draw_help(frame_visu)
# Leave the loop when the window is no longer visible (presumably closed by
# the user).
if cv2.getWindowProperty("tsmark", cv2.WND_PROP_VISIBLE) < 1:
break
cv2.imshow("tsmark", frame_visu)
k = cv2.waitKey(draw_wait)
# "q" or key code 27 quits.
if k & 0xFF == ord("q") or k & 0xFF == 27:
break
elif k & 0xFF == 32: # space
self.paused = not self.paused
# Movement =================
# Seek handlers set self.nr one below the wanted frame: when read_next is set,
# self.nr is incremented by one after key handling and the reader is then
# repositioned to it.
elif k & 0xFF == 80: # home key
if self.point_click == 1:
self.scan_point("first")
else:
self.nr = -1
self.read_next = True
elif k & 0xFF == 87: # end key
if self.point_click == 1:
self.scan_point("last")
else:
self.nr = self.frames - 1
self.paused = True
self.read_next = True
elif k & 0xFF == 85 or k & 0xFF == ord("]"): # pg up
self.nr = int((nr_time + self.hugestep) * self.fps) - 1
self.read_next = True
elif k & 0xFF == 86 or k & 0xFF == ord("["): # pg down
self.nr = int((nr_time - self.hugestep) * self.fps) - 1
self.read_next = True
elif k & 0xFF == 82 or k & 0xFF == ord("i"): # up arrow
self.nr = int((nr_time + self.bigstep) * self.fps) - 1
self.read_next = True
elif k & 0xFF == 84 or k & 0xFF == ord("k"): # down arrow
self.nr = int((nr_time - self.bigstep) * self.fps) - 1
self.read_next = True
elif k & 0xFF == 83 or k & 0xFF == ord("l"): # right arrow
# Each small step is recorded with its time; calculate_step() is called only
# when auto_step is enabled.
self.last_move.append(("r", time.time()))
if self.auto_step:
self.calculate_step()
self.nr = int((nr_time + self.step) * self.fps) - 1
self.read_next = True
elif k & 0xFF == 81 or k & 0xFF == ord("j"): # left arrow
self.last_move.append(("l", time.time()))
if self.auto_step:
self.calculate_step()
self.nr = int((nr_time - self.step) * self.fps) - 1
self.read_next = True
# Move by frame
elif k & 0xFF == ord("."):
# self.nr is left untouched; the increment after key handling advances it by
# one frame.
self.paused = True
self.read_next = True
elif k & 0xFF == ord(","):
# Minus two, plus the increment after key handling, gives one frame back.
self.paused = True
self.nr -= 2
self.read_next = True
elif k & 0xFF == ord("z"): # move to previous ts
if self.point_click == 1:
self.scan_point("previous")
else:
for ts in reversed(sorted(self.stamps)):
if ts < self.nr - 1:
self.nr = ts - 1
self.read_next = True
break
elif k & 0xFF == ord("c"): # move to next ts
if self.point_click == 1:
self.scan_point("next")
else:
for ts in sorted(self.stamps):
if ts > self.nr:
self.nr = ts - 1
self.read_next = True
break
# Move by number
# Digit d jumps to d/10 of the total frame count.
elif k & 0xFF in digits_ords:
self.nr = int(digits_ords.index(k & 0xFF) * self.frames / 10) - 1
self.read_next = True
# Toggling =================
elif k & 0xFF == ord("f"): # modify FPS
# Cycle through FPS_modifiers; the message shows the inverse of the wait
# multiplier.
FPS_modifier = (FPS_modifier + 1) % len(FPS_modifiers)
self.add_message(f"Player speed {round(1/FPS_modifiers[FPS_modifier],2)}")
elif k & 0xFF == ord("a"): # toggle crop offset
self.crop_click = 0 if self.crop_click == 1 else 1
self.crop[2] = True
elif k & 0xFF == ord("s"): # toggle crop size
self.crop_click = 0 if self.crop_click == 2 else 2
self.crop[2] = True
elif k & 0xFF == ord("o"): # toggle point visibility (yes/occlusion/hidden)
if self.opts.output_points is not None:
self.toggle_point_visibility()
elif k & 0xFF == ord("p"): # toggle points
# Point mode is available only when a points output file is configured.
if self.opts.output_points is not None:
self.point_click = 1 - self.point_click
if self.point_click == 1:
self.shadow_text(
frame_visu,
"Enter point index",
(20, 70),
0.9,
2,
(255, 255, 255),
)
self.shadow_text(
frame_visu,
"Exists: " + "".join(sorted(self.points.keys())),
(20, 105),
0.8,
1,
(255, 255, 255),
)
cv2.imshow("tsmark", frame_visu)
# Blocks until a key is pressed; that key becomes the point index unless it
# is "q" or key code 27, which leave point mode again.
k2 = cv2.waitKey(0)
if k2 & 0xFF == ord("q") or k2 & 0xFF == 27:
self.point_click = 0
else:
# NOTE(review): k2 is not masked with 0xFF here, unlike the comparisons above.
self.point_index = chr(k2)
elif k & 0xFF == ord("g"): # Go to
self.shadow_text(
frame_visu,
"Enter frame or time",
(20, 70),
0.9,
2,
(255, 255, 255),
)
cv2.imshow("tsmark", frame_visu)
entered_chars = ""
# Modal text entry: collects digits, ":" and "." until the input is confirmed
# or cancelled. Each pass redraws the typed text on a fresh copy of the frame.
while True:
frame_query = frame_visu.copy()
self.shadow_text(
frame_query,
entered_chars,
(20, 100),
0.9,
2,
(255, 255, 255),
)
cv2.imshow("tsmark", frame_query)
del frame_query
k2 = cv2.waitKey(0)
if k2 & 0xFF == ord("q") or k2 & 0xFF == 27:
break
elif k2 & 0xFF == ord("g") or k2 & 0xFF == 13:
# Try the input as a frame number first, then fall back to parse_time().
try:
self.nr = int(entered_chars) - 1
except ValueError:
try:
self.nr = self.parse_time(entered_chars)
except Exception:
self.add_message("Cannot parse time")
break
self.read_next = True
break
elif k2 & 0xFF == 8: # backspace
entered_chars = entered_chars[0:-1]
elif k2 & 0xFF in digits_ords:
entered_chars += str(digits_ords.index(k2 & 0xFF))
elif k2 & 0xFF == ord(":"):
entered_chars += ":"
elif k2 & 0xFF == ord("."):
entered_chars += "."
else:
pass
elif k & 0xFF == ord("m"): # launch plugin module
self.launch_plugin()
elif k & 0xFF == ord("t"): # tracking
self.track_point()
elif k & 0xFF == ord("e"): # point edit (width height)
self.modify_point_wh()
elif k & 0xFF == ord("u"): # toggle interpolation
self.toggle_interpolation()
elif k & 0xFF == ord("x"): # toggle ts
# In point mode "x" toggles a point at the current frame instead of a stamp.
if self.point_click == 1:
self.toggle_point(self.nr)
else:
self.toggle_stamp()
elif k & 0xFF == ord("r"): # convert interpolated points
self.convert_interpolated_points()
elif k & 0xFF == ord("v"):
self.show_info = not self.show_info
elif k & 0xFF == ord("h"):
self.print_help()
self.show_help = not self.show_help
# Advance the frame counter when playing or when a re-read was requested.
if (not self.paused) or self.read_next:
self.nr += 1
# Clamp to the valid frame range; reaching the last frame pauses playback.
if self.nr < 0:
self.nr = 0
if self.nr >= self.frames:
self.nr = self.frames - 1
self.paused = True
if self.read_next:
self.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.nr)
# Sleep for whatever remains of the (speed-scaled) frame interval.
time_to_wait = FPS_modifiers[FPS_modifier] * self.viewer_spf - time.time() + show_time
if time_to_wait > 0:
time.sleep(time_to_wait)
else:
# Read failed: retry from near the end of the video, one frame earlier for
# each consecutive failure.
self.nr = self.frames - 2 - read_fails
self.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.nr)
read_fails += 1
# After more failures than there are frames, reopen the video at frame 0.
if read_fails > self.frames:
self.nr = 0
self.open()
self.paused = True
self.read_next = True
# Periodic autosave; errors are printed and do not stop the loop.
if time.time() > self.autosave_timer + self.autosave_interval:
self.autosave_timer = time.time()
try:
self.save_timestamps()
except Exception as e:
print(e)
self.video_reader.release()
cv2.destroyAllWindows()
self.print_timestamps()
self.save_timestamps()
class TrackerGUI:
def __init__(self, marker):
    """Bind the tracker GUI to ``marker`` and start tracking right away.

    ``self.points`` starts out empty. ``cv2.TrackerKCF_create`` is called
    once as an availability probe; if that raises ``AttributeError`` a
    message is posted through ``marker.add_message`` and ``start()`` is
    not run.
    """
    self.marker = marker
    self.points = {}
    try:
        cv2.TrackerKCF_create()
    except AttributeError:
        marker.add_message("Tracking failed: missing opencv contrib")
    else:
        self.start()
def start(self):
# Track the marker's current point forward with a KCF tracker, then replay the
# result in a review player where the user can accept, truncate or decline it.
# Accepted boxes are stored in self.points keyed by frame number.
old_nr = self.marker.nr
curr_point = self.marker.get_point()
# A point without an "x0" value means there is nothing to start tracking from.
if curr_point["x0"] is None:
self.marker.add_message("Not in point frame (green)")
return
# Track at most point_tracking_length * fps frames, and never past the frames
# remaining after the current one.
max_frames = int(
min(self.marker.point_tracking_length * self.marker.fps, self.marker.frames - self.marker.nr - 1)
)
cv2.namedWindow("tsmark - tracker", flags=cv2.WINDOW_AUTOSIZE | cv2.WINDOW_KEEPRATIO | cv2.WINDOW_GUI_NORMAL)
tracker = cv2.TrackerKCF_create()
self.marker.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.marker.nr)
# TODO: track using original video resolution!
ok, frame = self.marker.video_reader.read()
# Tracking runs on frames resized to the marker's video_res.
frame = cv2.resize(frame.copy(), self.marker.video_res)
bbox = tuple([curr_point["x0"], curr_point["y0"], curr_point["w"], curr_point["h"]])
ok = tracker.init(frame, bbox)
# Minimum number of seconds between preview redraws while tracking.
visu_interval = 0.2
show_time = 0
show_message = ""
# tracked maps a frame offset from self.marker.nr to [x, y, w, h, 1].
tracked = {}
tracked[0] = [*bbox, 1]
for i in range(max_frames):
# Read a new frame
ok, frame = self.marker.video_reader.read()
# NOTE(review): the frame is copied and resized before `ok` is checked;
# presumably a failed read raises here before the break below is reached.
frame = cv2.resize(frame.copy(), self.marker.video_res)
if not ok:
break
ok, bbox = tracker.update(frame)
if ok:
# Tracking success
# If the selected point index already has a point at this frame, its box is
# used instead of the tracker's result.
if self.marker.nr + i + 1 in self.marker.points[self.marker.point_index]:
point = self.marker.get_point(nr=self.marker.nr + i + 1)
bbox = tuple([point["x0"], point["y0"], point["w"], point["h"]])
tracked[i + 1] = [*bbox, 1]
show_message = f"Tracking... ({i}/{max_frames})"
else:
# Tracking failure
show_message = f"Tracking failure detected ({i}/{max_frames})"
bbox = None
# Redraw the preview only when visu_interval seconds have passed since the
# last redraw.
if time.time() > show_time + visu_interval:
# Display result
if bbox is not None:
p1 = (int(bbox[0]), int(bbox[1]))
p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
cv2.rectangle(frame, p1, p2, (255, 0, 0), 2, 1)
self.marker.shadow_text(frame, show_message, (100, 80), 0.75, 2, (255, 255, 255))
cv2.imshow("tsmark - tracker", frame)
show_time = time.time()
k = cv2.waitKey(1)
# break tracking if ESC pressed, q, space or enter
if k & 0xFF == ord("q") or k & 0xFF == 32 or k & 0xFF == 27 or k & 0xFF == 13:
break
# Review phase state. cut_after is the first frame offset that will be
# discarded; offsets below it are kept when the result is stored.
done = False
paused = False
seek = False
cut_after = max_frames
while True:
if done:
break
# Rewind to the starting frame for the replay.
self.marker.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.marker.nr)
i = -1
while True:
show_time = time.time()
if done:
break
# While paused, redraw from the clean copy of the last frame that was read.
# frame_copy is assigned in the read branch below; paused starts as False, so
# the first pass goes through that branch.
if paused:
frame = frame_copy.copy()
if (not paused) or seek:
ok, frame = self.marker.video_reader.read()
frame = cv2.resize(frame.copy(), self.marker.video_res)
frame_copy = frame.copy()
i += 1
seek = False
self.marker.shadow_text(frame, f"Accept? ({i+1}/{max_frames})", (100, 80), 0.75, 2, (255, 255, 255))
if i in tracked:
bbox = tracked[i]
p1 = (int(bbox[0]), int(bbox[1]))
p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
# Boxes at or after cut_after get a different colour and a thinner line.
color = (0, 255, 0) if cut_after > i else (0, 192, 192)
thicc = 2 if cut_after > i else 1
cv2.rectangle(frame, p1, p2, color, thicc, 1)
cv2.imshow("tsmark - tracker", frame)
# speed up fps by 2
time_to_wait = self.marker.viewer_spf / 2 - time.time() + show_time
k = cv2.waitKey(max(1, int(time_to_wait * 1000)))
if k & 0xFF == ord("q") or k & 0xFF == 13: # accept with q or enter
done = True
break
if k & 0xFF == 27: # decline with escape
# cut_after = 0 makes the storing loop at the end skip every tracked offset.
done = True
cut_after = 0
break
elif k & 0xFF == 32: # space
paused = not paused
# Movement =================
# The handlers below adjust i by one less than the intended jump because the
# read branch increments i again on the next pass.
elif k & 0xFF == 83 or k & 0xFF == ord("l"): # right arrow
i += int(self.marker.fps) - 1
seek = True
elif k & 0xFF == 81 or k & 0xFF == ord("j"): # left arrow
i -= int(self.marker.fps) + 1
seek = True
# Move by frame
elif k & 0xFF == ord(".") or k & 0xFF == ord("c"):
paused = True
seek = True
elif k & 0xFF == ord(",") or k & 0xFF == ord("z"):
paused = True
i -= 2
seek = True
elif k & 0xFF == ord("x"):
# Mark the current offset as the first one to discard.
cut_after = i
# TODO: ord("h") for help!
# Clamp i to the tracked range; hitting either end pauses and forces a seek.
if i >= max_frames - 1:
i = max_frames - 2
paused = True
seek = True
if i < 0:
i = -1
paused = True
seek = True
if seek:
self.marker.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.marker.nr + i + 1)
cv2.destroyWindow("tsmark - tracker")
# Set the marker one frame before the starting frame and request a re-read
# (presumably so the marker's loop reloads the starting frame — confirm
# against Marker.loop).
self.marker.nr = old_nr - 1
self.marker.read_next = True
self.points = {}
# Store the kept boxes as corner coordinates keyed by
# self.marker.nr + offset + 1, all marked with the first POINT_VISIBILITY value.
for i in sorted(list(tracked.keys())):
if i >= cut_after:
continue
self.points[self.marker.nr + i + 1] = {
"x0": tracked[i][0],
"y0": tracked[i][1],
"x1": tracked[i][0] + tracked[i][2],
"y1": tracked[i][1] + tracked[i][3],
"visible": POINT_VISIBILITY[0],
}