1609 lines
60 KiB
Python
Executable File
1609 lines
60 KiB
Python
Executable File
import importlib.util
|
|
import json
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from scipy.interpolate import PchipInterpolator
|
|
|
|
# Directory that holds user plugins ("<name>.py" files); "~" is expanded once,
# when this module is imported.
PLUGIN_FOLDER = os.path.expanduser("~/.config/tsmark/plugins")

# Drawing colours for tracked points, selected by Marker.get_point_color()
# from a point's "type" and "visible" fields.
# NOTE(review): the tuples are presumably in OpenCV's BGR channel order (the
# help text calls COLOR_PREPOST "yellow") - confirm.
COLOR_PREPOST = (0, 128, 128)  # type "pre" / "post"
COLOR_KEY = (60, 205, 60)  # type "key", visible "yes"
COLOR_KEY_OCCLUDED = (50, 128, 50)  # type "key", visible "occluded"
COLOR_INTERP = (192, 0, 192)  # type "interp", visible "yes"
COLOR_INTERP_OCCLUDED = (128, 0, 128)  # type "interp", visible "occluded"
COLOR_HIDDEN = (60, 60, 60)  # visible "hidden", regardless of type
COLOR_NONE = (255, 255, 255)  # fallback when no other rule matches
# Allowed values of a point's "visible" field. Index 0 is used as the default
# and the "o" key cycles through the values in this order.
POINT_VISIBILITY = ("yes", "occluded", "hidden")
|
|
|
|
|
|
class Marker:
|
|
def __init__(self, opts):
|
|
self.opts = opts
|
|
if not os.path.exists(self.opts.video):
|
|
raise FileNotFoundError("Video file missing!")
|
|
|
|
self.paused = False
|
|
self.read_next = False
|
|
self.show_info = True
|
|
self.show_help = False
|
|
self.auto_step = True
|
|
self.font = cv2.FONT_HERSHEY_SIMPLEX
|
|
self.frame_visu = []
|
|
self.frame_raw = []
|
|
self.max_res = tuple([int(x) for x in self.opts.max_res.split("x")])
|
|
self.min_res = (512, None)
|
|
self.mouse_position = (0, 0)
|
|
self.crop = [(None, None), (None, None), None]
|
|
self.crop_click = 0
|
|
self.point_click = 0
|
|
self.point_tracking = 0
|
|
self.point_tracking_length = float(self.opts.max_track)
|
|
self.points = {}
|
|
self.points_interpolated = {}
|
|
self.point_index = None
|
|
self.points_interpolation_enabled = True
|
|
self.points_interpolation_required = {}
|
|
self.points_interpolation_thread = None
|
|
self.points_interpolation_thread_exit = False
|
|
|
|
self.message = None
|
|
self.message_timer = time.time()
|
|
|
|
self.autosave_interval = 60
|
|
self.autosave_timer = time.time()
|
|
|
|
self.forced_fps = opts.fps
|
|
|
|
try:
|
|
self.open()
|
|
self.calculate_res()
|
|
self.parse_timestamps()
|
|
self.load_plugin()
|
|
if self.opts.start_time:
|
|
try:
|
|
self.nr = int(self.opts.start_time)
|
|
except ValueError:
|
|
self.nr = self.parse_time(self.opts.start_time)
|
|
self.loop()
|
|
except Exception as e:
|
|
exc_type, exc_obj, exc_tb = sys.exc_info()
|
|
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
|
|
print(exc_type, fname, exc_tb.tb_lineno)
|
|
raise e
|
|
|
|
def open(self):
    """Open ``opts.video`` with OpenCV and cache frame count and timing.

    Sets ``video_reader``, ``frames``, ``fps``, ``spf`` (seconds per frame),
    ``viewer_fps`` / ``viewer_spf`` (the forced FPS when one was given,
    otherwise the video's own) and ``video_length``.
    """
    reader = cv2.VideoCapture(self.opts.video)
    self.video_reader = reader
    self.frames = int(reader.get(cv2.CAP_PROP_FRAME_COUNT))
    self.fps = reader.get(cv2.CAP_PROP_FPS)
    self.spf = 1 / self.fps
    self.viewer_fps = self.forced_fps or self.fps
    self.viewer_spf = 1 / self.viewer_fps
    # NOTE(review): this is frames * fps, not a duration in seconds
    # (frames / fps) - confirm which one calculate_step() expects.
    self.video_length = self.frames * self.fps
|
|
|
|
def calculate_res(self):
    """Work out the display resolution, initial crop and seek-bar geometry.

    The native resolution is kept in ``video_res_original``. The display
    resolution ``video_res`` is the native one shrunk to fit ``max_res``
    and widened to at least ``min_res[0]`` pixels, keeping the aspect ratio.
    ``opts.crop`` ("w:h:x:y" in native pixels), when given, is converted to
    display coordinates.
    """
    native_w = int(self.video_reader.get(cv2.CAP_PROP_FRAME_WIDTH))
    native_h = int(self.video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT))
    self.video_res_original = [native_w, native_h]
    aspect = native_w / native_h

    width, height = native_w, native_h
    # Too wide: clamp the width, derive the height.
    if width > self.max_res[0]:
        width = int(self.max_res[0])
        height = int(width / aspect)
    # Still too tall: clamp the height, derive the width.
    if height > self.max_res[1]:
        height = int(self.max_res[1])
        width = int(height * aspect)
    # Too narrow: enlarge to the minimum width.
    if width < self.min_res[0]:
        width = int(self.min_res[0])
        height = int(width / aspect)
    self.video_res = (width, height)

    self.crop = [(0, 0), tuple(self.video_res), None]
    if self.opts.crop:
        crop_w, crop_h, crop_x, crop_y = [int(c) for c in self.opts.crop.split(":")]
        offset = self.original_to_visual((crop_x, crop_y))
        size = self.original_to_visual((crop_w, crop_h))
        self.crop = [offset, size, True]

    # Seek bar: 5%..95% of the width, 90%..95% of the height.
    res_w, res_h = self.video_res
    self.bar_start = int(res_w * 0.05)
    self.bar_end = int(res_w * 0.95)
    self.bar_top = int(res_h * 0.90)
    self.bar_bottom = int(res_h * 0.95)
|
|
|
|
def calculate_step(self):
|
|
|
|
now = time.time()
|
|
self.last_move = [x for x in self.last_move if x[1] > now - 3]
|
|
if len(self.last_move) == 0:
|
|
self.step = 1
|
|
self.last_move = []
|
|
return
|
|
lefts = sum([1 for x in self.last_move if x[0] == "l"])
|
|
rights = sum([1 for x in self.last_move if x[0] == "r"])
|
|
if lefts > 0 and rights > 0:
|
|
self.step = 1
|
|
self.last_move = []
|
|
return
|
|
count = max(lefts, rights)
|
|
if count < 5:
|
|
self.step = 1
|
|
else:
|
|
# x2 poly from 5:5 -> 15:180
|
|
self.step = 45 - 16.5 * count + 1.7 * count * count
|
|
self.step = min(self.step, 0.1 * self.video_length)
|
|
self.step = int(self.step)
|
|
|
|
def draw_bar(self, frame):
    """Draw the seek bar onto *frame*.

    Layers, in drawing order: the bar outline, one tick per time stamp, the
    key frames of the track being edited (only in point-click mode), the
    play-head, and the start / end labels below the bar.
    """
    bar_width = self.bar_end - self.bar_start

    def frame_to_x(frame_nr):
        # Horizontal pixel position of a frame number on the bar.
        return int(self.bar_start + frame_nr / self.frames * bar_width)

    cursor_x = frame_to_x(self.nr)

    cv2.rectangle(
        frame,
        (self.bar_start, self.bar_top),
        (self.bar_end, self.bar_bottom),
        (255, 255, 255),
        2,
    )

    for stamp in self.stamps:
        stamp_x = frame_to_x(stamp)
        upper = (stamp_x, self.bar_top)
        lower = (stamp_x, self.bar_bottom)
        # Dark, thicker line underneath so the green tick stays readable.
        cv2.line(frame, upper, lower, (32, 32, 32), 3)
        cv2.line(frame, upper, lower, (84, 255, 63), 1)

    if self.point_click == 1 and self.point_index in self.points:
        track = self.points[self.point_index]
        mid_y = int((self.bar_top + self.bar_bottom) / 2)
        # All shadows first, then all coloured dots, so no shadow covers a dot.
        for key_nr in track:
            cv2.circle(frame, (frame_to_x(key_nr), mid_y), 3, (32, 32, 32), -1)
        for key_nr in track:
            dot_x = frame_to_x(key_nr)
            dot_color = self.get_point_color(track[key_nr])
            cv2.circle(frame, (dot_x, mid_y), 1, dot_color, -1)

    cv2.line(
        frame,
        (cursor_x, self.bar_top),
        (cursor_x, self.bar_bottom),
        (63, 84, 255),
        1,
    )

    label_y = self.bar_bottom + 20
    white = (255, 255, 255)
    self.shadow_text(frame, "1", (self.bar_start - 7, label_y), 0.7, 2, white)

    end_label = self.format_time(self.frames - 1)
    label_size = cv2.getTextSize(end_label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)[0]
    (text_width, text_height) = label_size
    # Right-align the end label with the end of the bar.
    self.shadow_text(frame, end_label, (self.bar_end - text_width, label_y), 0.7, 2, white)
