import importlib.util import json import os import shlex import subprocess import sys import threading import time import cv2 import numpy as np from scipy.interpolate import PchipInterpolator PLUGIN_FOLDER = os.path.expanduser("~/.config/tsmark/plugins") COLOR_PREPOST = (0, 128, 128) COLOR_KEY = (60, 205, 60) COLOR_KEY_OCCLUDED = (50, 128, 50) COLOR_INTERP = (192, 0, 192) COLOR_INTERP_OCCLUDED = (128, 0, 128) COLOR_HIDDEN = (60, 60, 60) COLOR_NONE = (255, 255, 255) POINT_VISIBILITY = ("yes", "occluded", "hidden") class Marker: def __init__(self, opts): self.opts = opts if not os.path.exists(self.opts.video): raise FileNotFoundError("Video file missing!") self.paused = False self.read_next = False self.show_info = True self.show_help = False self.auto_step = True self.font = cv2.FONT_HERSHEY_SIMPLEX self.frame_visu = [] self.frame_raw = [] self.max_res = tuple([int(x) for x in self.opts.max_res.split("x")]) self.min_res = (512, None) self.mouse_position = (0, 0) self.crop = [(None, None), (None, None), None] self.crop_click = 0 self.point_click = 0 self.point_tracking = 0 self.point_tracking_length = float(self.opts.max_track) self.points = {} self.points_interpolated = {} self.point_index = None self.message = None self.message_timer = time.time() self.autosave_interval = 60 self.autosave_timer = time.time() self.forced_fps = opts.fps try: self.open() self.calculate_res() self.parse_timestamps() self.load_plugin() if self.opts.start_time: try: self.nr = int(self.opts.start_time) except ValueError: self.nr = self.parse_time(self.opts.start_time) self.loop() except Exception as e: exc_type, exc_obj, exc_tb = sys.exc_info() fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] print(exc_type, fname, exc_tb.tb_lineno) raise e def open(self): self.video_reader = cv2.VideoCapture(self.opts.video) self.frames = int(self.video_reader.get(cv2.CAP_PROP_FRAME_COUNT)) self.fps = self.video_reader.get(cv2.CAP_PROP_FPS) self.spf = 1 / self.fps self.viewer_fps = self.forced_fps if self.forced_fps else self.fps self.viewer_spf = 1 / self.viewer_fps self.video_length = self.frames * self.fps def calculate_res(self): self.video_res = [ int(self.video_reader.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self.video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT)), ] self.video_res_original = [ int(self.video_reader.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self.video_reader.get(cv2.CAP_PROP_FRAME_HEIGHT)), ] video_aspect = self.video_res[0] / self.video_res[1] if self.video_res[0] > self.max_res[0]: self.video_res[0] = int(self.max_res[0]) self.video_res[1] = int(self.video_res[0] / video_aspect) if self.video_res[1] > self.max_res[1]: self.video_res[1] = int(self.max_res[1]) self.video_res[0] = int(self.video_res[1] * video_aspect) if self.video_res[0] < self.min_res[0]: self.video_res[0] = int(self.min_res[0]) self.video_res[1] = int(self.video_res[0] / video_aspect) self.video_res = tuple(self.video_res) self.crop = [(0, 0), tuple(self.video_res), None] if self.opts.crop: w, h, x, y = [int(c) for c in self.opts.crop.split(":")] self.crop = [ self.original_to_visual((x, y)), self.original_to_visual((w, h)), True, ] self.bar_start = int(self.video_res[0] * 0.05) self.bar_end = int(self.video_res[0] * 0.95) self.bar_top = int(self.video_res[1] * 0.90) self.bar_bottom = int(self.video_res[1] * 0.95) def calculate_step(self): now = time.time() self.last_move = [x for x in self.last_move if x[1] > now - 3] if len(self.last_move) == 0: self.step = 1 self.last_move = [] return lefts = sum([1 for x in self.last_move if x[0] == "l"]) rights = sum([1 for x in self.last_move if x[0] == "r"]) if lefts > 0 and rights > 0: self.step = 1 self.last_move = [] return count = max(lefts, rights) if count < 5: self.step = 1 else: # x2 poly from 5:5 -> 15:180 self.step = 45 - 16.5 * count + 1.7 * count * count self.step = min(self.step, 0.1 * self.video_length) self.step = int(self.step) def draw_bar(self, frame): position = self.nr / self.frames bar_position = int(self.bar_start + position * (self.bar_end - self.bar_start)) cv2.rectangle( frame, (self.bar_start, self.bar_top), (self.bar_end, self.bar_bottom), (255, 255, 255), 2, ) for ts in self.stamps: ts_pos = int(self.bar_start + ts / self.frames * (self.bar_end - self.bar_start)) cv2.line( frame, (ts_pos, self.bar_top), (ts_pos, self.bar_bottom), (32, 32, 32), 3, ) cv2.line( frame, (ts_pos, self.bar_top), (ts_pos, self.bar_bottom), (84, 255, 63), 1, ) if self.point_click == 1 and self.point_index in self.points: bar_middle = int((self.bar_top + self.bar_bottom) / 2) for ts in self.points[self.point_index]: p_pos = int(self.bar_start + ts / self.frames * (self.bar_end - self.bar_start)) cv2.circle(frame, (p_pos, bar_middle), 3, (32, 32, 32), -1) for ts in self.points[self.point_index]: p_pos = int(self.bar_start + ts / self.frames * (self.bar_end - self.bar_start)) color = self.get_point_color(self.points[self.point_index][ts]) cv2.circle(frame, (p_pos, bar_middle), 1, color, -1) cv2.line( frame, (bar_position, self.bar_top), (bar_position, self.bar_bottom), (63, 84, 255), 1, ) self.shadow_text( frame, "1", (self.bar_start - 7, self.bar_bottom + 20), 0.7, 2, (255, 255, 255), ) end_frame = self.format_time(self.frames - 1) (text_width, text_height) = cv2.getTextSize(end_frame, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)[0] self.shadow_text( frame, end_frame, (self.bar_end - text_width, self.bar_bottom + 20), 0.7, 2, (255, 255, 255), ) def draw_crop(self, frame): if self.crop[2] is None: return p2 = (self.crop[0][0] + self.crop[1][0], self.crop[0][1] + self.crop[1][1]) cv2.rectangle( frame, self.crop[0], p2, (0, 192, 192), 1, ) if self.crop_click == 1: x, y = self.visual_to_original((self.crop[0][0], self.crop[0][1])) self.shadow_text( frame, f"{x},{y}", self.crop[0], 0.5, 1, (0, 192, 192), ) if self.crop_click == 2: x, y = self.visual_to_original((self.crop[0][0], self.crop[0][1])) w, h = self.visual_to_original((self.crop[1][0], self.crop[1][1])) self.shadow_text( frame, f"{w}x{h}", self.crop[0], 0.5, 1, (0, 192, 192), ) def draw_points(self, frame): if self.opts.output_points is None: return for index in self.points: if index == self.point_index and self.point_click == 1: continue current = self.get_interpolated_point(index=index) if current["type"] in ("pre", "post"): continue if current["visible"] == "hidden": continue color = self.get_point_color(current) cv2.circle(frame, (current["cx"], current["cy"]), 10, (0, 0, 0), 2) cv2.circle(frame, (current["cx"], current["cy"]), 10, color, 1) self.shadow_text( frame, index, (current["cx"], current["cy"]), 0.5, 1, color, ) if self.point_click == 1: # Draw crosshair cv2.line( frame, (self.mouse_position[0], 0), (self.mouse_position[0], self.video_res[1]), (128, 128, 128), 1 ) cv2.line( frame, (0, self.mouse_position[1]), (self.video_res[0], self.mouse_position[1]), (128, 128, 128), 1 ) # Show current track x, y = [20, 70] self.shadow_text( frame, "P:" + str(self.point_index), (x, y), 0.5, 1, (255, 255, 255), ) try: current = self.get_interpolated_point() if current["type"] is not None: color = self.get_point_color(current) cv2.rectangle( frame, (current["x0"], current["y0"]), (current["x1"], current["y1"]), color, 2, ) cv2.circle(frame, (current["cx"], current["cy"]), 10, color, 1) history = [] for p in range(max(1, int(self.nr - self.viewer_fps)), self.nr + 1): po = self.get_interpolated_point(p) history.append([po["cx"], po["cy"]]) history = np.array(history, np.int32).reshape((-1, 1, 2)) cv2.polylines(frame, [history], False, COLOR_INTERP, 1) except KeyError: print(current, self.nr) pass except IndexError: print(current, self.nr) pass try: current = self.get_point() if current["x0"] is not None: cv2.circle(frame, (current["cx"], current["cy"]), 13, COLOR_KEY, 2) except KeyError: pass except IndexError: print(self.points[self.point_index]) print(self.nr) pass def scan_point(self, direction): def set_nr(ts): self.nr = ts - 1 self.read_next = True try: if direction == "first": return set_nr(min(list(self.points[self.point_index].keys()))) if direction == "last": return set_nr(max(list(self.points[self.point_index].keys()))) if direction == "next": for ts in sorted(list(self.points[self.point_index].keys())): if ts > self.nr: return set_nr(ts) if direction == "previous": for ts in reversed(sorted(list(self.points[self.point_index].keys()))): if ts < self.nr - 1: return set_nr(ts) except Exception: pass def toggle_point(self, ts): try: if ts in self.points[self.point_index]: # Remove point del self.points[self.point_index][ts] else: # Introduce point from interpolated ip = self.get_interpolated_point() if ip["type"] is None: return self.points[self.point_index][self.nr] = { "x0": ip["x0"], "y0": ip["y0"], "x1": ip["x1"], "y1": ip["y1"], "visible": POINT_VISIBILITY[0], } self.interpolate_points() except Exception: pass def get_point(self, nr=None, index=None): """{x0,y0,x1,y1, cx, cy, w, h, visible}""" if nr is None: nr = self.nr if index is None: index = self.point_index if index in self.points: if nr in self.points[index]: value = self.points[index][nr].copy() value.update( { "cx": int((value["x0"] + value["x1"]) / 2), "cy": int((value["y0"] + value["y1"]) / 2), "w": int(abs(value["x0"] - value["x1"])), "h": int(abs(value["y0"] - value["y1"])), } ) return value return { "x0": None, "y0": None, "x1": None, "y1": None, "cx": None, "cy": None, "w": None, "h": None, "visible": None, } def get_interpolated_point(self, nr=None, index=None): """{x0,y0,x1,y1, cx, cy, visible, type, age}""" if nr is None: nr = self.nr if index is None: index = self.point_index if index in self.points_interpolated: if nr in self.points_interpolated[index]: value = self.points_interpolated[index][nr].copy() value.update( { "cx": int((value["x0"] + value["x1"]) / 2), "cy": int((value["y0"] + value["y1"]) / 2), } ) return value return { "x0": None, "y0": None, "x1": None, "y1": None, "cx": None, "cy": None, "visible": None, "type": None, "age": None, } def convert_interpolated_points(self): if self.point_click == 1 and self.point_index in self.points: for nr in range(self.frames): ip = self.get_interpolated_point(nr=nr) if ip["type"] == "interp" and ip["visible"] == POINT_VISIBILITY[0]: self.points[self.point_index][nr] = { "x0": ip["x0"], "y0": ip["y0"], "x1": ip["x1"], "y1": ip["y1"], "visible": POINT_VISIBILITY[0], } self.interpolate_points() def modify_point(self, position, x, y): """position: tl topleft, br bottomright, c center""" if not self.point_index in self.points: self.points[self.point_index] = {} if not self.nr in self.points[self.point_index]: if len(self.points[self.point_index]) > 0: keys = sorted(list(self.points[self.point_index].keys())) if self.nr > keys[-1]: # last point if at end of track last_p = self.points[self.point_index][keys[-1]] elif self.nr < keys[0]: # first point if before track last_p = self.points[self.point_index][keys[0]] else: # previous point if in the middle of track prev_key = keys[0] for key in keys: if key > self.nr: last_p = self.points[self.point_index][prev_key] break prev_key = key w = abs(last_p["x1"] - last_p["x0"]) h = abs(last_p["y1"] - last_p["y0"]) else: w = 50 h = 50 if position == "tl": self.points[self.point_index][self.nr] = { "x0": x, "y0": y, "x1": min(self.video_res[0] - 1, x + w), "y1": min(self.video_res[1] - 1, y + h), "visible": POINT_VISIBILITY[0], } if position == "br": self.points[self.point_index][self.nr] = { "x0": max(0, x - w), "y0": max(0, y - h), "x1": x, "y1": y, "visible": POINT_VISIBILITY[0], } if position == "c": self.points[self.point_index][self.nr] = { "x0": max(0, int(x - w / 2)), "y0": max(0, int(y - h / 2)), "x1": min(self.video_res[0] - 1, int(x + w / 2)), "y1": min(self.video_res[1] - 1, int(y + h / 2)), "visible": POINT_VISIBILITY[0], } else: # not a new point self.points[self.point_index][self.nr]["visible"] = POINT_VISIBILITY[0] if position == "c": current = self.points[self.point_index][self.nr] w = abs(current["x1"] - current["x0"]) h = abs(current["y1"] - current["y0"]) self.points[self.point_index][self.nr] = { "x0": max(0, int(x - w / 2)), "y0": max(0, int(y - h / 2)), "x1": min(self.video_res[0] - 1, int(x + w / 2)), "y1": min(self.video_res[1] - 1, int(y + h / 2)), "visible": POINT_VISIBILITY[0], } elif position == "tl": self.points[self.point_index][self.nr]["x0"] = x self.points[self.point_index][self.nr]["y0"] = y elif position == "br": self.points[self.point_index][self.nr]["x1"] = x self.points[self.point_index][self.nr]["y1"] = y if self.points[self.point_index][self.nr]["x0"] > self.points[self.point_index][self.nr]["x1"]: self.points[self.point_index][self.nr]["x1"], self.points[self.point_index][self.nr]["x0"] = ( self.points[self.point_index][self.nr]["x0"], self.points[self.point_index][self.nr]["x1"], ) if self.points[self.point_index][self.nr]["y0"] > self.points[self.point_index][self.nr]["y1"]: self.points[self.point_index][self.nr]["y1"], self.points[self.point_index][self.nr]["y0"] = ( self.points[self.point_index][self.nr]["y0"], self.points[self.point_index][self.nr]["y1"], ) self.interpolate_points() def modify_point_wh(self): if self.point_click == 0: self.add_message("Not in point clicking mode") return if self.opts.output_points is None: return curr_point = self.get_point() if curr_point["x0"] is None: self.add_message("Not in point frame (green)") return new_wh = abs(self.mouse_position[0] - curr_point["cx"]) new_hh = abs(self.mouse_position[1] - curr_point["cy"]) self.points[self.point_index][self.nr]["x0"] = int(curr_point["cx"] - new_wh) self.points[self.point_index][self.nr]["y0"] = int(curr_point["cy"] - new_hh) self.points[self.point_index][self.nr]["x1"] = int(curr_point["cx"] + new_wh) self.points[self.point_index][self.nr]["y1"] = int(curr_point["cy"] + new_hh) self.points[self.point_index][self.nr]["visible"] = POINT_VISIBILITY[0] self.interpolate_points() def toggle_point_visibility(self): if self.point_click == 0: self.add_message("Not in point clicking mode") return if self.opts.output_points is None: return curr_point = self.get_point() if curr_point["x0"] is None: self.add_message("Not in point frame (green)") return try: new_index = (1 + POINT_VISIBILITY.index(self.points[self.point_index][self.nr]["visible"])) % len( POINT_VISIBILITY ) except (ValueError, KeyError): new_index = 0 self.points[self.point_index][self.nr]["visible"] = POINT_VISIBILITY[new_index] self.interpolate_points() def track_point(self): if self.point_click == 0: self.add_message("Not in point clicking mode") return if self.opts.output_points is None: return tracker_gui = TrackerGUI(self) if len(tracker_gui.points) > 0: for nr in tracker_gui.points: self.points[self.point_index][nr] = tracker_gui.points[nr] self.interpolate_points() self.nr = max(tracker_gui.points) - 1 self.read_next = True def load_plugin(self): self.plugin = None if self.opts.plugin: if not os.path.exists(os.path.join(PLUGIN_FOLDER, "hello.py")): os.makedirs(PLUGIN_FOLDER, exist_ok=True) with open(os.path.join(PLUGIN_FOLDER, "hello.py"), "wt") as fp: fp.write( """import cv2 import numpy as np class World: def __init__(self, tsmark): self.tsmark = tsmark self.window_name = "tsmark - plugin" print("plugin loaded") def __call__(self): print("plugin called") self.tsmark.paused = True cv2.namedWindow(self.window_name, flags=cv2.WINDOW_AUTOSIZE | cv2.WINDOW_KEEPRATIO | cv2.WINDOW_GUI_NORMAL) frame = cv2.resize(np.zeros((16, 16, 3), dtype=np.uint8), self.tsmark.video_res) self.tsmark.shadow_text(frame, "Hello World! press q to exit.", (100, 80), 0.75, 2, (255, 255, 255)) cv2.imshow(self.window_name, frame) while True: k = cv2.waitKey(10) # break if ESC pressed, q, space or enter if k & 0xFF == ord("q") or k & 0xFF == 32 or k & 0xFF == 27 or k & 0xFF == 13: break cv2.destroyWindow(self.window_name) return """ ) plugin_file, plugin_class = self.opts.plugin.split(":", 1) plugin_path = os.path.join(PLUGIN_FOLDER, plugin_file + ".py") module_spec = importlib.util.spec_from_file_location("Plugin", plugin_path) loaded_plugin = importlib.util.module_from_spec(module_spec) module_spec.loader.exec_module(loaded_plugin) plugin_class = getattr(loaded_plugin, plugin_class) self.plugin = plugin_class(self) def launch_plugin(self): if self.plugin: self.plugin() def interpolate_points(self, point_index=None): """types: key: user clicked / accepted frame interp: interpolated frame pre: before any keyframes post: after any keyframes """ process = threading.Thread(target=self.interpolate_points_in_thread, args=(point_index,)) process.start() def interpolate_points_in_thread(self, point_index=None): if point_index is None: point_index = self.point_index def i_point(x0=None, y0=None, x1=None, y1=None, t=None, visible=None, age=None): return {"x0": x0, "y0": y0, "x1": x1, "y1": y1, "type": t, "visible": visible, "age": age} def point2array(p): return [p["x0"], p["y0"], p["x1"], p["y1"]] if not point_index in self.points: return if not point_index in self.points_interpolated: self.points_interpolated[point_index] = {key: {} for key in range(self.frames)} new_points = {k: v for k, v in self.points_interpolated[point_index].items()} if len(self.points[point_index]) == 1: # only one point added key = list(self.points[point_index].keys())[0] vals = self.points[point_index][key] for key in range(self.frames): new_points[key] = i_point() new_points[key].update(vals) new_points[key]["type"] = "pre" if key < self.nr else "post" new_points[self.nr]["type"] = "key" self.points_interpolated[point_index] = new_points else: # more points point_keys = list(sorted(list(self.points[point_index].keys()))) point_values = [point2array(self.points[point_index][k]) for k in point_keys] xyxy = np.array(point_values).T spline = PchipInterpolator(point_keys, xyxy, axis=1) start_key = min(point_keys) end_key = max(point_keys) + 1 t2 = np.arange(start_key, end_key) # Pre points for key in range(0, start_key): new_points[key]["type"] = "pre" new_points[key].update(self.points[point_index][start_key]) # interpolated points visible = self.points[point_index][start_key]["visible"] for row in np.vstack((t2, spline(t2))).T: if row[0] in point_keys: visible = self.points[point_index][row[0]]["visible"] new_points[row[0]] = { "type": "interp", "x0": int(row[1]), "y0": int(row[2]), "x1": int(row[3]), "y1": int(row[4]), "visible": visible, } # post points for key in range(end_key, self.frames + 1): new_points[key] = { "type": "post", "x0": int(row[1]), "y0": int(row[2]), "x1": int(row[3]), "y1": int(row[4]), "visible": visible, } # clicked points (not necessary, could determine at draw time!) for key in point_keys: new_points[key]["type"] = "key" age = 0 for key in new_points: if new_points[key]["type"] == "key": age = 0 if new_points[key]["type"] == "interp": age += 1 new_points[key]["age"] = age self.points_interpolated[point_index] = new_points def draw_help(self, frame): bottom = 80 left = 100 for row in self.get_help().split("\n"): self.shadow_text(frame, row, (left, bottom), 0.6, 1, (255, 255, 255)) bottom += 18 def draw_label(self, frame): if not self.nr in self.stamps: return text = "{} #{}".format(self.nr, self.stamps.index(self.nr) + 1) bottom = 60 left = 10 self.shadow_text(frame, text, (left, bottom), 1, 2, (63, 84, 255)) def draw_time(self, frame): left = 10 bottom = 30 formatted = "{} {}".format( self.format_time(self.nr), f"|| ({self.nr})" if self.paused else "", ) self.shadow_text(frame, formatted, (left, bottom), 1.1, 2, (255, 255, 255)) def draw_message(self, frame): if self.message is None: return if time.time() - 5 > self.message_timer: self.message = None return left = 10 bottom = 90 self.shadow_text(frame, self.message, (left, bottom), 0.9, 2, (255, 255, 255)) def add_message(self, new): self.message = new self.message_timer = time.time() def format_time(self, nframe): seconds = int(nframe / self.fps) frame = nframe % self.fps parts = int(100 * (frame / self.fps)) return time.strftime("%H:%M:%S", time.gmtime(seconds)) + ".%02d" % (parts) def get_point_color(self, point): t = point.get("type", "key") v = point.get("visible", "yes") if v == "hidden": return COLOR_HIDDEN if t == "key": if v == "yes": return COLOR_KEY if v == "occluded": return COLOR_KEY_OCCLUDED if t == "interp": if v == "yes": return COLOR_INTERP if v == "occluded": return COLOR_INTERP_OCCLUDED if t in ("post", "pre"): return COLOR_PREPOST return COLOR_NONE def get_help(self): return """Keyboard help: Arrows, PgUp, PgDn, Home, End or click mouse in position bar j l i k [ ] jump in video position 0-9 move to 0%,10%,20% .. position , and . move one frame at a time z and c move to previous or next mark x or double click in the video mark frame space or click video pause a and s modify crop offset or size f toggle 0.25x 1x or 4x FPS v toggle HUD h toggle help q or esc quit Bounding box editor: p toggle bounding box drawing. enter any key as index. o toggle object is visible/occluded/hidden x toggle (delete) key frame r convert interpolated points to points (no undo!) mouse left: set top-left corner of box mouse middle: set center of box mouse right: set lower right corner of box e set width/height of box symmetric around center z c Home End move between key-frames t start optical flow tracker m start plugin (if defined) Color codes: green |keypoint purple |interpolated darker tone |occluded key/interpolated yellow |post / pre points gray |point is hidden """ def mouse_click(self, event, x, y, flags, param): in_bar = all( ( x < self.bar_end, x > self.bar_start, y < self.bar_bottom, y > self.bar_top, ) ) self.mouse_position = (x, y) if self.crop_click == 1: self.crop[0] = (x, y) if event == cv2.EVENT_LBUTTONDOWN: self.crop_click = 0 return if self.crop_click == 2: self.crop[1] = (x - self.crop[0][0], y - self.crop[0][1]) if event == cv2.EVENT_LBUTTONDOWN: self.crop_click = 0 return if self.point_click == 1: if event == cv2.EVENT_LBUTTONDOWN: self.modify_point("tl", int(x), int(y)) if event == cv2.EVENT_RBUTTONDOWN: self.modify_point("br", int(x), int(y)) if event == cv2.EVENT_MBUTTONDOWN: self.modify_point("c", int(x), int(y)) return if event == cv2.EVENT_LBUTTONDOWN: if in_bar: click_relative = (x - self.bar_start) / (self.bar_end - self.bar_start) self.nr = int(click_relative * self.frames) self.read_next = True else: self.paused = not self.paused if event == cv2.EVENT_LBUTTONDBLCLK: if not in_bar: self.toggle_stamp() # doubleclick (toggle?) # ~ print("double", x, y) def parse_time(self, timestr): """return frames""" colon_count = len(timestr.split(":")) - 1 if colon_count == 0: secs = float(timestr) return int(secs * self.fps) if colon_count == 1: mins, secstr = timestr.split(":", 1) sec = float(secstr) return int(self.fps * (int(mins) * 60 + sec)) if colon_count == 2: hours, mins, secstr = timestr.split(":", 2) sec = float(secstr) return int(self.fps * (int(hours, 10) * 3600 + int(mins, 10) * 60 + sec)) raise ValueError("Cannot parse time definition {}".format(timestr)) raise TypeError("Cannot parse time definition {}".format(timestr)) def parse_timestamps(self): self.stamps = [] if self.opts.timestamps: if os.path.exists(self.opts.timestamps): with open(self.opts.timestamps, "rt") as fp: for row in fp.readlines(): # if row has 3 cols, pick the frame number directly splitted = row.split(",") if len(splitted) == 3: self.stamps.append(int(splitted[2], 10)) if len(splitted) < 3: self.opts.timestamps.append(splitted[0]) else: self.opts.timestamps = self.opts.timestamps.split(",") if len(self.stamps) > 0: self.stamps.sort() else: self.stamps = sorted([self.parse_time(ts.strip()) for ts in self.opts.timestamps if ts.strip() != ""]) self.stamps = [x for x in self.stamps if 0 <= x < self.frames] self.nr = self.stamps[0] else: self.stamps = [] self.nr = 0 # Read bounding boxes from JSON if self.opts.input_points: if os.path.exists(self.opts.input_points): with open(self.opts.input_points, "rt") as fp: self.points = json.load(fp) keys = list(self.points.keys()) for index in keys: # Remove empty dicts if len(self.points[index]) == 0: del self.points[index] continue self.point_index = index self.points[index] = {int(k): v for k, v in self.points[index].items()} for key in self.points[index]: self.points[index][key]["x0"], self.points[index][key]["y0"] = self.original_to_visual( (self.points[index][key]["x0"], self.points[index][key]["y0"]) ) self.points[index][key]["x1"], self.points[index][key]["y1"] = self.original_to_visual( (self.points[index][key]["x1"], self.points[index][key]["y1"]) ) if not self.points[index][key].get("visible", "NA") in POINT_VISIBILITY: self.points[index][key]["visible"] = POINT_VISIBILITY[0] self.interpolate_points() print(f"Loaded points with index: {index}") self.point_index = None def print_help(self): print(self.get_help()) def print_timestamps(self): if self.crop[2] is None: cropstr = [] else: self.opts.ffmpeg_copy = False x, y = self.visual_to_original((self.crop[0][0], self.crop[0][1])) w, h = self.visual_to_original((self.crop[1][0], self.crop[1][1])) if w < 0: x = x + w w = -w if h < 0: y = y + h h = -h cropstr = ["-vf", f"crop={w}:{h}:{x}:{y}"] self.stamps.sort() print("# Timestamps:") for i, ts in enumerate(self.stamps): print("# {}: {} / {}".format(i + 1, self.format_time(ts), ts)) if len(self.stamps) == 0: self.stamps.append(0) self.stamps.append(self.frames) padlen = len(str(self.frames)) src_name_print = self.opts.video.replace('"', '\\"') tgt_name_print = os.path.splitext(self.opts.video)[0].replace('"', '\\"') for i in range(1, len(self.stamps), 2): from_ts = self.stamps[i - 1] to_ts = self.stamps[i] from_ft = self.format_time(from_ts) to_ft = self.format_time(to_ts) from_str = str(from_ts).zfill(padlen) to_str = str(to_ts).zfill(padlen) ffmpeg_args_print = [] ffmpeg_args = [] for arg in shlex.split(self.opts.ffmpeg_args): if arg == "{crop}": ffmpeg_args_print.extend(cropstr) ffmpeg_args.extend(cropstr) else: ffmpeg_args_print.append( arg.format( input=shlex.quote(src_name_print), output=shlex.quote(f"{tgt_name_print}.trim.{from_str}-{to_str}"), start_time=from_ft, end_time=to_ft, ) ) ffmpeg_args.append( arg.format( input=self.opts.video, output=f"{os.path.splitext(self.opts.video)[0]}.trim.{from_str}-{to_str}", start_time=from_ft, end_time=to_ft, ) ) print(" ".join(["ffmpeg", "-hide_banner", *ffmpeg_args_print])) if self.opts.ffmpeg_run: subprocess.run(["ffmpeg", "-hide_banner", *ffmpeg_args]) def save_timestamps(self): if self.opts.output is not None: with open(self.opts.output, "wt") as fp: for i, ts in enumerate(self.stamps): fp.write("{},{},{}\n".format(self.format_time(ts), i + 1, ts)) print("Saved timestamps") if self.opts.output_points is not None: points = {} for index in self.points.keys(): points[index] = {} for key in sorted(self.points[index].keys()): points[index][key] = self.points[index][key].copy() points[index][key]["x0"], points[index][key]["y0"] = self.visual_to_original( (self.points[index][key]["x0"], self.points[index][key]["y0"]) ) points[index][key]["x1"], points[index][key]["y1"] = self.visual_to_original( (self.points[index][key]["x1"], self.points[index][key]["y1"]) ) if len(points[index]) == 0: del points[index] with open(self.opts.output_points, "wt") as fp: json.dump(points, fp, indent=2) print("Saved points") def shadow_text(self, frame, text, pos, size, thicc, color): cv2.putText( frame, text, pos, self.font, size, (0, 0, 0), 2 * thicc, cv2.LINE_AA, ) cv2.putText( frame, text, pos, self.font, size, color, thicc, cv2.LINE_AA, ) def toggle_stamp(self): if self.nr in self.stamps: self.stamps.remove(self.nr) else: self.stamps.append(self.nr) self.stamps.sort() def original_to_visual(self, t): """display (x,y) to video resolution (x,y)""" return ( int(self.video_res[0] * t[0] / self.video_res_original[0]), int(self.video_res[1] * t[1] / self.video_res_original[1]), ) def visual_to_original(self, t): """video resolution (x,y) to display (x,y)""" return ( int(self.video_res_original[0] * t[0] / self.video_res[0]), int(self.video_res_original[1] * t[1] / self.video_res[1]), ) def loop(self): self.step = 1 self.bigstep = 30 self.hugestep = 300 self.auto_step = False self.last_move = [] self.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.nr) self.print_help() cv2.namedWindow("tsmark", flags=cv2.WINDOW_AUTOSIZE | cv2.WINDOW_KEEPRATIO | cv2.WINDOW_GUI_NORMAL) cv2.setMouseCallback("tsmark", self.mouse_click) digits_ords = [ord(str(x)) for x in range(10)] FPS_modifier = 1 FPS_modifiers = [0.25, 1, 4] read_fails = 0 while self.video_reader.isOpened(): show_time = time.time() if (not self.paused) or self.read_next: ret, frame = self.video_reader.read() self.frame_raw = frame if ret == True: read_fails = 0 draw_wait = 200 if self.paused or (self.paused and self.point_click == 0) else 1 if (not self.paused) or self.read_next: self.read_next = False frame_visu = cv2.resize(frame.copy(), self.video_res) self.frame_visu = frame_visu self.draw_crop(frame_visu) self.draw_points(frame_visu) nr_time = self.nr / self.fps if self.show_info: self.draw_time(frame_visu) self.draw_bar(frame_visu) self.draw_label(frame_visu) self.draw_message(frame_visu) if self.show_help: self.draw_help(frame_visu) if cv2.getWindowProperty("tsmark", cv2.WND_PROP_VISIBLE) < 1: break cv2.imshow("tsmark", frame_visu) k = cv2.waitKey(draw_wait) if k & 0xFF == ord("q") or k & 0xFF == 27: break elif k & 0xFF == 32: # space self.paused = not self.paused # Movement ================= elif k & 0xFF == 80: # home key if self.point_click == 1: self.scan_point("first") else: self.nr = -1 self.read_next = True elif k & 0xFF == 87: # end key if self.point_click == 1: self.scan_point("last") else: self.nr = self.frames - 1 self.paused = True self.read_next = True elif k & 0xFF == 85 or k & 0xFF == ord("]"): # pg up self.nr = int((nr_time + self.hugestep) * self.fps) - 1 self.read_next = True elif k & 0xFF == 86 or k & 0xFF == ord("["): # pg down self.nr = int((nr_time - self.hugestep) * self.fps) - 1 self.read_next = True elif k & 0xFF == 82 or k & 0xFF == ord("i"): # up arrow self.nr = int((nr_time + self.bigstep) * self.fps) - 1 self.read_next = True elif k & 0xFF == 84 or k & 0xFF == ord("k"): # down arrow self.nr = int((nr_time - self.bigstep) * self.fps) - 1 self.read_next = True elif k & 0xFF == 83 or k & 0xFF == ord("l"): # right arrow self.last_move.append(("r", time.time())) if self.auto_step: self.calculate_step() self.nr = int((nr_time + self.step) * self.fps) - 1 self.read_next = True elif k & 0xFF == 81 or k & 0xFF == ord("j"): # left arrow self.last_move.append(("l", time.time())) if self.auto_step: self.calculate_step() self.nr = int((nr_time - self.step) * self.fps) - 1 self.read_next = True # Move by frame elif k & 0xFF == ord("."): self.paused = True self.read_next = True elif k & 0xFF == ord(","): self.paused = True self.nr -= 2 self.read_next = True elif k & 0xFF == ord("z"): # move to previous ts if self.point_click == 1: self.scan_point("previous") else: for ts in reversed(sorted(self.stamps)): if ts < self.nr - 1: self.nr = ts - 1 self.read_next = True break elif k & 0xFF == ord("c"): # move to next ts if self.point_click == 1: self.scan_point("next") else: for ts in sorted(self.stamps): if ts > self.nr: self.nr = ts - 1 self.read_next = True break # Move by number elif k & 0xFF in digits_ords: self.nr = int(digits_ords.index(k & 0xFF) * self.frames / 10) - 1 self.read_next = True # Toggling ================= elif k & 0xFF == ord("f"): # modify FPS FPS_modifier = (FPS_modifier + 1) % len(FPS_modifiers) self.add_message(f"Player speed {round(1/FPS_modifiers[FPS_modifier],2)}") elif k & 0xFF == ord("a"): # toggle crop offset self.crop_click = 0 if self.crop_click == 1 else 1 self.crop[2] = True elif k & 0xFF == ord("s"): # toggle crop size self.crop_click = 0 if self.crop_click == 2 else 2 self.crop[2] = True elif k & 0xFF == ord("o"): # toggle point visibility (yes/occlusion/hidden) if self.opts.output_points is not None: self.toggle_point_visibility() elif k & 0xFF == ord("p"): # toggle points if self.opts.output_points is not None: self.point_click = 1 - self.point_click if self.point_click == 1: self.shadow_text( frame_visu, "Enter point index", (20, 70), 0.9, 2, (255, 255, 255), ) self.shadow_text( frame_visu, "Exists: " + "".join(sorted(self.points.keys())), (20, 105), 0.8, 1, (255, 255, 255), ) cv2.imshow("tsmark", frame_visu) k2 = cv2.waitKey(0) if k2 & 0xFF == ord("q") or k2 & 0xFF == 27: self.point_click = 0 else: self.point_index = chr(k2) elif k & 0xFF == ord("g"): # Go to self.shadow_text( frame_visu, "Enter frame or time", (20, 70), 0.9, 2, (255, 255, 255), ) cv2.imshow("tsmark", frame_visu) entered_chars = "" while True: frame_query = frame_visu.copy() self.shadow_text( frame_query, entered_chars, (20, 100), 0.9, 2, (255, 255, 255), ) cv2.imshow("tsmark", frame_query) del frame_query k2 = cv2.waitKey(0) if k2 & 0xFF == ord("q") or k2 & 0xFF == 27: break elif k2 & 0xFF == ord("g") or k2 & 0xFF == 13: try: self.nr = int(entered_chars) - 1 except ValueError: try: self.nr = self.parse_time(entered_chars) except Exception: self.add_message("Cannot parse time") break self.read_next = True break elif k2 & 0xFF == 8: # backspace entered_chars = entered_chars[0:-1] elif k2 & 0xFF in digits_ords: entered_chars += str(digits_ords.index(k2 & 0xFF)) elif k2 & 0xFF == ord(":"): entered_chars += ":" elif k2 & 0xFF == ord("."): entered_chars += "." else: pass elif k & 0xFF == ord("m"): # launch plugin module self.launch_plugin() elif k & 0xFF == ord("t"): # tracking self.track_point() elif k & 0xFF == ord("e"): # point edit (width height) self.modify_point_wh() elif k & 0xFF == ord("x"): # toggle ts if self.point_click == 1: self.toggle_point(self.nr) else: self.toggle_stamp() elif k & 0xFF == ord("r"): # convert interpolated points self.convert_interpolated_points() elif k & 0xFF == ord("v"): self.show_info = not self.show_info elif k & 0xFF == ord("h"): self.print_help() self.show_help = not self.show_help if (not self.paused) or self.read_next: self.nr += 1 if self.nr < 0: self.nr = 0 if self.nr >= self.frames: self.nr = self.frames - 1 self.paused = True if self.read_next: self.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.nr) time_to_wait = FPS_modifiers[FPS_modifier] * self.viewer_spf - time.time() + show_time if time_to_wait > 0: time.sleep(time_to_wait) else: self.nr = self.frames - 2 - read_fails self.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.nr) read_fails += 1 if read_fails > self.frames: self.nr = 0 self.open() self.paused = True self.read_next = True if time.time() > self.autosave_timer + self.autosave_interval: self.autosave_timer = time.time() try: self.save_timestamps() except Exception as e: print(e) self.video_reader.release() cv2.destroyAllWindows() self.print_timestamps() self.save_timestamps() class TrackerGUI: def __init__(self, marker): self.marker = marker self.points = {} try: cv2.TrackerKCF_create() except AttributeError: marker.add_message("Tracking failed: missing opencv contrib") return self.start() def start(self): old_nr = self.marker.nr curr_point = self.marker.get_point() if curr_point["x0"] is None: self.marker.add_message("Not in point frame (green)") return max_frames = int( min(self.marker.point_tracking_length * self.marker.fps, self.marker.frames - self.marker.nr - 1) ) cv2.namedWindow("tsmark - tracker", flags=cv2.WINDOW_AUTOSIZE | cv2.WINDOW_KEEPRATIO | cv2.WINDOW_GUI_NORMAL) tracker = cv2.TrackerKCF_create() self.marker.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.marker.nr) # TODO: track using original video resolution! ok, frame = self.marker.video_reader.read() frame = cv2.resize(frame.copy(), self.marker.video_res) bbox = tuple([curr_point["x0"], curr_point["y0"], curr_point["w"], curr_point["h"]]) ok = tracker.init(frame, bbox) visu_interval = 0.2 show_time = 0 show_message = "" tracked = {} tracked[0] = [*bbox, 1] for i in range(max_frames): # Read a new frame ok, frame = self.marker.video_reader.read() frame = cv2.resize(frame.copy(), self.marker.video_res) if not ok: break ok, bbox = tracker.update(frame) if ok: # Tracking success if self.marker.nr + i + 1 in self.marker.points[self.marker.point_index]: point = self.marker.get_point(nr=self.marker.nr + i + 1) bbox = tuple([point["x0"], point["y0"], point["w"], point["h"]]) tracked[i + 1] = [*bbox, 1] show_message = f"Tracking... ({i}/{max_frames})" else: # Tracking failure show_message = f"Tracking failure detected ({i}/{max_frames})" bbox = None if time.time() > show_time + visu_interval: # Display result if bbox is not None: p1 = (int(bbox[0]), int(bbox[1])) p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3])) cv2.rectangle(frame, p1, p2, (255, 0, 0), 2, 1) self.marker.shadow_text(frame, show_message, (100, 80), 0.75, 2, (255, 255, 255)) cv2.imshow("tsmark - tracker", frame) show_time = time.time() k = cv2.waitKey(1) # break tracking if ESC pressed, q, space or enter if k & 0xFF == ord("q") or k & 0xFF == 32 or k & 0xFF == 27 or k & 0xFF == 13: break done = False paused = False seek = False cut_after = max_frames while True: if done: break self.marker.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.marker.nr) i = -1 while True: show_time = time.time() if done: break if paused: frame = frame_copy.copy() if (not paused) or seek: ok, frame = self.marker.video_reader.read() frame = cv2.resize(frame.copy(), self.marker.video_res) frame_copy = frame.copy() i += 1 seek = False self.marker.shadow_text(frame, f"Accept? ({i+1}/{max_frames})", (100, 80), 0.75, 2, (255, 255, 255)) if i in tracked: bbox = tracked[i] p1 = (int(bbox[0]), int(bbox[1])) p2 = (int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3])) color = (0, 255, 0) if cut_after > i else (0, 192, 192) thicc = 2 if cut_after > i else 1 cv2.rectangle(frame, p1, p2, color, thicc, 1) cv2.imshow("tsmark - tracker", frame) # speed up fps by 2 time_to_wait = self.marker.viewer_spf / 2 - time.time() + show_time k = cv2.waitKey(max(1, int(time_to_wait * 1000))) if k & 0xFF == ord("q") or k & 0xFF == 13: # accept with q or enter done = True break if k & 0xFF == 27: # decline with escape done = True cut_after = 0 break elif k & 0xFF == 32: # space paused = not paused # Movement ================= elif k & 0xFF == 83 or k & 0xFF == ord("l"): # right arrow i += int(self.marker.fps) - 1 seek = True elif k & 0xFF == 81 or k & 0xFF == ord("j"): # left arrow i -= int(self.marker.fps) + 1 seek = True # Move by frame elif k & 0xFF == ord(".") or k & 0xFF == ord("c"): paused = True seek = True elif k & 0xFF == ord(",") or k & 0xFF == ord("z"): paused = True i -= 2 seek = True elif k & 0xFF == ord("x"): cut_after = i # TODO: ord("h") for help! if i >= max_frames - 1: i = max_frames - 2 paused = True seek = True if i < 0: i = -1 paused = True seek = True if seek: self.marker.video_reader.set(cv2.CAP_PROP_POS_FRAMES, self.marker.nr + i + 1) cv2.destroyWindow("tsmark - tracker") self.marker.nr = old_nr - 1 self.marker.read_next = True self.points = {} for i in sorted(list(tracked.keys())): if i >= cut_after: continue self.points[self.marker.nr + i + 1] = { "x0": tracked[i][0], "y0": tracked[i][1], "x1": tracked[i][0] + tracked[i][2], "y1": tracked[i][1] + tracked[i][3], "visible": POINT_VISIBILITY[0], }