From 74db994d1643b06a8fab0fc2cb536500ac7cffd5 Mon Sep 17 00:00:00 2001 From: q Date: Fri, 22 Aug 2025 18:01:10 +0300 Subject: [PATCH] fixing interpolation error --- tsmark/video_annotator.py | 220 +++++++++++++++++++++----------------- 1 file changed, 121 insertions(+), 99 deletions(-) diff --git a/tsmark/video_annotator.py b/tsmark/video_annotator.py index 4aa88ed..72ddb61 100755 --- a/tsmark/video_annotator.py +++ b/tsmark/video_annotator.py @@ -315,16 +315,20 @@ class Marker: cv2.circle(frame, (current["cx"], current["cy"]), 10, color, 1) - if self.points_interpolation_enabled: - history = [] - for p in range(max(1, int(self.nr - self.viewer_fps)), self.nr + 1): - po = self.get_interpolated_point(p) - history.append([po["cx"], po["cy"]]) - history = np.array(history, np.int32).reshape((-1, 1, 2)) - cv2.polylines(frame, [history], False, COLOR_INTERP, 1) + except (KeyError, IndexError, TypeError): + # print(self.get_interpolated_point(), self.nr) + pass + try: + if self.points_interpolation_enabled: + history = [] + for p in range(max(1, int(self.nr - self.viewer_fps)), self.nr + 1): + po = self.get_interpolated_point(p) + history.append([po["cx"], po["cy"]]) + history = np.array(history, np.int32).reshape((-1, 1, 2)) + cv2.polylines(frame, [history], False, COLOR_INTERP, 1) except (KeyError, IndexError, TypeError): - print(self.get_interpolated_point(), self.nr) + # print(self.get_interpolated_point(), self.nr) pass try: current = self.get_point() @@ -333,8 +337,8 @@ class Marker: except KeyError: pass except IndexError: - print(self.points[self.point_index]) - print(self.nr) + # print(self.points[self.point_index]) + # print(self.nr) pass def scan_point(self, direction): @@ -493,9 +497,11 @@ class Marker: prev_key = key w = abs(last_p["x1"] - last_p["x0"]) h = abs(last_p["y1"] - last_p["y0"]) + visibility = last_p["visible"] else: w = 50 h = 50 + visibility = POINT_VISIBILITY[0] if position == "tl": self.points[self.point_index][self.nr] = { @@ -503,7 +509,7 @@ class Marker: "y0": y, "x1": min(self.video_res[0] - 1, x + w), "y1": min(self.video_res[1] - 1, y + h), - "visible": last_p["visible"], + "visible": visibility, } if position == "br": self.points[self.point_index][self.nr] = { @@ -511,7 +517,7 @@ class Marker: "y0": max(0, y - h), "x1": x, "y1": y, - "visible": last_p["visible"], + "visible": visibility, } if position == "c": self.points[self.point_index][self.nr] = { @@ -519,7 +525,7 @@ class Marker: "y0": max(0, int(y - h / 2)), "x1": min(self.video_res[0] - 1, int(x + w / 2)), "y1": min(self.video_res[1] - 1, int(y + h / 2)), - "visible": last_p["visible"], + "visible": visibility, } else: @@ -528,13 +534,14 @@ class Marker: current = self.points[self.point_index][self.nr] w = abs(current["x1"] - current["x0"]) h = abs(current["y1"] - current["y0"]) - self.points[self.point_index][self.nr] = { - "x0": max(0, int(x - w / 2)), - "y0": max(0, int(y - h / 2)), - "x1": min(self.video_res[0] - 1, int(x + w / 2)), - "y1": min(self.video_res[1] - 1, int(y + h / 2)), - "visible": current["visible"], - } + self.points[self.point_index][self.nr].update( + { + "x0": max(0, int(x - w / 2)), + "y0": max(0, int(y - h / 2)), + "x1": min(self.video_res[0] - 1, int(x + w / 2)), + "y1": min(self.video_res[1] - 1, int(y + h / 2)), + } + ) elif position == "tl": self.points[self.point_index][self.nr]["x0"] = x self.points[self.point_index][self.nr]["y0"] = y @@ -554,7 +561,6 @@ class Marker: self.points[self.point_index][self.nr]["y1"], ) - # self.interpolate_points() self.points_interpolation_required[self.point_index] = True def modify_point_wh(self): @@ -620,7 +626,6 @@ class Marker: if len(tracker_gui.points) > 0: for nr in tracker_gui.points: self.points[self.point_index][nr] = tracker_gui.points[nr] - # self.interpolate_points() self.points_interpolation_required[self.point_index] = True self.nr = max(tracker_gui.points) - 1 self.read_next = True @@ -677,92 +682,108 @@ class World: pre: before any keyframes post: after any keyframes """ + try: + if not point_index in self.points: + return + self.interpolate_thread_start() + + if point_index is None: + point_index = self.point_index + + self.points_interpolation_required[point_index] = False + + def i_point(x0=None, y0=None, x1=None, y1=None, t=None, visible=None, age=None): + return {"x0": x0, "y0": y0, "x1": x1, "y1": y1, "type": t, "visible": visible, "age": age} + + def point2array(p): + return [p["x0"], p["y0"], p["x1"], p["y1"]] + + if not point_index in self.points_interpolated: + self.points_interpolated[point_index] = {key: {} for key in range(self.frames)} + + new_points = {k: v for k, v in self.points_interpolated[point_index].items()} + + if len(self.points[point_index]) == 1: # only one point added + key = list(self.points[point_index].keys())[0] + vals = self.points[point_index][key] + for key in range(self.frames): + new_points[key] = i_point() + new_points[key].update(vals) + new_points[key]["type"] = "pre" if key < self.nr else "post" + + new_points[self.nr]["type"] = "key" + self.points_interpolated[point_index] = new_points + + else: # more points + point_keys = list(sorted(list(self.points[point_index].keys()))) + point_values = [point2array(self.points[point_index][k]) for k in point_keys] + xyxy = np.array(point_values).T + spline = PchipInterpolator(point_keys, xyxy, axis=1) + start_key = min(point_keys) + end_key = max(point_keys) + 1 + t2 = np.arange(start_key, end_key) + # Pre points + for key in range(0, start_key): + new_points[key]["type"] = "pre" + new_points[key].update(self.points[point_index][start_key]) + # interpolated points + visible = self.points[point_index][start_key]["visible"] + for row in np.vstack((t2, spline(t2))).T: + if row[0] in point_keys: + visible = self.points[point_index][row[0]]["visible"] + new_points[row[0]] = { + "type": "interp", + "x0": int(row[1]), + "y0": int(row[2]), + "x1": int(row[3]), + "y1": int(row[4]), + "visible": visible, + } + + # post points + for key in range(end_key, self.frames + 1): + new_points[key] = { + "type": "post", + "x0": int(row[1]), + "y0": int(row[2]), + "x1": int(row[3]), + "y1": int(row[4]), + "visible": visible, + } + # clicked points (not necessary, could determine at draw time!) + for key in point_keys: + new_points[key]["type"] = "key" + age = 0 + for key in new_points: + if new_points[key]["type"] == "key": + age = 0 + if new_points[key]["type"] == "interp": + age += 1 + new_points[key]["age"] = age + + self.points_interpolated[point_index] = new_points + except Exception as e: + print(f"Interpolation error: {e}") + + def interpolate_thread_start(self): if self.points_interpolation_thread is None: self.points_interpolation_thread = threading.Thread(target=self.interpolate_points_in_thread, args=()) self.points_interpolation_thread.start() + return if not self.points_interpolation_thread.is_alive(): self.points_interpolation_thread = threading.Thread(target=self.interpolate_points_in_thread, args=()) self.points_interpolation_thread.start() - - if point_index is None: - point_index = self.point_index - - def i_point(x0=None, y0=None, x1=None, y1=None, t=None, visible=None, age=None): - return {"x0": x0, "y0": y0, "x1": x1, "y1": y1, "type": t, "visible": visible, "age": age} - - def point2array(p): - return [p["x0"], p["y0"], p["x1"], p["y1"]] - - if not point_index in self.points: return - self.points_interpolation_required[point_index] = False - if not point_index in self.points_interpolated: - self.points_interpolated[point_index] = {key: {} for key in range(self.frames)} + def interpolate_thread_stop(self): - new_points = {k: v for k, v in self.points_interpolated[point_index].items()} - - if len(self.points[point_index]) == 1: # only one point added - key = list(self.points[point_index].keys())[0] - vals = self.points[point_index][key] - for key in range(self.frames): - new_points[key] = i_point() - new_points[key].update(vals) - new_points[key]["type"] = "pre" if key < self.nr else "post" - - new_points[self.nr]["type"] = "key" - self.points_interpolated[point_index] = new_points - - else: # more points - point_keys = list(sorted(list(self.points[point_index].keys()))) - point_values = [point2array(self.points[point_index][k]) for k in point_keys] - xyxy = np.array(point_values).T - spline = PchipInterpolator(point_keys, xyxy, axis=1) - start_key = min(point_keys) - end_key = max(point_keys) + 1 - t2 = np.arange(start_key, end_key) - # Pre points - for key in range(0, start_key): - new_points[key]["type"] = "pre" - new_points[key].update(self.points[point_index][start_key]) - # interpolated points - visible = self.points[point_index][start_key]["visible"] - for row in np.vstack((t2, spline(t2))).T: - if row[0] in point_keys: - visible = self.points[point_index][row[0]]["visible"] - new_points[row[0]] = { - "type": "interp", - "x0": int(row[1]), - "y0": int(row[2]), - "x1": int(row[3]), - "y1": int(row[4]), - "visible": visible, - } - - # post points - for key in range(end_key, self.frames + 1): - new_points[key] = { - "type": "post", - "x0": int(row[1]), - "y0": int(row[2]), - "x1": int(row[3]), - "y1": int(row[4]), - "visible": visible, - } - # clicked points (not necessary, could determine at draw time!) - for key in point_keys: - new_points[key]["type"] = "key" - age = 0 - for key in new_points: - if new_points[key]["type"] == "key": - age = 0 - if new_points[key]["type"] == "interp": - age += 1 - new_points[key]["age"] = age - - self.points_interpolated[point_index] = new_points + + self.points_interpolation_thread_exit = True + for point_index in self.points_interpolation_required: + if self.points_interpolation_required[point_index]: + self.interpolate_points(point_index) def interpolate_points_in_thread(self): @@ -1319,7 +1340,8 @@ class World: (255, 255, 255), ) cv2.imshow("tsmark", frame_visu) - + if self.points_interpolation_enabled: + self.interpolate_thread_start() k2 = cv2.waitKey(0) if k2 & 0xFF == ord("q") or k2 & 0xFF == 27: self.point_click = 0 @@ -1440,7 +1462,7 @@ class World: except Exception as e: print(e) - self.points_interpolation_thread_exit = True + self.interpolate_thread_stop() self.video_reader.release() cv2.destroyAllWindows() self.print_timestamps()