|
|
|
|
def draw_crop(self, frame):
|
|
|
|
if self.crop[2] is None:
|
|
return
|
|
p2 = (self.crop[0][0] + self.crop[1][0], self.crop[0][1] + self.crop[1][1])
|
|
cv2.rectangle(
|
|
frame,
|
|
self.crop[0],
|
|
p2,
|
|
(0, 192, 192),
|
|
1,
|
|
)
|
|
if self.crop_click == 1:
|
|
x, y = self.visual_to_original((self.crop[0][0], self.crop[0][1]))
|
|
|
|
self.shadow_text(
|
|
frame,
|
|
f"{x},{y}",
|
|
self.crop[0],
|
|
0.5,
|
|
1,
|
|
(0, 192, 192),
|
|
)
|
|
if self.crop_click == 2:
|
|
x, y = self.visual_to_original((self.crop[0][0], self.crop[0][1]))
|
|
w, h = self.visual_to_original((self.crop[1][0], self.crop[1][1]))
|
|
self.shadow_text(
|
|
frame,
|
|
f"{w}x{h}",
|
|
self.crop[0],
|
|
0.5,
|
|
1,
|
|
(0, 192, 192),
|
|
)
|
|
|
|
def draw_points(self, frame):
    """Draw the bounding-box tracks onto *frame*.

    Does nothing unless ``opts.output_points`` is set. Every track except
    the one being edited is drawn as a labelled ring at its interpolated
    centre. In point-click mode the edited track additionally gets a
    crosshair at the mouse position, a status line, its box, a trail of
    its recent centres and a ring when the current frame is a key frame.

    NOTE(review): the nesting below was reconstructed from a source whose
    indentation was lost; in particular it is assumed that the trail is
    only drawn when the current frame has a point - confirm.
    """

    if self.opts.output_points is None:
        return

    for index in self.points:
        # The edited track is drawn in more detail further down.
        if index == self.point_index and self.point_click == 1:
            continue
        current = self.get_interpolated_point(index=index)
        # Outside the key-frame range, or no data at all: nothing to show.
        if current["type"] in ("pre", "post", None):
            continue
        if current["visible"] == "hidden":
            continue
        color = self.get_point_color(current)
        # Black ring underneath as an outline for the coloured ring.
        cv2.circle(frame, (current["cx"], current["cy"]), 10, (0, 0, 0), 2)
        cv2.circle(frame, (current["cx"], current["cy"]), 10, color, 1)
        self.shadow_text(
            frame,
            index,
            (current["cx"], current["cy"]),
            0.5,
            1,
            color,
        )

    if self.point_click == 1:
        # Draw crosshair
        cv2.line(
            frame, (self.mouse_position[0], 0), (self.mouse_position[0], self.video_res[1]), (128, 128, 128), 1
        )
        cv2.line(
            frame, (0, self.mouse_position[1]), (self.video_res[0], self.mouse_position[1]), (128, 128, 128), 1
        )
        # Show current track
        x, y = [20, 70]
        # " i" = interpolation enabled, " i*" = a recalculation is pending.
        intrp_str = (
            ""
            if not self.points_interpolation_enabled
            else " i*" if True in self.points_interpolation_required.values() else " i"
        )

        self.shadow_text(
            frame,
            f"P:{str(self.point_index)}{intrp_str}",
            (x, y),
            0.5,
            1,
            (255, 255, 255),
        )

        try:
            current = self.get_interpolated_point()
            if current["type"] is not None:
                color = self.get_point_color(current)

                cv2.rectangle(
                    frame,
                    (current["x0"], current["y0"]),
                    (current["x1"], current["y1"]),
                    color,
                    2,
                )

                cv2.circle(frame, (current["cx"], current["cy"]), 10, color, 1)

                if self.points_interpolation_enabled:
                    # Trail of the box centre over the last viewer_fps frames.
                    history = []
                    for p in range(max(1, int(self.nr - self.viewer_fps)), self.nr + 1):
                        po = self.get_interpolated_point(p)
                        history.append([po["cx"], po["cy"]])
                    history = np.array(history, np.int32).reshape((-1, 1, 2))
                    cv2.polylines(frame, [history], False, COLOR_INTERP, 1)

        except (KeyError, IndexError, TypeError):
            # Frames without usable interpolated data end up here; the
            # state is printed and drawing continues.
            print(self.get_interpolated_point(), self.nr)
            pass
        try:
            current = self.get_point()
            # Extra ring when the current frame is a key frame.
            if current["x0"] is not None:
                cv2.circle(frame, (current["cx"], current["cy"]), 13, COLOR_KEY, 2)
        except KeyError:
            pass
        except IndexError:
            print(self.points[self.point_index])
            print(self.nr)
            pass
|
|
|
|
def scan_point(self, direction):
|
|
def set_nr(ts):
|
|
self.nr = ts - 1
|
|
self.read_next = True
|
|
|
|
try:
|
|
if direction == "first":
|
|
return set_nr(min(list(self.points[self.point_index].keys())))
|
|
|
|
if direction == "last":
|
|
return set_nr(max(list(self.points[self.point_index].keys())))
|
|
|
|
if direction == "next":
|
|
for ts in sorted(list(self.points[self.point_index].keys())):
|
|
if ts > self.nr:
|
|
return set_nr(ts)
|
|
|
|
if direction == "previous":
|
|
for ts in reversed(sorted(list(self.points[self.point_index].keys()))):
|
|
if ts < self.nr:
|
|
return set_nr(ts)
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
def toggle_point(self, ts):
|
|
try:
|
|
if ts in self.points[self.point_index]:
|
|
# Remove point
|
|
del self.points[self.point_index][ts]
|
|
else:
|
|
# Introduce point from interpolated
|
|
ip = self.get_interpolated_point()
|
|
if ip["type"] is None:
|
|
return
|
|
self.points[self.point_index][self.nr] = {
|
|
"x0": ip["x0"],
|
|
"y0": ip["y0"],
|
|
"x1": ip["x1"],
|
|
"y1": ip["y1"],
|
|
"visible": ip["visible"],
|
|
}
|
|
self.points_interpolation_required[self.point_index] = True
|
|
except Exception:
|
|
pass
|
|
|
|
def get_point(self, nr=None, index=None):
|
|
"""{x0,y0,x1,y1, cx, cy, w, h, visible}"""
|
|
if nr is None:
|
|
nr = self.nr
|
|
if index is None:
|
|
index = self.point_index
|
|
if index in self.points:
|
|
|
|
if nr in self.points[index]:
|
|
value = self.points[index][nr].copy()
|
|
value.update(
|
|
{
|
|
"cx": int((value["x0"] + value["x1"]) / 2),
|
|
"cy": int((value["y0"] + value["y1"]) / 2),
|
|
"w": int(abs(value["x0"] - value["x1"])),
|
|
"h": int(abs(value["y0"] - value["y1"])),
|
|
}
|
|
)
|
|
return value
|
|
|
|
return {
|
|
"x0": None,
|
|
"y0": None,
|
|
"x1": None,
|
|
"y1": None,
|
|
"cx": None,
|
|
"cy": None,
|
|
"w": None,
|
|
"h": None,
|
|
"visible": None,
|
|
}
|
|
|
|
def get_interpolated_point(self, nr=None, index=None):
|
|
"""{x0,y0,x1,y1, cx, cy, visible, type, age}"""
|
|
if nr is None:
|
|
nr = self.nr
|
|
if index is None:
|
|
index = self.point_index
|
|
|
|
if index in self.points:
|
|
if nr in self.points[index]:
|
|
value = self.get_point(nr=nr, index=index)
|
|
value.update({"type": "key" if value["x0"] is not None else None, "age": 0})
|
|
return value
|
|
|
|
if not self.points_interpolation_enabled:
|
|
value = self.get_point(nr=nr, index=index)
|
|
value.update({"type": "key" if value["x0"] is not None else None, "age": 0})
|
|
return value
|
|
|
|
if index in self.points_interpolated:
|
|
if nr in self.points_interpolated[index]:
|
|
value = self.points_interpolated[index][nr].copy()
|
|
value.update(
|
|
{
|
|
"cx": int((value["x0"] + value["x1"]) / 2),
|
|
"cy": int((value["y0"] + value["y1"]) / 2),
|
|
}
|
|
)
|
|
return value
|
|
|
|
return {
|
|
"x0": None,
|
|
"y0": None,
|
|
"x1": None,
|
|
"y1": None,
|
|
"cx": None,
|
|
"cy": None,
|
|
"visible": None,
|
|
"type": None,
|
|
"age": None,
|
|
}
|
|
|
|
def convert_interpolated_points(self):
|
|
|
|
if self.point_click == 1 and self.point_index in self.points:
|
|
self.toggle_interpolation(True)
|
|
|
|
for nr in range(self.frames):
|
|
ip = self.get_interpolated_point(nr=nr)
|
|
if ip["type"] == "interp" and ip["visible"] == POINT_VISIBILITY[0]:
|
|
self.points[self.point_index][nr] = {
|
|
"x0": ip["x0"],
|
|
"y0": ip["y0"],
|
|
"x1": ip["x1"],
|
|
"y1": ip["y1"],
|
|
"visible": POINT_VISIBILITY[0],
|
|
}
|
|
|
|
def modify_point(self, position, x, y):
|
|
"""position: tl topleft, br bottomright, c center"""
|
|
if not self.point_index in self.points:
|
|
self.points[self.point_index] = {}
|
|
|
|
if not self.nr in self.points[self.point_index]:
|
|
if len(self.points[self.point_index]) > 0:
|
|
keys = sorted(list(self.points[self.point_index].keys()))
|
|
if self.nr > keys[-1]: # last point if at end of track
|
|
last_p = self.points[self.point_index][keys[-1]]
|
|
elif self.nr < keys[0]: # first point if before track
|
|
last_p = self.points[self.point_index][keys[0]]
|
|
else: # previous point if in the middle of track
|
|
prev_key = keys[0]
|
|
for key in keys:
|
|
if key > self.nr:
|
|
last_p = self.points[self.point_index][prev_key]
|
|
break
|
|
prev_key = key
|
|
w = abs(last_p["x1"] - last_p["x0"])
|
|
h = abs(last_p["y1"] - last_p["y0"])
|
|
else:
|
|
w = 50
|
|
h = 50
|
|
|
|
if position == "tl":
|
|
self.points[self.point_index][self.nr] = {
|
|
"x0": x,
|
|
"y0": y,
|
|
"x1": min(self.video_res[0] - 1, x + w),
|
|
"y1": min(self.video_res[1] - 1, y + h),
|
|
"visible": last_p["visible"],
|
|
}
|
|
if position == "br":
|
|
self.points[self.point_index][self.nr] = {
|
|
"x0": max(0, x - w),
|
|
"y0": max(0, y - h),
|
|
"x1": x,
|
|
"y1": y,
|
|
"visible": last_p["visible"],
|
|
}
|
|
if position == "c":
|
|
self.points[self.point_index][self.nr] = {
|
|
"x0": max(0, int(x - w / 2)),
|
|
"y0": max(0, int(y - h / 2)),
|
|
"x1": min(self.video_res[0] - 1, int(x + w / 2)),
|
|
"y1": min(self.video_res[1] - 1, int(y + h / 2)),
|
|
"visible": last_p["visible"],
|
|
}
|
|
|
|
else:
|
|
# not a new point
|
|
if position == "c":
|
|
current = self.points[self.point_index][self.nr]
|
|
w = abs(current["x1"] - current["x0"])
|
|
h = abs(current["y1"] - current["y0"])
|
|
self.points[self.point_index][self.nr] = {
|
|
"x0": max(0, int(x - w / 2)),
|
|
"y0": max(0, int(y - h / 2)),
|
|
"x1": min(self.video_res[0] - 1, int(x + w / 2)),
|
|
"y1": min(self.video_res[1] - 1, int(y + h / 2)),
|
|
"visible": current["visible"],
|
|
}
|
|
elif position == "tl":
|
|
self.points[self.point_index][self.nr]["x0"] = x
|
|
self.points[self.point_index][self.nr]["y0"] = y
|
|
|
|
elif position == "br":
|
|
self.points[self.point_index][self.nr]["x1"] = x
|
|
self.points[self.point_index][self.nr]["y1"] = y
|
|
|
|
if self.points[self.point_index][self.nr]["x0"] > self.points[self.point_index][self.nr]["x1"]:
|
|
self.points[self.point_index][self.nr]["x1"], self.points[self.point_index][self.nr]["x0"] = (
|
|
self.points[self.point_index][self.nr]["x0"],
|
|
self.points[self.point_index][self.nr]["x1"],
|
|
)
|
|
if self.points[self.point_index][self.nr]["y0"] > self.points[self.point_index][self.nr]["y1"]:
|
|
self.points[self.point_index][self.nr]["y1"], self.points[self.point_index][self.nr]["y0"] = (
|
|
self.points[self.point_index][self.nr]["y0"],
|
|
self.points[self.point_index][self.nr]["y1"],
|
|
)
|
|
|
|
# self.interpolate_points()
|
|
self.points_interpolation_required[self.point_index] = True
|
|
|
|
def modify_point_wh(self):
|
|
|
|
if self.point_click == 0:
|
|
self.add_message("Not in point clicking mode")
|
|
return
|
|
if self.opts.output_points is None:
|
|
return
|
|
curr_point = self.get_point()
|
|
if curr_point["x0"] is None:
|
|
self.add_message("Not in point frame (green)")
|
|
return
|
|
|
|
new_wh = abs(self.mouse_position[0] - curr_point["cx"])
|
|
new_hh = abs(self.mouse_position[1] - curr_point["cy"])
|
|
self.points[self.point_index][self.nr]["x0"] = int(curr_point["cx"] - new_wh)
|
|
self.points[self.point_index][self.nr]["y0"] = int(curr_point["cy"] - new_hh)
|
|
self.points[self.point_index][self.nr]["x1"] = int(curr_point["cx"] + new_wh)
|
|
self.points[self.point_index][self.nr]["y1"] = int(curr_point["cy"] + new_hh)
|
|
self.points[self.point_index][self.nr]["visible"] = POINT_VISIBILITY[0]
|
|
|
|
self.points_interpolation_required[self.point_index] = True
|
|
|
|
def toggle_point_visibility(self):
|
|
|
|
if self.point_click == 0:
|
|
self.add_message("Not in point clicking mode")
|
|
return
|
|
if self.opts.output_points is None:
|
|
return
|
|
curr_point = self.get_point()
|
|
if curr_point["x0"] is None:
|
|
self.add_message("Not in point frame (green)")
|
|
return
|
|
|
|
try:
|
|
new_index = (1 + POINT_VISIBILITY.index(self.points[self.point_index][self.nr]["visible"])) % len(
|
|
POINT_VISIBILITY
|
|
)
|
|
except (ValueError, KeyError):
|
|
new_index = 0
|
|
|
|
self.points[self.point_index][self.nr]["visible"] = POINT_VISIBILITY[new_index]
|
|
try:
|
|
self.points_interpolated[self.point_index][self.nr]["visible"] = POINT_VISIBILITY[new_index]
|
|
except Exception as e:
|
|
print(e)
|
|
pass
|
|
self.points_interpolation_required[self.point_index] = True
|
|
|
|
def track_point(self):
|
|
|
|
if self.point_click == 0:
|
|
self.add_message("Not in point clicking mode")
|
|
return
|
|
if self.opts.output_points is None:
|
|
return
|
|
|
|
self.toggle_interpolation(True)
|
|
|
|
tracker_gui = TrackerGUI(self)
|
|
if len(tracker_gui.points) > 0:
|
|
for nr in tracker_gui.points:
|
|
self.points[self.point_index][nr] = tracker_gui.points[nr]
|
|
# self.interpolate_points()
|
|
self.points_interpolation_required[self.point_index] = True
|
|
self.nr = max(tracker_gui.points) - 1
|
|
self.read_next = True
|
|
|
|
def load_plugin(self):
|
|
|
|
self.plugin = None
|
|
if self.opts.plugin:
|
|
if not os.path.exists(os.path.join(PLUGIN_FOLDER, "hello.py")):
|
|
os.makedirs(PLUGIN_FOLDER, exist_ok=True)
|
|
with open(os.path.join(PLUGIN_FOLDER, "hello.py"), "wt") as fp:
|
|
fp.write(
|
|
"""import cv2
|
|
import numpy as np
|
|
class World:
|
|
def __init__(self, tsmark):
|
|
self.tsmark = tsmark
|
|
self.window_name = "tsmark - plugin"
|
|
print("plugin loaded")
|
|
def __call__(self):
|
|
print("plugin called")
|
|
self.tsmark.paused = True
|
|
cv2.namedWindow(self.window_name, flags=cv2.WINDOW_AUTOSIZE | cv2.WINDOW_KEEPRATIO | cv2.WINDOW_GUI_NORMAL)
|
|
frame = cv2.resize(np.zeros((16, 16, 3), dtype=np.uint8), self.tsmark.video_res)
|
|
self.tsmark.shadow_text(frame, "Hello World! press q to exit.", (100, 80), 0.75, 2, (255, 255, 255))
|
|
cv2.imshow(self.window_name, frame)
|
|
while True:
|
|
k = cv2.waitKey(10)
|
|
# break if ESC pressed, q, space or enter
|
|
if k & 0xFF == ord("q") or k & 0xFF == 32 or k & 0xFF == 27 or k & 0xFF == 13:
|
|
break
|
|
cv2.destroyWindow(self.window_name)
|
|
return
|
|
"""
|
|
)
|
|
|
|
plugin_file, plugin_class = self.opts.plugin.split(":", 1)
|
|
plugin_path = os.path.join(PLUGIN_FOLDER, plugin_file + ".py")
|
|
module_spec = importlib.util.spec_from_file_location("Plugin", plugin_path)
|
|
loaded_plugin = importlib.util.module_from_spec(module_spec)
|
|
module_spec.loader.exec_module(loaded_plugin)
|
|
plugin_class = getattr(loaded_plugin, plugin_class)
|
|
self.plugin = plugin_class(self)
|
|
|
|
def launch_plugin(self):
|
|
|
|
if self.plugin:
|
|
self.plugin()
|
|
|
|
def interpolate_points(self, point_index=None):
|
|
"""types:
|
|
key: user clicked / accepted frame
|
|
interp: interpolated frame
|
|
pre: before any keyframes
|
|
post: after any keyframes
|
|
"""
|
|
|
|
if self.points_interpolation_thread is None:
|
|
self.points_interpolation_thread = threading.Thread(target=self.interpolate_points_in_thread, args=())
|
|
self.points_interpolation_thread.start()
|
|
|
|
if not self.points_interpolation_thread.is_alive():
|
|
self.points_interpolation_thread = threading.Thread(target=self.interpolate_points_in_thread, args=())
|
|
self.points_interpolation_thread.start()
|
|
|
|
if point_index is None:
|
|
point_index = self.point_index
|
|
|
|
def i_point(x0=None, y0=None, x1=None, y1=None, t=None, visible=None, age=None):
|
|
return {"x0": x0, "y0": y0, "x1": x1, "y1": y1, "type": t, "visible": visible, "age": age}
|
|
|
|
def point2array(p):
|
|
return [p["x0"], p["y0"], p["x1"], p["y1"]]
|
|
|
|
if not point_index in self.points:
|
|
return
|
|
|
|
self.points_interpolation_required[point_index] = False
|
|
if not point_index in self.points_interpolated:
|
|
self.points_interpolated[point_index] = {key: {} for key in range(self.frames)}
|
|
|
|
new_points = {k: v for k, v in self.points_interpolated[point_index].items()}
|
|
|
|
if len(self.points[point_index]) == 1: # only one point added
|
|
key = list(self.points[point_index].keys())[0]
|
|
vals = self.points[point_index][key]
|
|
for key in range(self.frames):
|
|
new_points[key] = i_point()
|
|
new_points[key].update(vals)
|
|
new_points[key]["type"] = "pre" if key < self.nr else "post"
|
|
|
|
new_points[self.nr]["type"] = "key"
|
|
self.points_interpolated[point_index] = new_points
|
|
|
|
else: # more points
|
|
point_keys = list(sorted(list(self.points[point_index].keys())))
|
|
point_values = [point2array(self.points[point_index][k]) for k in point_keys]
|
|
xyxy = np.array(point_values).T
|
|
spline = PchipInterpolator(point_keys, xyxy, axis=1)
|
|
start_key = min(point_keys)
|
|
end_key = max(point_keys) + 1
|
|
t2 = np.arange(start_key, end_key)
|
|
# Pre points
|
|
for key in range(0, start_key):
|
|
new_points[key]["type"] = "pre"
|
|
new_points[key].update(self.points[point_index][start_key])
|
|
# interpolated points
|
|
visible = self.points[point_index][start_key]["visible"]
|
|
for row in np.vstack((t2, spline(t2))).T:
|
|
if row[0] in point_keys:
|
|
visible = self.points[point_index][row[0]]["visible"]
|
|
new_points[row[0]] = {
|
|
"type": "interp",
|
|
"x0": int(row[1]),
|
|
"y0": int(row[2]),
|
|
"x1": int(row[3]),
|
|
"y1": int(row[4]),
|
|
"visible": visible,
|
|
}
|
|
|
|
# post points
|
|
for key in range(end_key, self.frames + 1):
|
|
new_points[key] = {
|
|
"type": "post",
|
|
"x0": int(row[1]),
|
|
"y0": int(row[2]),
|
|
"x1": int(row[3]),
|
|
"y1": int(row[4]),
|
|
"visible": visible,
|
|
}
|
|
# clicked points (not necessary, could determine at draw time!)
|
|
for key in point_keys:
|
|
new_points[key]["type"] = "key"
|
|
age = 0
|
|
for key in new_points:
|
|
if new_points[key]["type"] == "key":
|
|
age = 0
|
|
if new_points[key]["type"] == "interp":
|
|
age += 1
|
|
new_points[key]["age"] = age
|
|
|
|
self.points_interpolated[point_index] = new_points
|
|
|
|
def interpolate_points_in_thread(self):
|
|
|
|
self.points_interpolation_frequency = 1
|
|
|
|
while True:
|
|
if self.points_interpolation_thread_exit:
|
|
return
|
|
time.sleep(self.points_interpolation_frequency)
|
|
if not self.points_interpolation_enabled:
|
|
continue
|
|
|
|
for point_index in self.points_interpolation_required:
|
|
if self.points_interpolation_required[point_index]:
|
|
self.interpolate_points(point_index)
|
|
|
|
def toggle_interpolation(self, value=None):
|
|
|
|
if value is not None:
|
|
self.points_interpolation_enabled = not value
|
|
|
|
self.points_interpolation_enabled = not self.points_interpolation_enabled
|
|
if self.points_interpolation_enabled:
|
|
self.interpolate_points()
|
|
|
|
def draw_help(self, frame):
    """Overlay the help text on *frame*, one row every 18 pixels."""
    left, first_row, row_height = 100, 80, 18
    white = (255, 255, 255)
    for row_nr, row in enumerate(self.get_help().split("\n")):
        baseline = first_row + row_nr * row_height
        self.shadow_text(frame, row, (left, baseline), 0.6, 1, white)
|
|
|
|
def draw_label(self, frame):
|
|
|
|
if not self.nr in self.stamps:
|
|
return
|
|
|
|
text = "{} #{}".format(self.nr, self.stamps.index(self.nr) + 1)
|
|
bottom = 60
|
|
left = 10
|
|
self.shadow_text(frame, text, (left, bottom), 1, 2, (63, 84, 255))
|
|
|
|
def draw_time(self, frame):
    """Draw the current play time; while paused also the frame number."""
    pause_marker = f"|| ({self.nr})" if self.paused else ""
    caption = f"{self.format_time(self.nr)} {pause_marker}"
    self.shadow_text(frame, caption, (10, 30), 1.1, 2, (255, 255, 255))
|
|
|
|
def draw_message(self, frame):
|
|
|
|
if self.message is None:
|
|
return
|
|
if time.time() - 5 > self.message_timer:
|
|
self.message = None
|
|
return
|
|
|
|
left = 10
|
|
bottom = 90
|
|
|
|
self.shadow_text(frame, self.message, (left, bottom), 0.9, 2, (255, 255, 255))
|
|
|
|
def add_message(self, new):
|
|
|
|
self.message = new
|
|
self.message_timer = time.time()
|
|
|
|
def format_time(self, nframe):
|
|
|
|
seconds = int(nframe / self.fps)
|
|
frame = nframe % self.fps
|
|
parts = int(100 * (frame / self.fps))
|
|
return time.strftime("%H:%M:%S", time.gmtime(seconds)) + ".%02d" % (parts)
|
|
|
|
def get_point_color(self, point):
    """Pick the drawing colour for *point* from its type and visibility.

    Args:
        point: point dict; a missing "type" counts as "key", a missing
            "visible" as "yes".

    Returns:
        ``COLOR_HIDDEN`` for hidden points, the matching key /
        interpolated colour for visible or occluded points,
        ``COLOR_PREPOST`` for pre / post points, ``COLOR_NONE`` otherwise.
    """
    kind = point.get("type", "key")
    visibility = point.get("visible", "yes")

    if visibility == "hidden":
        return COLOR_HIDDEN
    if kind in ("post", "pre"):
        return COLOR_PREPOST

    palette = {
        ("key", "yes"): COLOR_KEY,
        ("key", "occluded"): COLOR_KEY_OCCLUDED,
        ("interp", "yes"): COLOR_INTERP,
        ("interp", "occluded"): COLOR_INTERP_OCCLUDED,
    }
    return palette.get((kind, visibility), COLOR_NONE)
|
|
|
|
def get_help(self):
    """Return the multi-line keyboard and mouse help text.

    The text is drawn row by row by ``draw_help`` and printed by
    ``print_help``.

    NOTE(review): the source this was recovered from had lost its leading
    whitespace, so any column alignment inside the text could not be
    restored - compare with the upstream file.
    """
    return """Keyboard help:

Arrows, PgUp, PgDn, Home, End or click mouse in position bar
j l i k [ ]
jump in video position
0-9 move to 0%,10%,20% .. position
, and . move one frame at a time
z and c move to previous or next mark
x or double click in the video
mark frame
space or click video
pause
a and s modify crop offset or size

f toggle 0.25x 1x or 4x FPS
v toggle HUD
h toggle help
q or esc quit

Bounding box editor:
p toggle bounding box drawing. enter any key as index.
o toggle object is visible/occluded/hidden
x toggle (delete) key frame
r convert interpolated points to points (no undo!)
u toggle automatic interpolation
mouse left: set top-left corner of box
mouse middle: set center of box
mouse right: set lower right corner of box
e set width/height of box symmetric around center
z c Home End move between key-frames
t start optical flow tracker
m start plugin (if defined)
Color codes:
green |keypoint
purple |interpolated
darker tone |occluded key/interpolated
yellow |post / pre points
gray |point is hidden

"""
|
|
|
|
def mouse_click(self, event, x, y, flags, param):
    """OpenCV mouse callback: crop editing, box editing, seeking, pausing.

    Args:
        event: OpenCV mouse event code.
        x, y: mouse position in display coordinates.
        flags, param: unused, required by the callback signature.

    Modes are checked in this order and the first active one consumes the
    event: crop offset editing, crop size editing, point-click mode. With
    no mode active a left click inside the seek bar seeks, a left click
    elsewhere toggles pause, and a double click outside the bar toggles a
    time stamp.
    """
    in_bar = self.bar_start < x < self.bar_end and self.bar_top < y < self.bar_bottom
    left_down = event == cv2.EVENT_LBUTTONDOWN
    self.mouse_position = (x, y)

    if self.crop_click in (1, 2):
        if self.crop_click == 1:
            # The crop offset follows the mouse ...
            self.crop[0] = (x, y)
        else:
            # ... and so does the crop size, relative to the offset.
            self.crop[1] = (x - self.crop[0][0], y - self.crop[0][1])
        if left_down:
            # A left click accepts the value and leaves the crop mode.
            self.crop_click = 0
        return

    if self.point_click == 1:
        anchors = {
            cv2.EVENT_LBUTTONDOWN: "tl",
            cv2.EVENT_RBUTTONDOWN: "br",
            cv2.EVENT_MBUTTONDOWN: "c",
        }
        anchor = anchors.get(event)
        if anchor is not None:
            self.modify_point(anchor, int(x), int(y))
        return

    if left_down:
        if in_bar:
            click_relative = (x - self.bar_start) / (self.bar_end - self.bar_start)
            self.nr = int(click_relative * self.frames)
            self.read_next = True
        else:
            self.paused = not self.paused

    if event == cv2.EVENT_LBUTTONDBLCLK and not in_bar:
        self.toggle_stamp()
|
|
|
|
def parse_time(self, timestr):
|
|
"""return frames"""
|
|
|
|
colon_count = len(timestr.split(":")) - 1
|
|
if colon_count == 0:
|
|
secs = float(timestr)
|
|
return int(secs * self.fps)
|
|
if colon_count == 1:
|
|
mins, secstr = timestr.split(":", 1)
|
|
sec = float(secstr)
|
|
return int(self.fps * (int(mins) * 60 + sec))
|
|
if colon_count == 2:
|
|
hours, mins, secstr = timestr.split(":", 2)
|
|
sec = float(secstr)
|
|
return int(self.fps * (int(hours, 10) * 3600 + int(mins, 10) * 60 + sec))
|
|
raise ValueError("Cannot parse time definition {}".format(timestr))
|
|
raise TypeError("Cannot parse time definition {}".format(timestr))
|
|
|
|
def parse_timestamps(self):
|
|
self.stamps = []
|
|
if self.opts.timestamps:
|
|
if os.path.exists(self.opts.timestamps):
|
|
with open(self.opts.timestamps, "rt") as fp:
|
|
for row in fp.readlines():
|
|
# if row has 3 cols, pick the frame number directly
|
|
splitted = row.split(",")
|
|
if len(splitted) == 3:
|
|
self.stamps.append(int(splitted[2], 10))
|
|
if len(splitted) < 3:
|
|
self.opts.timestamps.append(splitted[0])
|
|
else:
|
|
self.opts.timestamps = self.opts.timestamps.split(",")
|
|
|
|
if len(self.stamps) > 0:
|
|
self.stamps.sort()
|
|
else:
|
|
self.stamps = sorted([self.parse_time(ts.strip()) for ts in self.opts.timestamps if ts.strip() != ""])
|
|
self.stamps = [x for x in self.stamps if 0 <= x < self.frames]
|
|
self.nr = self.stamps[0]
|
|
else:
|
|
self.stamps = []
|
|
self.nr = 0
|
|
# Read bounding boxes from JSON
|
|
if self.opts.input_points:
|
|
if os.path.exists(self.opts.input_points):
|
|
with open(self.opts.input_points, "rt") as fp:
|
|
self.points = json.load(fp)
|
|
keys = list(self.points.keys())
|
|
for index in keys:
|
|
# Remove empty dicts
|
|
if len(self.points[index]) == 0:
|
|
del self.points[index]
|
|
continue
|
|
self.point_index = index
|
|
self.points[index] = {int(k): v for k, v in self.points[index].items()}
|
|
for key in self.points[index]:
|
|
self.points[index][key]["x0"], self.points[index][key]["y0"] = self.original_to_visual(
|
|
(self.points[index][key]["x0"], self.points[index][key]["y0"])
|
|
)
|
|
self.points[index][key]["x1"], self.points[index][key]["y1"] = self.original_to_visual(
|
|
(self.points[index][key]["x1"], self.points[index][key]["y1"])
|
|
)
|
|
if not self.points[index][key].get("visible", "NA") in POINT_VISIBILITY:
|
|
self.points[index][key]["visible"] = POINT_VISIBILITY[0]
|
|
self.interpolate_points()
|
|
print(f"Loaded points with index: {index}")
|
|
self.point_index = None
|
|
|
|
def print_help(self):
    """Print the help text to standard output."""
    help_text = self.get_help()
    print(help_text)
|
|
|
|
def print_timestamps(self):
    """Print the stamps and one ffmpeg command per pair of stamps.

    Stamps are taken pairwise as (start, end) of a segment to cut. The
    command arguments come from ``opts.ffmpeg_args``, where "{crop}" is
    replaced by a crop filter (when a crop is set) and ``{input}``,
    ``{output}``, ``{start_time}`` and ``{end_time}`` are filled in per
    segment. With ``opts.ffmpeg_run`` set, each command is also executed.

    Side effects: ``self.stamps`` is sorted, and when it is empty the
    whole video (0 .. frames) is added as the only segment; an active crop
    sets ``opts.ffmpeg_copy`` to False.
    """

    if self.crop[2] is None:
        cropstr = []
    else:
        self.opts.ffmpeg_copy = False
        # Crop is stored in display coordinates; ffmpeg needs video pixels.
        x, y = self.visual_to_original((self.crop[0][0], self.crop[0][1]))
        w, h = self.visual_to_original((self.crop[1][0], self.crop[1][1]))

        # A crop dragged up or left has a negative size; normalise it so
        # that (x, y) is the top-left corner.
        if w < 0:
            x = x + w
            w = -w
        if h < 0:
            y = y + h
            h = -h
        cropstr = ["-vf", f"crop={w}:{h}:{x}:{y}"]
    self.stamps.sort()
    print("# Timestamps:")
    for i, ts in enumerate(self.stamps):
        print("# {}: {} / {}".format(i + 1, self.format_time(ts), ts))
    if len(self.stamps) == 0:
        self.stamps.append(0)
        self.stamps.append(self.frames)
    # Zero-pad frame numbers in file names to a common width.
    padlen = len(str(self.frames))
    src_name_print = self.opts.video.replace('"', '\\"')
    tgt_name_print = os.path.splitext(self.opts.video)[0].replace('"', '\\"')

    # Pairs (stamps[0], stamps[1]), (stamps[2], stamps[3]), ...; a trailing
    # unpaired stamp is not used.
    for i in range(1, len(self.stamps), 2):
        from_ts = self.stamps[i - 1]
        to_ts = self.stamps[i]
        from_ft = self.format_time(from_ts)
        to_ft = self.format_time(to_ts)
        from_str = str(from_ts).zfill(padlen)
        to_str = str(to_ts).zfill(padlen)

        # Two argument lists: a shell-quoted one for printing and a raw one
        # for subprocess (which needs no quoting).
        ffmpeg_args_print = []
        ffmpeg_args = []

        for arg in shlex.split(self.opts.ffmpeg_args):
            if arg == "{crop}":
                ffmpeg_args_print.extend(cropstr)
                ffmpeg_args.extend(cropstr)
            else:
                ffmpeg_args_print.append(
                    arg.format(
                        input=shlex.quote(src_name_print),
                        output=shlex.quote(f"{tgt_name_print}.trim.{from_str}-{to_str}"),
                        start_time=from_ft,
                        end_time=to_ft,
                    )
                )
                ffmpeg_args.append(
                    arg.format(
                        input=self.opts.video,
                        output=f"{os.path.splitext(self.opts.video)[0]}.trim.{from_str}-{to_str}",
                        start_time=from_ft,
                        end_time=to_ft,
                    )
                )
        print(" ".join(["ffmpeg", "-hide_banner", *ffmpeg_args_print]))
        if self.opts.ffmpeg_run:
            subprocess.run(["ffmpeg", "-hide_banner", *ffmpeg_args])
|
|
|
|
def save_timestamps(self):
    """Write timestamps and points to the files named in ``self.opts``.

    When ``self.opts.output`` is set, one ``formatted_time,ordinal,stamp``
    line is written per entry of ``self.stamps`` (ordinals start at 1).

    When ``self.opts.output_points`` is set, ``self.points`` is dumped as
    JSON. Each point is copied and its ``x0``/``y0`` and ``x1``/``y1`` pairs
    are passed through ``self.visual_to_original``; indexes that have no
    points are left out. The dictionaries in ``self.points`` are not modified.
    """
    stamps_path = self.opts.output
    if stamps_path is not None:
        with open(stamps_path, "wt") as handle:
            handle.writelines(
                "{},{},{}\n".format(self.format_time(stamp), ordinal, stamp)
                for ordinal, stamp in enumerate(self.stamps, start=1)
            )
        print("Saved timestamps")

    points_path = self.opts.output_points
    if points_path is None:
        return

    exported = {}
    for index, by_frame in self.points.items():
        converted = {}
        for frame_nr in sorted(by_frame):
            stored = by_frame[frame_nr]
            entry = stored.copy()
            entry["x0"], entry["y0"] = self.visual_to_original((stored["x0"], stored["y0"]))
            entry["x1"], entry["y1"] = self.visual_to_original((stored["x1"], stored["y1"]))
            converted[frame_nr] = entry
        # Indexes without any points are not exported.
        if converted:
            exported[index] = converted

    with open(points_path, "wt") as handle:
        json.dump(exported, handle, indent=2)
    print("Saved points")
|
|
|
|
def shadow_text(self, frame, text, pos, size, thicc, color):
    """Draw ``text`` on ``frame`` at ``pos`` with a black outline.

    The text is rendered twice with ``cv2.putText`` using ``self.font`` and
    scale ``size``: first in black at twice the thickness ``thicc``, then in
    ``color`` at ``thicc`` on top of it.
    """
    passes = (
        ((0, 0, 0), 2 * thicc),  # wide black pass underneath
        (color, thicc),  # actual text on top
    )
    for ink, weight in passes:
        cv2.putText(frame, text, pos, self.font, size, ink, weight, cv2.LINE_AA)
|
|
|
|
def toggle_stamp(self):
    """Toggle the current frame number ``self.nr`` in ``self.stamps``.

    An existing stamp is removed, a missing one is added; the list is kept
    sorted.
    """
    current = self.nr
    try:
        self.stamps.remove(current)
    except ValueError:
        # Not stamped yet, so add it.
        self.stamps.append(current)
    self.stamps.sort()
|
|
|
|
def original_to_visual(self, t):
    """Scale an (x, y) pair from ``self.video_res_original`` to ``self.video_res``.

    ``t`` is indexed as ``t[0]`` (x) and ``t[1]`` (y). Each result component
    is truncated with ``int()``.
    """
    shown_w, shown_h = self.video_res[0], self.video_res[1]
    native_w, native_h = self.video_res_original[0], self.video_res_original[1]
    scaled_x = int(shown_w * t[0] / native_w)
    scaled_y = int(shown_h * t[1] / native_h)
    return (scaled_x, scaled_y)
|
|
|
|
def visual_to_original(self, t):
    """Scale an (x, y) pair from ``self.video_res`` to ``self.video_res_original``.

    ``t`` is indexed as ``t[0]`` (x) and ``t[1]`` (y). Each result component
    is truncated with ``int()``.
    """
    shown_w, shown_h = self.video_res[0], self.video_res[1]
    native_w, native_h = self.video_res_original[0], self.video_res_original[1]
    scaled_x = int(native_w * t[0] / shown_w)
    scaled_y = int(native_h * t[1] / shown_h)
    return (scaled_x, scaled_y)
|
|
|
|
def loop(self):
# Main interactive loop: reads frames from self.video_reader, draws the
# overlays, shows the result in the "tsmark" window and dispatches key presses
# until the user quits (q / Esc) or the window is no longer visible.
# On exit it releases the reader, destroys all windows and calls
# print_timestamps() followed by save_timestamps().
# NOTE(review): this copy of the file has lost its indentation and carries "|"
# separator lines; any nesting mentioned in the comments below is inferred.
|
|
|
|
# Seek step sizes. The key handlers add them to nr_time (self.nr / self.fps)
# and multiply the result by self.fps again to get a frame number.
self.step = 1
|
|
self.bigstep = 30
|
|
self.hugestep = 300
|
|
# Overrides the True assigned in __init__.
self.auto_step = False
|
|
self.last_move = []
|
|
self.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.nr)
|
|
|
|
self.print_help()
|
|
cv2.namedWindow("tsmark", flags=cv2.WINDOW_AUTOSIZE | cv2.WINDOW_KEEPRATIO | cv2.WINDOW_GUI_NORMAL)
|
|
cv2.setMouseCallback("tsmark", self.mouse_click)
|
|
# Key codes of the characters "0".."9"; the list index equals the digit value.
digits_ords = [ord(str(x)) for x in range(10)]
|
|
|
|
# FPS_modifier is an index into FPS_modifiers. The selected value multiplies
# the per-frame wait (self.viewer_spf) computed further down.
FPS_modifier = 1
|
|
FPS_modifiers = [0.25, 1, 4]
|
|
read_fails = 0
|
|
while self.video_reader.isOpened():
|
|
show_time = time.time()
|
|
if (not self.paused) or self.read_next:
|
|
ret, frame = self.video_reader.read()
|
|
self.frame_raw = frame
|
|
if ret == True:
|
|
read_fails = 0
|
|
# NOTE(review): `A or (A and B)` reduces to just self.paused, so
# self.point_click has no effect on draw_wait -- confirm the intended condition.
draw_wait = 200 if self.paused or (self.paused and self.point_click == 0) else 1
|
|
|
|
if (not self.paused) or self.read_next:
|
|
self.read_next = False
|
|
frame_visu = cv2.resize(frame.copy(), self.video_res)
|
|
self.frame_visu = frame_visu
|
|
self.draw_crop(frame_visu)
|
|
self.draw_points(frame_visu)
|
|
# Current position as self.nr / self.fps; used by the seek handlers below.
nr_time = self.nr / self.fps
|
|
if self.show_info:
|
|
self.draw_time(frame_visu)
|
|
self.draw_bar(frame_visu)
|
|
self.draw_label(frame_visu)
|
|
self.draw_message(frame_visu)
|
|
|
|
if self.show_help:
|
|
self.draw_help(frame_visu)
|
|
|
|
# Stop when the window is no longer visible.
if cv2.getWindowProperty("tsmark", cv2.WND_PROP_VISIBLE) < 1:
|
|
break
|
|
cv2.imshow("tsmark", frame_visu)
|
|
k = cv2.waitKey(draw_wait)
|
|
|
|
# Key dispatch. Only the low byte of the key code is compared.
# Seek handlers store "target - 1" in self.nr because self.nr is incremented
# by one after the dispatch, before the reader is repositioned.
if k & 0xFF == ord("q") or k & 0xFF == 27:
|
|
break
|
|
elif k & 0xFF == 32: # space
|
|
self.paused = not self.paused
|
|
|
|
# Movement =================
|
|
elif k & 0xFF == 80: # home key
|
|
if self.point_click == 1:
|
|
self.scan_point("first")
|
|
else:
|
|
self.nr = -1
|
|
self.read_next = True
|
|
|
|
elif k & 0xFF == 87: # end key
|
|
if self.point_click == 1:
|
|
self.scan_point("last")
|
|
else:
|
|
self.nr = self.frames - 1
|
|
self.paused = True
|
|
self.read_next = True
|
|
|
|
elif k & 0xFF == 85 or k & 0xFF == ord("]"): # pg up
|
|
self.nr = int((nr_time + self.hugestep) * self.fps) - 1
|
|
self.read_next = True
|
|
elif k & 0xFF == 86 or k & 0xFF == ord("["): # pg down
|
|
self.nr = int((nr_time - self.hugestep) * self.fps) - 1
|
|
self.read_next = True
|
|
|
|
elif k & 0xFF == 82 or k & 0xFF == ord("i"): # up arrow
|
|
self.nr = int((nr_time + self.bigstep) * self.fps) - 1
|
|
self.read_next = True
|
|
elif k & 0xFF == 84 or k & 0xFF == ord("k"): # down arrow
|
|
self.nr = int((nr_time - self.bigstep) * self.fps) - 1
|
|
self.read_next = True
|
|
|
|
elif k & 0xFF == 83 or k & 0xFF == ord("l"): # right arrow
|
|
# Each move is recorded with its time; calculate_step() is only called
# when self.auto_step is set.
self.last_move.append(("r", time.time()))
|
|
if self.auto_step:
|
|
self.calculate_step()
|
|
self.nr = int((nr_time + self.step) * self.fps) - 1
|
|
self.read_next = True
|
|
|
|
elif k & 0xFF == 81 or k & 0xFF == ord("j"): # left arrow
|
|
self.last_move.append(("l", time.time()))
|
|
if self.auto_step:
|
|
self.calculate_step()
|
|
self.nr = int((nr_time - self.step) * self.fps) - 1
|
|
self.read_next = True
|
|
|
|
# Move by frame
|
|
elif k & 0xFF == ord("."):
|
|
self.paused = True
|
|
self.read_next = True
|
|
elif k & 0xFF == ord(","):
|
|
self.paused = True
|
|
# -2 here plus the +1 applied after the dispatch gives one frame back.
self.nr -= 2
|
|
self.read_next = True
|
|
|
|
elif k & 0xFF == ord("z"): # move to previous ts
|
|
if self.point_click == 1:
|
|
self.scan_point("previous")
|
|
else:
|
|
for ts in reversed(sorted(self.stamps)):
|
|
if ts < self.nr - 1:
|
|
self.nr = ts - 1
|
|
self.read_next = True
|
|
break
|
|
elif k & 0xFF == ord("c"): # move to next ts
|
|
if self.point_click == 1:
|
|
self.scan_point("next")
|
|
else:
|
|
for ts in sorted(self.stamps):
|
|
if ts > self.nr:
|
|
self.nr = ts - 1
|
|
self.read_next = True
|
|
break
|
|
|
|
# Move by number
|
|
elif k & 0xFF in digits_ords:
|
|
# Digit d jumps to d/10 of the total frame count.
self.nr = int(digits_ords.index(k & 0xFF) * self.frames / 10) - 1
|
|
self.read_next = True
|
|
|
|
# Toggling =================
|
|
|
|
elif k & 0xFF == ord("f"): # modify FPS
|
|
FPS_modifier = (FPS_modifier + 1) % len(FPS_modifiers)
|
|
# The modifier scales the wait time, so the reported speed is its reciprocal.
self.add_message(f"Player speed {round(1/FPS_modifiers[FPS_modifier],2)}")
|
|
elif k & 0xFF == ord("a"): # toggle crop offset
|
|
self.crop_click = 0 if self.crop_click == 1 else 1
|
|
self.crop[2] = True
|
|
elif k & 0xFF == ord("s"): # toggle crop size
|
|
self.crop_click = 0 if self.crop_click == 2 else 2
|
|
self.crop[2] = True
|
|
elif k & 0xFF == ord("o"): # toggle point visibility (yes/occlusion/hidden)
|
|
if self.opts.output_points is not None:
|
|
self.toggle_point_visibility()
|
|
elif k & 0xFF == ord("p"): # toggle points
|
|
if self.opts.output_points is not None:
|
|
self.point_click = 1 - self.point_click
|
|
if self.point_click == 1:
|
|
self.shadow_text(
|
|
frame_visu,
|
|
"Enter point index",
|
|
(20, 70),
|
|
0.9,
|
|
2,
|
|
(255, 255, 255),
|
|
)
|
|
self.shadow_text(
|
|
frame_visu,
|
|
"Exists: " + "".join(sorted(self.points.keys())),
|
|
(20, 105),
|
|
0.8,
|
|
1,
|
|
(255, 255, 255),
|
|
)
|
|
cv2.imshow("tsmark", frame_visu)
|
|
|
|
# Block until a key is pressed; that key becomes the point index.
k2 = cv2.waitKey(0)
|
|
if k2 & 0xFF == ord("q") or k2 & 0xFF == 27:
|
|
self.point_click = 0
|
|
else:
|
|
# NOTE(review): k2 is not masked with 0xFF here, unlike every comparison
# above -- confirm whether chr(k2 & 0xFF) was intended.
self.point_index = chr(k2)
|
|
elif k & 0xFF == ord("g"): # Go to
|
|
self.shadow_text(
|
|
frame_visu,
|
|
"Enter frame or time",
|
|
(20, 70),
|
|
0.9,
|
|
2,
|
|
(255, 255, 255),
|
|
)
|
|
cv2.imshow("tsmark", frame_visu)
|
|
entered_chars = ""
|
|
# Collect digits, ":" and "." until "g" or key code 13 confirms, or q / Esc
# cancels. The entry is first tried as a frame number, then via parse_time().
while True:
|
|
frame_query = frame_visu.copy()
|
|
self.shadow_text(
|
|
frame_query,
|
|
entered_chars,
|
|
(20, 100),
|
|
0.9,
|
|
2,
|
|
(255, 255, 255),
|
|
)
|
|
cv2.imshow("tsmark", frame_query)
|
|
del frame_query
|
|
k2 = cv2.waitKey(0)
|
|
if k2 & 0xFF == ord("q") or k2 & 0xFF == 27:
|
|
break
|
|
elif k2 & 0xFF == ord("g") or k2 & 0xFF == 13:
|
|
try:
|
|
self.nr = int(entered_chars) - 1
|
|
except ValueError:
|
|
try:
|
|
self.nr = self.parse_time(entered_chars)
|
|
except Exception:
|
|
self.add_message("Cannot parse time")
|
|
break
|
|
self.read_next = True
|
|
break
|
|
elif k2 & 0xFF == 8: # backspace
|
|
entered_chars = entered_chars[0:-1]
|
|
elif k2 & 0xFF in digits_ords:
|
|
entered_chars += str(digits_ords.index(k2 & 0xFF))
|
|
elif k2 & 0xFF == ord(":"):
|
|
entered_chars += ":"
|
|
elif k2 & 0xFF == ord("."):
|
|
entered_chars += "."
|
|
else:
|
|
pass
|
|
|
|
elif k & 0xFF == ord("m"): # launch plugin module
|
|
self.launch_plugin()
|
|
elif k & 0xFF == ord("t"): # tracking
|
|
self.track_point()
|
|
elif k & 0xFF == ord("e"): # point edit (width height)
|
|
self.modify_point_wh()
|
|
elif k & 0xFF == ord("u"): # toggle interpolation
|
|
self.toggle_interpolation()
|
|
if self.point_click == 0:
|
|
self.shadow_text(
|
|
frame_visu,
|
|
(
|
|
"Point interpolation turned on"
|
|
if self.points_interpolation_enabled
|
|
else "Point interpolation turned off"
|
|
),
|
|
(20, 70),
|
|
0.9,
|
|
2,
|
|
(255, 255, 255),
|
|
)
|
|
cv2.imshow("tsmark", frame_visu)
|
|
# Keeps the notice on screen for up to 1000 ms; the returned key is not used.
k2 = cv2.waitKey(1000)
|
|
elif k & 0xFF == ord("x"): # toggle ts
|
|
if self.point_click == 1:
|
|
self.toggle_point(self.nr)
|
|
else:
|
|
self.toggle_stamp()
|
|
elif k & 0xFF == ord("r"): # convert interpolated points
|
|
self.convert_interpolated_points()
|
|
elif k & 0xFF == ord("v"):
|
|
self.show_info = not self.show_info
|
|
elif k & 0xFF == ord("h"):
|
|
self.print_help()
|
|
self.show_help = not self.show_help
|
|
|
|
# Advance the frame counter, clamp it to [0, self.frames - 1] (pausing at
# the end) and reposition the reader when a seek was requested.
if (not self.paused) or self.read_next:
|
|
self.nr += 1
|
|
if self.nr < 0:
|
|
self.nr = 0
|
|
if self.nr >= self.frames:
|
|
self.nr = self.frames - 1
|
|
self.paused = True
|
|
if self.read_next:
|
|
self.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.nr)
|
|
|
|
# Sleep for whatever remains of this frame's time budget, measured from show_time.
time_to_wait = FPS_modifiers[FPS_modifier] * self.viewer_spf - time.time() + show_time
|
|
if time_to_wait > 0:
|
|
time.sleep(time_to_wait)
|
|
|
|
# Read-failure path (presumably the else of `if ret == True`): step back from
# the end of the video, one frame further per consecutive failure; once there
# have been more failures than frames, reopen the video at frame 0.
else:
|
|
self.nr = self.frames - 2 - read_fails
|
|
self.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.nr)
|
|
read_fails += 1
|
|
if read_fails > self.frames:
|
|
self.nr = 0
|
|
self.open()
|
|
self.paused = True
|
|
self.read_next = True
|
|
|
|
# Periodic autosave every self.autosave_interval seconds; a failure is
# printed and otherwise ignored.
if time.time() > self.autosave_timer + self.autosave_interval:
|
|
self.autosave_timer = time.time()
|
|
try:
|
|
self.save_timestamps()
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
# Shutdown: raise the exit flag (presumably polled by the interpolation
# thread -- verify), release the reader, close windows, then print and save.
self.points_interpolation_thread_exit = True
|
|
self.video_reader.release()
|
|
cv2.destroyAllWindows()
|
|
self.print_timestamps()
|
|
self.save_timestamps()
|
|
|
|
|
|
class TrackerGUI:
|
|
def __init__(self, marker):
    """Store ``marker`` and start tracking if ``cv2.TrackerKCF_create`` works.

    ``cv2.TrackerKCF_create()`` is called once as a probe. If it raises
    ``AttributeError``, a failure message is posted through
    ``marker.add_message`` and ``start()`` is not called.
    """
    self.marker = marker
    self.points = {}

    try:
        cv2.TrackerKCF_create()
    except AttributeError:
        marker.add_message("Tracking failed: missing opencv contrib")
    else:
        self.start()
|
|
|
|
def start(self):
|
|
|
|
old_nr = self.marker.nr
|
|
|
|
curr_point = self.marker.get_point()
|
|
if curr_point["x0"] is None:
|
|
self.marker.add_message("Not in point frame (green)")
|
|
return
|
|
|
|
max_frames = int(
|
|
min(self.marker.point_tracking_length * self.marker.fps, self.marker.frames - self.marker.nr - 1)
|
|
)
|
|
cv2.namedWindow("tsmark - tracker", flags=cv2.WINDOW_AUTOSIZE | cv2.WINDOW_KEEPRATIO | cv2.WINDOW_GUI_NORMAL)
|
|
tracker = cv2.TrackerKCF_create()
|
|
|
|
self.marker.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.marker.nr)
|
|
# TODO: track using original video resolution!
|
|
ok, frame = self.marker.video_reader.read()
|
|
frame = cv2.resize(frame.copy(), self.marker.video_res)
|
|
bbox = tuple([curr_point["x0"], curr_point["y0"], curr_point["w"], curr_point["h"]])
|
|
bbox_area = curr_point["w"] * curr_point["h"]
|
|
ok = tracker.init(frame, bbox)
|
|
visu_interval = 0.2
|
|
show_time = 0
|
|
show_message = ""
|
|
tracked = {}
|
|
tracked[0] = [*bbox, 1]
|
|
for i in range(max_frames):
|
|
# Read a new frame
|
|
self.marker.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.marker.nr + i)
|
|
|
|
ok, frame = self.marker.video_reader.read()
|
|
frame = cv2.resize(frame.copy(), self.marker.video_res)
|
|
if not ok:
|
|
break
|
|
|
|
ok, bbox = tracker.update(frame)
|
|
if ok:
|
|
# Tracking success
|
|
if self.marker.nr + i in self.marker.points[self.marker.point_index]:
|
|
point = self.marker.get_point(nr=self.marker.nr + i)
|
|
bbox = tuple([point["x0"], point["y0"], point["w"], point["h"]])
|
|
tracked[i] = [*bbox, 1]
|
|
show_message = f"Tracking... ({i+1}/{max_frames})"
|
|
else:
|
|
# Tracking failure
|
|
show_message = f"Tracking failure detected ({i+1}/{max_frames})"
|
|
bbox = None
|
|
|
|
if time.time() > show_time + visu_interval:
|
|
# Display result
|
|
if bbox is not None:
|
|
p1 = (int(bbox[0]), int(bbox[1]))
|
|
p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
|
|
cv2.rectangle(frame, p1, p2, (255, 0, 0), 2, 1)
|
|
self.marker.shadow_text(frame, show_message, (100, 80), 0.75, 2, (255, 255, 255))
|
|
cv2.imshow("tsmark - tracker", frame)
|
|
show_time = time.time()
|
|
k = cv2.waitKey(1)
|
|
# break tracking if ESC pressed, q, space or enter
|
|
if k & 0xFF == ord("q") or k & 0xFF == 32 or k & 0xFF == 27 or k & 0xFF == 13:
|
|
break
|
|
|
|
done = False
|
|
paused = False
|
|
seek = False
|
|
cut_after = max_frames
|
|
while True:
|
|
if done:
|
|
break
|
|
|
|
i = 0
|
|
while True:
|
|
show_time = time.time()
|
|
self.marker.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.marker.nr + i)
|
|
if done:
|
|
break
|
|
|
|
ok, frame = self.marker.video_reader.read()
|
|
frame = cv2.resize(frame.copy(), self.marker.video_res)
|
|
self.marker.shadow_text(frame, f"Accept? ({i+1}/{max_frames})", (100, 80), 0.75, 2, (255, 255, 255))
|
|
if i in tracked:
|
|
bbox = tracked[i]
|
|
p1 = (int(bbox[0]), int(bbox[1]))
|
|
p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3]))
|
|
color = (0, 255, 0) if cut_after > i else (0, 192, 192)
|
|
thicc = 2 if cut_after > i else 1
|
|
cv2.rectangle(frame, p1, p2, color, thicc, 1)
|
|
cv2.imshow("tsmark - tracker", frame)
|
|
# speed up fps by 2
|
|
time_to_wait = 0.2 if paused else (self.marker.viewer_spf / 2 - time.time() + show_time)
|
|
k = cv2.waitKey(max(1, int(time_to_wait * 1000)))
|
|
if k & 0xFF == ord("q") or k & 0xFF == 13: # accept with q or enter
|
|
done = True
|
|
break
|
|
if k & 0xFF == 27: # decline with escape
|
|
done = True
|
|
cut_after = 0
|
|
break
|
|
elif k & 0xFF == 32: # space
|
|
paused = not paused
|
|
# Movement =================
|
|
elif k & 0xFF == 83 or k & 0xFF == ord("l"): # right arrow
|
|
i += int(self.marker.fps)
|
|
seek = True
|
|
elif k & 0xFF == 81 or k & 0xFF == ord("j"): # left arrow
|
|
i -= int(self.marker.fps)
|
|
# Move by frame
|
|
elif k & 0xFF == ord(".") or k & 0xFF == ord("c"):
|
|
paused = True
|
|
i += 1
|
|
elif k & 0xFF == ord(",") or k & 0xFF == ord("z"):
|
|
paused = True
|
|
i -= 1
|
|
elif k & 0xFF == ord("x"):
|
|
cut_after = i + 1
|
|
# TODO: ord("h") for help!
|
|
|
|
if i >= max_frames - 1:
|
|
i = max_frames - 2
|
|
paused = True
|
|
if i < 0:
|
|
i = 0
|
|
paused = True
|
|
|
|
if not paused:
|
|
i += 1
|
|
|
|
cv2.destroyWindow("tsmark - tracker")
|
|
|
|
self.points = {}
|
|
for i in sorted(list(tracked.keys())):
|
|
if i >= cut_after:
|
|
continue
|
|
track_area = tracked[i][2] * tracked[i][3]
|
|
visibility = POINT_VISIBILITY[0] if track_area > 0.9 * bbox_area else POINT_VISIBILITY[1]
|
|
self.points[old_nr + i] = {
|
|
"x0": tracked[i][0],
|
|
"y0": tracked[i][1],
|
|
"x1": tracked[i][0] + tracked[i][2],
|
|
"y1": tracked[i][1] + tracked[i][3],
|
|
"visible": visibility,
|
|
}
|
|
|
|
self.marker.nr = old_nr + cut_after - 1
|
|
self.marker.read_next = True
|