From 4e8a0e51cb93a75b840b712416c6b0e062307c6f Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 10:55:39 +0100 Subject: [PATCH 01/22] Fix worker stop, timestamp handling, and imports Preserve explicit timestamp values and strengthen worker shutdown behavior. Key changes: - recording_manager: use timestamp if timestamp is not None else time.time() so falsy timestamps (e.g. 0) are preserved instead of being overridden. - services/dlc_processor: import DLCLive at module top and enable profiling; make _stop_worker return a boolean and have reset() check that return value to avoid resetting the DLCLive instance while the worker thread is still alive (with warnings/logging). Adjusted comments about thread termination. - gui/main_window: use a truthiness check for dlc_cam_id to avoid treating empty values as valid IDs. These updates prevent spurious timestamp overrides, avoid unsafe DLCLive resets when threads are still running, and clarify import/profiling intent. --- dlclivegui/gui/main_window.py | 2 +- dlclivegui/gui/recording_manager.py | 2 +- dlclivegui/services/dlc_processor.py | 25 +++++++++++++------------ 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index df99e83..cf4ac41 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -1378,7 +1378,7 @@ def _on_multi_frame_ready(self, frame_data: MultiFrameData) -> None: dlc_cam_id = selected_id else: dlc_cam_id = available_ids[0] if available_ids else "" - if dlc_cam_id is not None: + if dlc_cam_id: self._inference_camera_id = dlc_cam_id self._set_dlc_combo_to_id(dlc_cam_id) self.statusBar().showMessage( diff --git a/dlclivegui/gui/recording_manager.py b/dlclivegui/gui/recording_manager.py index 22491c6..49ac993 100644 --- a/dlclivegui/gui/recording_manager.py +++ b/dlclivegui/gui/recording_manager.py @@ -139,7 +139,7 @@ def write_frame(self, cam_id: str, frame: np.ndarray, timestamp: float | None = if not rec or not rec.is_running: return try: - rec.write(frame, timestamp=timestamp or time.time()) + rec.write(frame, timestamp=timestamp if timestamp is not None else time.time()) except Exception as exc: log.warning("Failed to write frame for %s: %s", cam_id, exc) try: diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index fecad15..f70d599 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -14,6 +14,7 @@ from typing import Any import numpy as np +from dlclive import DLCLive from PySide6.QtCore import QObject, Signal from dlclivegui.config import DLCProcessorSettings, ModelType @@ -22,17 +23,9 @@ logger = logging.getLogger(__name__) -# Enable profiling +# Enable profiling to get more detailed timing metrics for debugging and optimization. ENABLE_PROFILING = True -try: # pragma: no cover - optional dependency - from dlclive import ( - DLCLive, # type: ignore - ) -except Exception as e: # pragma: no cover - handled gracefully - logger.error(f"dlclive package could not be imported: {e}") - DLCLive = None # type: ignore[assignment] - class PoseBackends(Enum): DLC_LIVE = auto() @@ -169,7 +162,12 @@ def configure(self, settings: DLCProcessorSettings, processor: Any | None = None def reset(self) -> None: """Stop the worker thread and drop the current DLCLive instance.""" - self._stop_worker() + stopped = self._stop_worker() + if not stopped: + logger.warning( + "Reset requested but worker thread is still alive; skipping DLCLive reset to avoid potential issues." + ) + return self._dlc = None self._initialized = False with self._stats_lock: @@ -275,17 +273,20 @@ def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: def _stop_worker(self) -> None: if self._worker_thread is None: - return + return True self._stop_event.set() - # Just wait for the timed get() loop to observe the flag and drain + # Wait for timed get() loop to observe the flag and drain self._worker_thread.join(timeout=2.0) if self._worker_thread.is_alive(): logger.warning("DLC worker thread did not terminate cleanly") + # IMPORTANT: do not clear references; thread may still be using them + return False self._worker_thread = None self._queue = None + return True @contextmanager def _timed_processor(self): From ec86f280e48d6a94b23a5061fac1517776927396 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 10:58:37 +0100 Subject: [PATCH 02/22] Add WorkerState enum for worker lifecycle Introduce a WorkerState enum and import Enum/auto in dlc_processor.py to represent worker lifecycle states (STOPPED, STARTING, RUNNING, STOPPING, FAULTED). This centralizes state representation for processing workers and improves clarity for future state management. --- dlclivegui/services/dlc_processor.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index f70d599..4aea239 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -29,6 +29,12 @@ class PoseBackends(Enum): DLC_LIVE = auto() +class WorkerState(Enum): + STOPPED = auto() + STARTING = auto() + RUNNING = auto() + STOPPING = auto() + FAULTED = auto() @dataclass From cad824c061e1a8d138f26015c5f263b904e35a6e Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:18:17 +0100 Subject: [PATCH 03/22] Improve DLCLive processor lifecycle and safety Add explicit worker lifecycle management and synchronization to DLCLive processor: introduce WorkerState, a lifecycle Lock, and stop/start state transitions. Make _start/_stop behavior safer by snapshotting the queue, returning boolean on stop, joining the thread with fault handling, and avoiding races by using a local queue reference in the worker loop. Harden enqueue_frame to respect lifecycle/state/stop flags and reduce copies by reusing a single frame copy and enqueuing enqueue timestamp. Improve DLCLive initialization error handling (set FAULTED on failure) and ensure caller-visible errors when DLCLive is not initialized. Prevent configuring the processor while running and add logging/warnings for shutdown/termination issues. Minor cleanup: adjust where initialized/state flags are set and remove redundant checks. --- dlclivegui/services/dlc_processor.py | 107 ++++++++++++++++++--------- 1 file changed, 74 insertions(+), 33 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 4aea239..971202d 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -29,6 +29,8 @@ class PoseBackends(Enum): DLC_LIVE = auto() + + class WorkerState(Enum): STOPPED = auto() STARTING = auto() @@ -134,11 +136,15 @@ class DLCLiveProcessor(QObject): def __init__(self) -> None: super().__init__() + # DLCLive instance and config self._settings = DLCProcessorSettings() self._dlc: Any | None = None self._processor: Any | None = None + # Worker thread and queue self._queue: queue.Queue[Any] | None = None self._worker_thread: threading.Thread | None = None + self._state = WorkerState.STOPPED + self._lifecycle_lock = threading.Lock() self._stop_event = threading.Event() self._initialized = False @@ -190,22 +196,29 @@ def reset(self) -> None: self._processor_overhead_times.clear() def shutdown(self) -> None: - self._stop_worker() + stopped = self._stop_worker() + if not stopped: + logger.warning( + "Shutdown requested but worker thread is still alive; DLCLive instance may not be fully released." + ) + return self._dlc = None self._initialized = False def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: - # Start worker on first frame - if self._worker_thread is None: - self._start_worker(frame.copy(), timestamp) - return + frame_c = frame.copy() + enq_time = time.perf_counter() + + with self._lifecycle_lock: + if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): + return + q = self._queue # snapshot under lock - # As long as worker and queue are ready, ALWAYS enqueue - if self._queue is None: + if q is None: return try: - self._queue.put_nowait((frame.copy(), timestamp, time.perf_counter())) + q.put_nowait((frame_c, timestamp, enq_time)) with self._stats_lock: self._frames_enqueued += 1 except queue.Full: @@ -263,12 +276,13 @@ def get_stats(self) -> ProcessorStats: avg_processor_overhead=avg_proc_overhead, ) - def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: + def _start_worker_locked(self, init_frame: np.ndarray, init_timestamp: float) -> None: + # lifecycle_lock must already be held if self._worker_thread is not None and self._worker_thread.is_alive(): return - self._queue = queue.Queue(maxsize=1) self._stop_event.clear() + self._state = WorkerState.STARTING self._worker_thread = threading.Thread( target=self._worker_loop, args=(init_frame, init_timestamp), @@ -277,21 +291,31 @@ def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: ) self._worker_thread.start() - def _stop_worker(self) -> None: - if self._worker_thread is None: - return True - - self._stop_event.set() - - # Wait for timed get() loop to observe the flag and drain - self._worker_thread.join(timeout=2.0) - if self._worker_thread.is_alive(): - logger.warning("DLC worker thread did not terminate cleanly") - # IMPORTANT: do not clear references; thread may still be using them + def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: + with self._lifecycle_lock: + self._start_worker_locked(init_frame, init_timestamp) + + def _stop_worker(self) -> bool: + with self._lifecycle_lock: + t = self._worker_thread + if t is None: + self._state = WorkerState.STOPPED + return True + self._state = WorkerState.STOPPING + self._stop_event.set() + + t.join(timeout=2.0) + if t.is_alive(): + qsize = self._queue.qsize() if self._queue is not None else -1 + logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)", qsize) + with self._lifecycle_lock: + self._state = WorkerState.FAULTED return False - self._worker_thread = None - self._queue = None + with self._lifecycle_lock: + self._worker_thread = None + self._queue = None + self._state = WorkerState.STOPPED return True @contextmanager @@ -335,6 +359,8 @@ def _process_frame( Single source of truth for: inference -> (optional) processor timing -> signal emit -> stats. Updates: frames_processed, latency, processing timeline, profiling metrics. """ + if self._dlc is None: + raise RuntimeError("DLCLive instance is not initialized.") # Time GPU inference (and processor overhead when present) with self._timed_processor() as proc_holder: inference_start = time.perf_counter() @@ -384,8 +410,6 @@ def _process_frame( def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: try: # -------- Initialization (unchanged) -------- - if DLCLive is None: - raise RuntimeError("The 'dlclive' package is required for pose estimation.") if not self._settings.model_path: raise RuntimeError("No DLCLive model path configured.") @@ -410,7 +434,14 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: if self._settings.device is not None: options["device"] = self._settings.device - self._dlc = DLCLive(**options) + try: + self._dlc = DLCLive(**options) + except Exception as exc: + with self._lifecycle_lock: + self._state = WorkerState.FAULTED + raise RuntimeError( + f"Failed to initialize DLCLive with model '{self._settings.model_path}': {exc}" + ) from exc # First inference to initialize init_inference_start = time.perf_counter() @@ -423,6 +454,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self._initialized = True self.initialized.emit(True) + with self._lifecycle_lock: + self._state = WorkerState.RUNNING total_init_time = time.perf_counter() - init_start logger.info( @@ -442,14 +475,20 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self.initialized.emit(False) return + q = ( + self._queue + ) # Assign to local to avoid issues if self._queue is set to None during shutdown while loop is still running. + if q is None: + logger.warning("Worker loop exiting because queue was unexpectedly None") + return # -------- Main processing loop: stop-flag + timed get + drain -------- # NOTE: We never exit early unless _stop_event is set. while True: # If stop requested, only exit when queue is empty if self._stop_event.is_set(): - if self._queue is not None: + if q is not None: try: - frame, ts, enq = self._queue.get_nowait() + frame, ts, enq = q.get_nowait() except queue.Empty: # NOW it is safe to exit break @@ -462,7 +501,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self.error.emit(str(exc)) finally: try: - self._queue.task_done() + q.task_done() except ValueError: pass continue # check stop_event again WITHOUT breaking @@ -470,7 +509,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: # Normal operation: timed get try: wait_start = time.perf_counter() - item = self._queue.get(timeout=0.05) + item = q.get(timeout=0.05) queue_wait_time = time.perf_counter() - wait_start except queue.Empty: continue @@ -483,7 +522,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self.error.emit(str(exc)) finally: try: - self._queue.task_done() + q.task_done() except ValueError: pass @@ -520,6 +559,10 @@ def enqueue(self, frame, ts): self._proc.enqueue_frame(frame, ts) def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, selected_key) -> bool: + with self._proc._lifecycle_lock: + if self._proc._state != WorkerState.STOPPED: + raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.") + processor = None if selected_key is not None and scanned_processors: try: @@ -533,11 +576,9 @@ def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, se def start(self): self._proc.reset() self.active = True - self.initialized = False def stop(self): self.active = False - self.initialized = False self._proc.reset() self._last_pose = None From d5ea95aa1a2cbd706e34abf748fef7542c7adf7d Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:24:12 +0100 Subject: [PATCH 04/22] Restart worker if dead and clear stop event When queuing work, ensure the background worker is started if the worker thread is None or not alive by calling _start_worker_locked(frame_c, timestamp). Also clear the _stop_event in both the early-return (no thread) and final cleanup paths when stopping so a stale stop flag doesn't prevent future restarts. --- dlclivegui/services/dlc_processor.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 971202d..ec553f9 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -212,6 +212,11 @@ def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: with self._lifecycle_lock: if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): return + t = self._worker_thread + if t is None or not t.is_alive(): + self._start_worker_locked(frame_c, timestamp) + return + q = self._queue # snapshot under lock if q is None: @@ -300,6 +305,7 @@ def _stop_worker(self) -> bool: t = self._worker_thread if t is None: self._state = WorkerState.STOPPED + self._stop_event.clear() return True self._state = WorkerState.STOPPING self._stop_event.set() @@ -316,6 +322,7 @@ def _stop_worker(self) -> bool: self._worker_thread = None self._queue = None self._state = WorkerState.STOPPED + self._stop_event.clear() return True @contextmanager From a4d21f7b0bbde9fde14f1161ef36aa58c5db7609 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:30:47 +0100 Subject: [PATCH 05/22] Mark worker FAULTED on queue errors Improve error handling in the worker loop when the queue is missing or queue.get raises unexpected exceptions. When q is None the code now logs a clearer message, sets the worker state to WorkerState.FAULTED under the lifecycle lock, and emits an error signal. Also adds a broad except around queue.get to log the exception, set FAULTED state, emit the error message, and break the loop to avoid silent failures. --- dlclivegui/services/dlc_processor.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index ec553f9..8c541b2 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -486,8 +486,12 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self._queue ) # Assign to local to avoid issues if self._queue is set to None during shutdown while loop is still running. if q is None: - logger.warning("Worker loop exiting because queue was unexpectedly None") + logger.warning("Worker started without a queue; exiting") + with self._lifecycle_lock: + self._state = WorkerState.FAULTED + self.error.emit("Worker started without a queue") return + # -------- Main processing loop: stop-flag + timed get + drain -------- # NOTE: We never exit early unless _stop_event is set. while True: @@ -520,6 +524,12 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: queue_wait_time = time.perf_counter() - wait_start except queue.Empty: continue + except Exception as exc: + logger.exception("Error getting item from queue", exc_info=exc) + with self._lifecycle_lock: + self._state = WorkerState.FAULTED + self.error.emit(str(exc)) + break try: frame, ts, enq = item From 76d9a30cc37091b2b3c5de9a04627a8437c94a2f Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:31:04 +0100 Subject: [PATCH 06/22] Force-terminate frozen camera threads on stop Make multi-camera shutdown more robust by iterating threads with their camera IDs, skipping non-running threads, and quitting each thread. If a thread fails to stop within 5000ms, log an error and call terminate() then wait an additional 1000ms to ensure it stops. Also simplify the start() docstring. --- dlclivegui/services/multi_camera_controller.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 8db469f..b579c86 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -143,7 +143,7 @@ def get_active_count(self) -> int: return len(self._started_cameras) def start(self, camera_settings: list[CameraSettings]) -> None: - """Start multiple cameras; accepts dataclasses, pydantic models, or dicts.""" + """Start multiple cameras.""" if self._running: LOGGER.warning("Multi-camera controller already running") return @@ -201,10 +201,15 @@ def stop(self, wait: bool = True) -> None: # Wait for threads to finish if wait: - for thread in self._threads.values(): - if thread.isRunning(): - thread.quit() - thread.wait(5000) + for cam_id, thread in list(self._threads.items()): + if not thread.isRunning(): + continue + + thread.quit() + if not thread.wait(5000): + LOGGER.error("Frozen camera thread %s; Forcing terminate()", cam_id) + thread.terminate() + thread.wait(1000) self._workers.clear() self._threads.clear() From 16b0d2a83b4cab6d5fa15f621fafbd801142ff50 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:32:37 +0100 Subject: [PATCH 07/22] Count unique cameras when starting multi-camera Previously _expected_cameras was set to len(active_settings), which could overcount when multiple CameraSettings referred to the same physical camera. This change computes a set of unique camera IDs (via get_camera_id) and sets _expected_cameras to its length, ensuring duplicates aren't double-counted. --- dlclivegui/services/multi_camera_controller.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index b579c86..1375ba2 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -158,7 +158,9 @@ def start(self, camera_settings: list[CameraSettings]) -> None: self._timestamps.clear() self._started_cameras.clear() self._failed_cameras.clear() - self._expected_cameras = len(active_settings) + + unique_ids = {get_camera_id(s) for s in active_settings} + self._expected_cameras = len(unique_ids) for settings in active_settings: self._start_camera(settings) From ae1754cc034fe248d7ead3042e91f154ea835e69 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:42:20 +0100 Subject: [PATCH 08/22] Improve VideoRecorder shutdown and writer loop Add robust shutdown and error handling for the video recorder. Prevent writes after stop by checking the stop event early; record and raise encoding errors as before. Refactor stop() to use local vars, signal the writer via a sentinel, join the writer thread safely, and avoid saving timestamps prematurely. Rework _writer_loop to handle a missing queue, broaden exception handling when pulling/writing items, ensure q.task_done() is always called, properly handle the sentinel, update encoding stats and logging, and perform final cleanup (save timestamps, clear queue and thread) when exiting. --- dlclivegui/services/video_recorder.py | 116 ++++++++++++++++++-------- 1 file changed, 80 insertions(+), 36 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 1addbca..7a6967c 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -118,11 +118,13 @@ def configure_stream(self, frame_size: tuple[int, int], frame_rate: float | None self._frame_rate = frame_rate def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: - if not self.is_running or self._queue is None: - return False error = self._current_error() if error is not None: raise RuntimeError(f"Video encoding failed: {error}") from error + if not self.is_running or self._queue is None: + return False + if self._stop_event.is_set(): + return False # Capture timestamp now, but only record it if frame is successfully enqueued if timestamp is None: @@ -181,25 +183,32 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: def stop(self) -> None: if self._writer is None and not self.is_running: return + self._stop_event.set() - if self._queue is not None: + + q = self._queue + if q is not None: try: - self._queue.put_nowait(_SENTINEL) + q.put_nowait(_SENTINEL) except queue.Full: pass - # self._queue.put(_SENTINEL) - if self._writer_thread is not None: - self._writer_thread.join(timeout=5.0) - if self._writer_thread.is_alive(): + + t = self._writer_thread + if t is not None: + t.join(timeout=5.0) + if t.is_alive(): logger.warning("Video recorder thread did not terminate cleanly") + return + if self._writer is not None: try: self._writer.close() except Exception: logger.exception("Failed to close WriteGear cleanly") - # Save timestamps to JSON file - self._save_timestamps() + if self._writer_thread is None: + # Save timestamps to JSON file + self._save_timestamps() self._writer = None self._writer_thread = None @@ -236,45 +245,80 @@ def get_stats(self) -> RecorderStats | None: ) def _writer_loop(self) -> None: - assert self._queue is not None + q = self._queue + if q is None: + with self._stats_lock: + self._encode_error = RuntimeError("Writer loop started without a queue") + logger.error("Writer loop started without a queue; exiting") + return + try: while True: try: - item = self._queue.get(timeout=0.1) + item = q.get(timeout=0.1) except queue.Empty: if self._stop_event.is_set(): break continue - if item is _SENTINEL: - self._queue.task_done() - break - frame, timestamp = item - start = time.perf_counter() - try: - assert self._writer is not None - self._writer.write(frame) - except OSError as exc: + except Exception as exc: with self._stats_lock: self._encode_error = exc - logger.exception("Video encoding failed while writing frame") - self._queue.task_done() + logger.exception("Could not retrieve item from queue", exc_info=exc) self._stop_event.set() break - elapsed = time.perf_counter() - start - now = time.perf_counter() - with self._stats_lock: - self._frames_written += 1 - self._total_latency += elapsed - self._last_latency = elapsed - self._written_times.append(now) - self._frame_timestamps.append(timestamp) - if now - self._last_log_time >= 1.0: - self._compute_write_fps_locked() - self._queue.qsize() - self._last_log_time = now - self._queue.task_done() + + stop_now = False + try: + if item is _SENTINEL: + stop_now = True + continue + + frame, timestamp = item + start = time.perf_counter() + + try: + writer = self._writer + if writer is None: + raise RuntimeError("WriteGear writer is not initialized") + writer.write(frame) + except Exception as exc: # <- broader than OSError + with self._stats_lock: + self._encode_error = exc + logger.exception("Video encoding failed while writing frame", exc_info=exc) + self._stop_event.set() + stop_now = True + continue + + elapsed = time.perf_counter() - start + now = time.perf_counter() + with self._stats_lock: + self._frames_written += 1 + self._total_latency += elapsed + self._last_latency = elapsed + self._written_times.append(now) + self._frame_timestamps.append(timestamp) + if now - self._last_log_time >= 1.0: + self._compute_write_fps_locked() + self._last_log_time = now + + finally: + # Ensure queue accounting is correct for every item pulled from q + try: + q.task_done() + except ValueError: + pass + + if stop_now: + break + finally: self._finalize_writer() + self._save_timestamps() + + # Safe cleanup only once the thread is actually exiting + self._queue = None + if self._writer_thread is threading.current_thread(): + self._writer_thread = None def _finalize_writer(self) -> None: writer = self._writer From 29f26040777ee4fbe74c3b456198027c0c94beb5 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:46:00 +0100 Subject: [PATCH 09/22] Refactor writer thread error handling and stats Consolidate the writer-thread loop and tighten up exception handling. Removed the conditional _save_timestamps path during teardown. In the writer loop sentinel handling and frame processing are unified; on write errors the exception is stored under _stats_lock, logged, and stop_event/stop_now are set. The stats updates (frames_written, total_latency, last_latency, written_times, frame_timestamps and periodic FPS computation) were moved into the same control flow to ensure consistent accounting and termination behavior. --- dlclivegui/services/video_recorder.py | 58 ++++++++++++--------------- 1 file changed, 26 insertions(+), 32 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 7a6967c..91d400a 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -206,10 +206,6 @@ def stop(self) -> None: except Exception: logger.exception("Failed to close WriteGear cleanly") - if self._writer_thread is None: - # Save timestamps to JSON file - self._save_timestamps() - self._writer = None self._writer_thread = None self._queue = None @@ -271,35 +267,33 @@ def _writer_loop(self) -> None: try: if item is _SENTINEL: stop_now = True - continue - - frame, timestamp = item - start = time.perf_counter() - - try: - writer = self._writer - if writer is None: - raise RuntimeError("WriteGear writer is not initialized") - writer.write(frame) - except Exception as exc: # <- broader than OSError + else: + frame, timestamp = item + start = time.perf_counter() + + try: + writer = self._writer + if writer is None: + raise RuntimeError("WriteGear writer is not initialized") + writer.write(frame) + except Exception as exc: + with self._stats_lock: + self._encode_error = exc + logger.exception("Video encoding failed while writing frame", exc_info=exc) + self._stop_event.set() + stop_now = True + + elapsed = time.perf_counter() - start + now = time.perf_counter() with self._stats_lock: - self._encode_error = exc - logger.exception("Video encoding failed while writing frame", exc_info=exc) - self._stop_event.set() - stop_now = True - continue - - elapsed = time.perf_counter() - start - now = time.perf_counter() - with self._stats_lock: - self._frames_written += 1 - self._total_latency += elapsed - self._last_latency = elapsed - self._written_times.append(now) - self._frame_timestamps.append(timestamp) - if now - self._last_log_time >= 1.0: - self._compute_write_fps_locked() - self._last_log_time = now + self._frames_written += 1 + self._total_latency += elapsed + self._last_latency = elapsed + self._written_times.append(now) + self._frame_timestamps.append(timestamp) + if now - self._last_log_time >= 1.0: + self._compute_write_fps_locked() + self._last_log_time = now finally: # Ensure queue accounting is correct for every item pulled from q From 64f820e478b5ae4164cc9d426ab83e212ffca4ae Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 11:57:30 +0100 Subject: [PATCH 10/22] Revert DLCLive imports due to unguarded torch --- dlclivegui/services/dlc_processor.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 8c541b2..d361edd 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -14,7 +14,8 @@ from typing import Any import numpy as np -from dlclive import DLCLive + +# from dlclive import DLCLive from PySide6.QtCore import QObject, Signal from dlclivegui.config import DLCProcessorSettings, ModelType @@ -23,6 +24,15 @@ logger = logging.getLogger(__name__) +try: # pragma: no cover - optional dependency + from dlclive import ( + DLCLive, # type: ignore + ) +except Exception as e: # pragma: no cover - handled gracefully + logger.error(f"dlclive package could not be imported: {e}") + DLCLive = None # type: ignore[assignment] + + # Enable profiling to get more detailed timing metrics for debugging and optimization. ENABLE_PROFILING = True From 9b6e1c824fc67d9c1e00e2d35ac6ba85409543e7 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 13:48:54 +0100 Subject: [PATCH 11/22] Improve camera thread cleanup and recorder finalization Add cooperative shutdown timeouts and robust cleanup for camera threads: introduce QUIT_WAIT_MS and TERMINATE_WAIT_MS, connect thread.finished to a new _cleanup_camera that removes frame data and deletes worker/thread QObjects, and enhance stop() to wait, terminate, and log/retain references if threads refuse to terminate (avoids use-after-free/segfaults). Adjust per-camera shutdown checks and ensure frames/timestamps are cleared only after safe termination. In video_recorder, remove premature clearing of the queue and _writer_thread during finalization to avoid unsafe cleanup while writer thread may still be exiting. --- .../services/multi_camera_controller.py | 73 +++++++++++++++---- dlclivegui/services/video_recorder.py | 5 -- 2 files changed, 57 insertions(+), 21 deletions(-) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 1375ba2..16b6ccb 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -5,6 +5,7 @@ import logging import time from dataclasses import dataclass +from functools import partial from threading import Event, Lock import cv2 @@ -20,6 +21,9 @@ LOGGER = logging.getLogger(__name__) +QUIT_WAIT_MS = 5000 # wait for cooperative quit (5s) +TERMINATE_WAIT_MS = 1000 # wait after terminate (1s) + @dataclass class MultiFrameData: @@ -188,8 +192,25 @@ def _start_camera(self, settings: CameraSettings) -> None: self._workers[cam_id] = worker self._threads[cam_id] = thread + thread.finished.connect(partial(self._cleanup_camera, cam_id)) + worker.stopped.connect(thread.quit) thread.start() + def _cleanup_camera(self, camera_id: str) -> None: + # remove stored frame data + with self._frame_lock: + self._frames.pop(camera_id, None) + self._timestamps.pop(camera_id, None) + + worker = self._workers.pop(camera_id, None) + thread = self._threads.pop(camera_id, None) + self._settings.pop(camera_id, None) + + if worker is not None: + worker.deleteLater() + if thread is not None: + thread.deleteLater() + def stop(self, wait: bool = True) -> None: """Stop all cameras.""" if not self._running: @@ -203,22 +224,47 @@ def stop(self, wait: bool = True) -> None: # Wait for threads to finish if wait: + still_running: list[str] = [] for cam_id, thread in list(self._threads.items()): + if thread is None: + continue if not thread.isRunning(): continue thread.quit() - if not thread.wait(5000): - LOGGER.error("Frozen camera thread %s; Forcing terminate()", cam_id) - thread.terminate() - thread.wait(1000) - - self._workers.clear() - self._threads.clear() - self._settings.clear() + if thread.wait(QUIT_WAIT_MS): + continue # Clean exit + + LOGGER.error( + "Camera thread %s did not quit within %dms; forcing terminate()", + cam_id, + QUIT_WAIT_MS, + ) + + thread.terminate() + if thread.wait(TERMINATE_WAIT_MS): + continue # Terminated successfully + + LOGGER.critical( + "Camera thread %s refused to terminate after terminate()+wait(%dms). " + "Keeping references to avoid use-after-free/segfaults. " + "Application restart may be required.", + cam_id, + TERMINATE_WAIT_MS, + ) + still_running.append(cam_id) + + if still_running: + self._started_cameras.clear() + return + self._started_cameras.clear() self._failed_cameras.clear() + with self._frame_lock: + self._frames.clear() + self._timestamps.clear() self._expected_cameras = 0 + self.all_stopped.emit() def _on_frame_captured(self, camera_id: str, frame: np.ndarray, timestamp: float) -> None: @@ -437,14 +483,9 @@ def _on_camera_stopped(self, camera_id: str) -> None: # Cleanup thread if camera_id in self._threads: - thread = self._threads[camera_id] - if thread.isRunning(): + thread = self._threads.get(camera_id) + if thread is not None and thread.isRunning(): thread.quit() - thread.wait(1000) - del self._threads[camera_id] - - if camera_id in self._workers: - del self._workers[camera_id] # Remove frame data with self._frame_lock: @@ -463,7 +504,7 @@ def _on_camera_stopped(self, camera_id: str) -> None: return # Check if all running cameras have stopped (normal shutdown) - if not self._started_cameras and self._running and not self._workers: + if not self._started_cameras and all(not t.isRunning() for t in self._threads.values() if t is not None): self._running = False self.all_stopped.emit() diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 91d400a..e376c56 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -309,11 +309,6 @@ def _writer_loop(self) -> None: self._finalize_writer() self._save_timestamps() - # Safe cleanup only once the thread is actually exiting - self._queue = None - if self._writer_thread is threading.current_thread(): - self._writer_thread = None - def _finalize_writer(self) -> None: writer = self._writer self._writer = None From acd15da21cde94733256fbd884e281c1a11be53f Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 13:49:58 +0100 Subject: [PATCH 12/22] Update dlc_processor.py --- dlclivegui/services/dlc_processor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index d361edd..29d6603 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -452,6 +452,10 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: options["device"] = self._settings.device try: + if DLCLive is None: + raise RuntimeError( + "DLCLive class is not available. Ensure the dlclive package is installed and can be imported." + ) self._dlc = DLCLive(**options) except Exception as exc: with self._lifecycle_lock: From 369908e83540ac8d303a495b392929fad1f7589c Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 14:02:25 +0100 Subject: [PATCH 13/22] Add lifecycle management & robust writer shutdown Introduce lifecycle locking and an "abandoned" state to VideoRecorder to prevent unsafe restarts and improve shutdown handling. start() now acquires a lifecycle lock, refuses to restart when a leftover thread is marked abandoned, and initializes/reset writer state inside the lock. stop() now marks the recorder as abandoned and records an encode error if the writer thread fails to terminate within the timeout, and ensures timestamps are saved. The writer loop was simplified to break directly on sentinel or encode errors and now closes WriteGear during finalization with exception handling. Miscellaneous reordering and small bookkeeping fixes were made to improve safety and logging during shutdown. --- dlclivegui/services/video_recorder.py | 132 +++++++++++++++----------- 1 file changed, 79 insertions(+), 53 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index e376c56..0ae735a 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -53,6 +53,7 @@ def __init__( crf: int = 23, buffer_size: int = 240, ): + # Config self._output = Path(output) self._writer: Any | None = None self._frame_size = frame_size @@ -60,10 +61,14 @@ def __init__( self._codec = codec self._crf = int(crf) self._buffer_size = max(1, int(buffer_size)) + # Worker state self._queue: queue.Queue[Any] | None = None self._writer_thread: threading.Thread | None = None self._stop_event = threading.Event() self._stats_lock = threading.Lock() + self._lifecycle_lock = threading.Lock() + self._abandoned = False + # Stats self._frames_enqueued = 0 self._frames_written = 0 self._dropped_frames = 0 @@ -81,37 +86,46 @@ def is_running(self) -> bool: def start(self) -> None: if WriteGear is None: raise RuntimeError("vidgear is required for video recording. Install it with 'pip install vidgear'.") - if self._writer is not None: - return - fps_value = float(self._frame_rate) if self._frame_rate else 30.0 - - writer_kwargs: dict[str, Any] = { - "compression_mode": True, - "logging": False, - "-input_framerate": fps_value, - "-vcodec": (self._codec or "libx264").strip() or "libx264", - "-crf": int(self._crf), - } - # TODO deal with pixel format - - self._output.parent.mkdir(parents=True, exist_ok=True) - self._writer = WriteGear(output=str(self._output), **writer_kwargs) - self._queue = queue.Queue(maxsize=self._buffer_size) - self._frames_enqueued = 0 - self._frames_written = 0 - self._dropped_frames = 0 - self._total_latency = 0.0 - self._last_latency = 0.0 - self._written_times.clear() - self._frame_timestamps.clear() - self._encode_error = None - self._stop_event.clear() - self._writer_thread = threading.Thread( - target=self._writer_loop, - name="VideoRecorderWriter", - daemon=True, - ) - self._writer_thread.start() + + with self._lifecycle_lock: + if self.is_running: + return + if self._abandoned: + raise RuntimeError("Cannot restart VideoRecorder, as a leftover thread is still running.") + if self._writer is not None: + self._writer = None + self._queue = None + self._writer_thread = None + + fps_value = float(self._frame_rate) if self._frame_rate else 30.0 + + writer_kwargs: dict[str, Any] = { + "compression_mode": True, + "logging": False, + "-input_framerate": fps_value, + "-vcodec": (self._codec or "libx264").strip() or "libx264", + "-crf": int(self._crf), + } + # TODO deal with pixel format + + self._output.parent.mkdir(parents=True, exist_ok=True) + self._writer = WriteGear(output=str(self._output), **writer_kwargs) + self._queue = queue.Queue(maxsize=self._buffer_size) + self._frames_enqueued = 0 + self._frames_written = 0 + self._dropped_frames = 0 + self._total_latency = 0.0 + self._last_latency = 0.0 + self._written_times.clear() + self._frame_timestamps.clear() + self._encode_error = None + self._stop_event.clear() + self._writer_thread = threading.Thread( + target=self._writer_loop, + name="VideoRecorderWriter", + daemon=True, + ) + self._writer_thread.start() def configure_stream(self, frame_size: tuple[int, int], frame_rate: float | None) -> None: self._frame_size = frame_size @@ -197,8 +211,16 @@ def stop(self) -> None: if t is not None: t.join(timeout=5.0) if t.is_alive(): - logger.warning("Video recorder thread did not terminate cleanly") - return + with self._stats_lock: + self._encode_error = RuntimeError( + "Failed to stop VideoRecorder within timeout; thread is still alive." + ) + self._abandoned = True + logger.critical( + "Failed to stop VideoRecorder within timeout; thread is still alive. " + "Marking recorder as abandoned to prevent restart." + ) + return if self._writer is not None: try: @@ -206,9 +228,13 @@ def stop(self) -> None: except Exception: logger.exception("Failed to close WriteGear cleanly") + self._save_timestamps() + self._writer = None self._writer_thread = None self._queue = None + self._stop_event.clear() + self._abandoned = False def get_stats(self) -> RecorderStats | None: if ( @@ -263,10 +289,9 @@ def _writer_loop(self) -> None: self._stop_event.set() break - stop_now = False try: if item is _SENTINEL: - stop_now = True + break else: frame, timestamp = item start = time.perf_counter() @@ -281,19 +306,19 @@ def _writer_loop(self) -> None: self._encode_error = exc logger.exception("Video encoding failed while writing frame", exc_info=exc) self._stop_event.set() - stop_now = True - - elapsed = time.perf_counter() - start - now = time.perf_counter() - with self._stats_lock: - self._frames_written += 1 - self._total_latency += elapsed - self._last_latency = elapsed - self._written_times.append(now) - self._frame_timestamps.append(timestamp) - if now - self._last_log_time >= 1.0: - self._compute_write_fps_locked() - self._last_log_time = now + break + else: + elapsed = time.perf_counter() - start + now = time.perf_counter() + with self._stats_lock: + self._frames_written += 1 + self._total_latency += elapsed + self._last_latency = elapsed + self._written_times.append(now) + self._frame_timestamps.append(timestamp) + if now - self._last_log_time >= 1.0: + self._compute_write_fps_locked() + self._last_log_time = now finally: # Ensure queue accounting is correct for every item pulled from q @@ -302,12 +327,13 @@ def _writer_loop(self) -> None: except ValueError: pass - if stop_now: - break - finally: - self._finalize_writer() - self._save_timestamps() + writer = self._writer + if writer is not None: + try: + writer.close() + except Exception: + logger.exception("Failed to close WriteGear during writer loop finalization") def _finalize_writer(self) -> None: writer = self._writer From 91dff83aed06272187321e9b67b0deb8223f3f93 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 14:33:41 +0100 Subject: [PATCH 14/22] Improve lifecycle handling and stale-writer cleanup dlclivegui/services/dlc_processor.py: Move the frame copy and enqueue timestamp into the lifecycle lock to avoid unnecessary copying when the processor is stopped; on initialization errors set WorkerState.FAULTED under the lifecycle lock before emitting the error. dlclivegui/services/multi_camera_controller.py: Reformat the running-cameras shutdown check into a multi-line condition (keeps the same intent of detecting normal shutdown when no cameras are started and all threads are stopped). dlclivegui/services/video_recorder.py: Add best-effort cleanup for a stale video writer in start() by attempting to call its close() and logging any errors before clearing writer/queue/thread references. Wrap stop() logic with the lifecycle lock to avoid races when stopping, preserve timeout/abandon behavior for a non-terminating writer thread, and ensure writer closure and timestamp saving are handled with proper error logging. --- dlclivegui/services/dlc_processor.py | 8 +- .../services/multi_camera_controller.py | 6 +- dlclivegui/services/video_recorder.py | 89 +++++++++++-------- 3 files changed, 60 insertions(+), 43 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index 29d6603..e7aaaf2 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -216,17 +216,15 @@ def shutdown(self) -> None: self._initialized = False def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: - frame_c = frame.copy() - enq_time = time.perf_counter() - with self._lifecycle_lock: if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): return + frame_c = frame.copy() + enq_time = time.perf_counter() t = self._worker_thread if t is None or not t.is_alive(): self._start_worker_locked(frame_c, timestamp) return - q = self._queue # snapshot under lock if q is None: @@ -492,6 +490,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: except Exception as exc: logger.exception("Failed to initialize DLCLive", exc_info=exc) + with self._lifecycle_lock: + self._state = WorkerState.FAULTED self.error.emit(str(exc)) self.initialized.emit(False) return diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 16b6ccb..4ecf8a0 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -504,7 +504,11 @@ def _on_camera_stopped(self, camera_id: str) -> None: return # Check if all running cameras have stopped (normal shutdown) - if not self._started_cameras and all(not t.isRunning() for t in self._threads.values() if t is not None): + if ( + not self._started_cameras + and not self._started_cameras + and all(not t.isRunning() for t in self._threads.values() if t is not None) + ): self._running = False self.all_stopped.emit() diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 0ae735a..2552a8a 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -93,9 +93,21 @@ def start(self) -> None: if self._abandoned: raise RuntimeError("Cannot restart VideoRecorder, as a leftover thread is still running.") if self._writer is not None: - self._writer = None - self._queue = None - self._writer_thread = None + # Best-effort cleanup of a stale writer to avoid leaking resources. + logger.warning( + "VideoRecorder.start() found an existing writer while not running; " + "attempting to close the stale writer before restarting." + ) + try: + close_method = getattr(self._writer, "close", None) + if callable(close_method): + close_method() + except Exception: + logger.exception("Error while closing stale video writer in start().") + finally: + self._writer = None + self._queue = None + self._writer_thread = None fps_value = float(self._frame_rate) if self._frame_rate else 30.0 @@ -195,46 +207,47 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: return True def stop(self) -> None: - if self._writer is None and not self.is_running: - return + with self._lifecycle_lock: + if self._writer is None and not self.is_running: + return - self._stop_event.set() + self._stop_event.set() - q = self._queue - if q is not None: - try: - q.put_nowait(_SENTINEL) - except queue.Full: - pass - - t = self._writer_thread - if t is not None: - t.join(timeout=5.0) - if t.is_alive(): - with self._stats_lock: - self._encode_error = RuntimeError( - "Failed to stop VideoRecorder within timeout; thread is still alive." - ) - self._abandoned = True - logger.critical( - "Failed to stop VideoRecorder within timeout; thread is still alive. " - "Marking recorder as abandoned to prevent restart." - ) - return + q = self._queue + if q is not None: + try: + q.put_nowait(_SENTINEL) + except queue.Full: + pass + + t = self._writer_thread + if t is not None: + t.join(timeout=5.0) + if t.is_alive(): + with self._stats_lock: + self._encode_error = RuntimeError( + "Failed to stop VideoRecorder within timeout; thread is still alive." + ) + self._abandoned = True + logger.critical( + "Failed to stop VideoRecorder within timeout; thread is still alive. " + "Marking recorder as abandoned to prevent restart." + ) + return - if self._writer is not None: - try: - self._writer.close() - except Exception: - logger.exception("Failed to close WriteGear cleanly") + if self._writer is not None: + try: + self._writer.close() + except Exception: + logger.exception("Failed to close WriteGear cleanly") - self._save_timestamps() + self._save_timestamps() - self._writer = None - self._writer_thread = None - self._queue = None - self._stop_event.clear() - self._abandoned = False + self._writer = None + self._writer_thread = None + self._queue = None + self._stop_event.clear() + self._abandoned = False def get_stats(self) -> RecorderStats | None: if ( From 2b0a3593e1a3cc03f4400ebe45fb0c10f832e54f Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 14:58:09 +0100 Subject: [PATCH 15/22] Cleanup stale camera workers on start/shutdown Detect and remove stale camera worker/thread state when starting cameras and during shutdown. _start_camera now checks the existing thread for a cam_id and calls _cleanup_camera if the worker exists but the thread is missing or not running, and updates the log message to indicate the camera is already running. The shutdown loop likewise invokes _cleanup_camera for None or not-running threads and after successful quit/terminate waits to ensure worker/thread entries are cleared. These changes prevent stale entries from blocking restarts and ensure cleaner shutdown behavior. --- dlclivegui/services/multi_camera_controller.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 4ecf8a0..075673e 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -172,8 +172,12 @@ def start(self, camera_settings: list[CameraSettings]) -> None: def _start_camera(self, settings: CameraSettings) -> None: """Start a single camera.""" cam_id = get_camera_id(settings) + existing_thread = self._threads.get(cam_id) + if cam_id in self._workers and (existing_thread is None or not existing_thread.isRunning()): + LOGGER.warning(f"Stale camera worker/thread found for {cam_id}, cleaning up") + self._cleanup_camera(cam_id) if cam_id in self._workers: - LOGGER.warning(f"Camera {cam_id} already has a worker") + LOGGER.warning(f"Camera {cam_id} is already running, skipping start") return # Normalize and store the dataclass once @@ -227,12 +231,15 @@ def stop(self, wait: bool = True) -> None: still_running: list[str] = [] for cam_id, thread in list(self._threads.items()): if thread is None: + self._cleanup_camera(cam_id) continue if not thread.isRunning(): + self._cleanup_camera(cam_id) continue thread.quit() if thread.wait(QUIT_WAIT_MS): + self._cleanup_camera(cam_id) continue # Clean exit LOGGER.error( @@ -243,6 +250,7 @@ def stop(self, wait: bool = True) -> None: thread.terminate() if thread.wait(TERMINATE_WAIT_MS): + self._cleanup_camera(cam_id) continue # Terminated successfully LOGGER.critical( From d0999a783fd717d9c1e048aa41d5f809a69333b0 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 15:04:33 +0100 Subject: [PATCH 16/22] Make stop join timeout configurable and add test Introduce STOP_JOIN_TIMEOUT (5s) and use it for the writer thread join timeout instead of a hardcoded value. Reorder lifecycle checks in VideoRecorder.start to raise if the recorder is marked _abandoned before short-circuiting on is_running, preventing accidental restarts of abandoned recorders. Add a BlockingWriteGear fake, fixture, and a test (test_stop_timeout_marks_abandoned_and_prevents_restart) that forces a stop timeout, verifies the recorder is marked abandoned, prevents restart, and performs cleanup of the blocked writer thread. --- dlclivegui/services/video_recorder.py | 8 +-- tests/services/test_video_recorder.py | 72 +++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 3 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 2552a8a..74a33f9 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -23,6 +23,8 @@ logger = logging.getLogger(__name__) +STOP_JOIN_TIMEOUT = 5.0 # seconds + @dataclass class RecorderStats: @@ -88,10 +90,10 @@ def start(self) -> None: raise RuntimeError("vidgear is required for video recording. Install it with 'pip install vidgear'.") with self._lifecycle_lock: - if self.is_running: - return if self._abandoned: raise RuntimeError("Cannot restart VideoRecorder, as a leftover thread is still running.") + if self.is_running: + return if self._writer is not None: # Best-effort cleanup of a stale writer to avoid leaking resources. logger.warning( @@ -222,7 +224,7 @@ def stop(self) -> None: t = self._writer_thread if t is not None: - t.join(timeout=5.0) + t.join(timeout=STOP_JOIN_TIMEOUT) if t.is_alive(): with self._stats_lock: self._encode_error = RuntimeError( diff --git a/tests/services/test_video_recorder.py b/tests/services/test_video_recorder.py index 7665ae3..1ab0c10 100644 --- a/tests/services/test_video_recorder.py +++ b/tests/services/test_video_recorder.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import threading import time from pathlib import Path @@ -75,6 +76,38 @@ def gray_frame(): return np.zeros((48, 64), dtype=np.uint8) +class BlockingWriteGear(FakeWriteGear): + """ + Fake WriteGear that blocks inside write() to simulate a hung encoder/IO stall. + """ + + instances = [] + + def __init__(self, output: str, **kwargs): + super().__init__(output, **kwargs) + self.entered_write = threading.Event() + self.release_write = threading.Event() + BlockingWriteGear.instances.append(self) + + def write(self, frame): + # Signal that the writer thread is now stuck inside write() + self.entered_write.set() + # Block until the test releases us (long timeout as safety, but test releases explicitly) + self.release_write.wait(timeout=60.0) + # If released, behave like normal FakeWriteGear + return super().write(frame) + + +@pytest.fixture +def patch_blocking_writegear(monkeypatch): + """ + Patch module-level WriteGear to BlockingWriteGear for the hung-thread test. + """ + BlockingWriteGear.instances.clear() + monkeypatch.setattr(vr_mod, "WriteGear", BlockingWriteGear) + return BlockingWriteGear + + # ---------------------------- # Tests # ---------------------------- @@ -295,3 +328,42 @@ def test_overlay_frame_size_mismatch_still_detected(patch_writegear, output_path rec.write(np.zeros((48, 64, 3), dtype=np.uint8), timestamp=2.0) rec.stop() + + +def test_stop_timeout_marks_abandoned_and_prevents_restart( + patch_blocking_writegear, monkeypatch, output_path, rgb_frame +): + # Make stop timeout tiny so the test is fast. + monkeypatch.setattr(vr_mod, "STOP_JOIN_TIMEOUT", 0.01) + + rec = vr_mod.VideoRecorder(output_path, frame_size=(48, 64), buffer_size=10) + rec.start() + assert rec.is_running is True + + # Enqueue one frame so writer thread enters BlockingWriteGear.write() and blocks. + assert rec.write(rgb_frame, timestamp=1.0) is True + + wg = patch_blocking_writegear.instances[0] + wait_until(lambda: wg.entered_write.is_set(), timeout=1.0) + + # Now stop: should time out and mark abandoned + rec.stop() + + assert rec._abandoned is True + err = rec._current_error() + assert err is not None + assert "Failed to stop VideoRecorder within timeout" in str(err) + + # Thread should still be alive (blocked in write) + assert rec.is_running is True + + # Restart should be prevented while abandoned + with pytest.raises(RuntimeError, match="Cannot restart VideoRecorder"): + rec.start() + + # ---- Cleanup to avoid leaking a blocked daemon thread into other tests ---- + wg.release_write.set() # let write() return + wait_until(lambda: not rec.is_running, timeout=2.0) + + # Call stop again to clear resources / reset flags (now it can join cleanly) + rec.stop() From 5c06b5c759e6dcfefd6aecedd75c888d683656d7 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 15:42:42 +0100 Subject: [PATCH 17/22] Fix locking, camera check, and writer finalization Minimize time holding the DLCLiveProcessor lifecycle lock by moving expensive operations (frame.copy and timestamp handling) outside the lock and re-checking state before starting the worker to avoid races. Correct a duplicated condition in MultiCameraController so normal shutdown is detected using self._running. Move writer cleanup into a dedicated _finalize_writer(), add a warning when Queue.task_done() is called too many times, and centralize writer close/exception handling. --- dlclivegui/services/dlc_processor.py | 28 +++++++++++++------ .../services/multi_camera_controller.py | 2 +- dlclivegui/services/video_recorder.py | 8 ++---- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index e7aaaf2..cbae5a7 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -14,8 +14,6 @@ from typing import Any import numpy as np - -# from dlclive import DLCLive from PySide6.QtCore import QObject, Signal from dlclivegui.config import DLCProcessorSettings, ModelType @@ -216,16 +214,30 @@ def shutdown(self) -> None: self._initialized = False def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: + # Keep lifecycle lock held only for quick state checks and snapshots. with self._lifecycle_lock: if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): return - frame_c = frame.copy() - enq_time = time.perf_counter() t = self._worker_thread - if t is None or not t.is_alive(): - self._start_worker_locked(frame_c, timestamp) - return - q = self._queue # snapshot under lock + q = self._queue + should_start = t is None or not t.is_alive() + + frame_c = frame.copy() + enq_time = time.perf_counter() + + if should_start: + # Re-acquire the lifecycle lock to safely (re)start the worker if needed. + with self._lifecycle_lock: + # Re-check state in case it changed while we were copying the frame. + if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): + return + t = self._worker_thread + if t is None or not t.is_alive(): + # _start_worker_locked expects the lifecycle lock to be held. + self._start_worker_locked(frame_c, timestamp) + return + # Worker is now running; refresh queue snapshot. + q = self._queue if q is None: return diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 075673e..e72d618 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -514,7 +514,7 @@ def _on_camera_stopped(self, camera_id: str) -> None: # Check if all running cameras have stopped (normal shutdown) if ( not self._started_cameras - and not self._started_cameras + and self._running and all(not t.isRunning() for t in self._threads.values() if t is not None) ): self._running = False diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 74a33f9..f346975 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -340,15 +340,11 @@ def _writer_loop(self) -> None: try: q.task_done() except ValueError: + logger.warning("Queue task_done() called too many times in writer loop") pass finally: - writer = self._writer - if writer is not None: - try: - writer.close() - except Exception: - logger.exception("Failed to close WriteGear during writer loop finalization") + self._finalize_writer() def _finalize_writer(self) -> None: writer = self._writer From b4e57d6cc85bbb78a5dd8c6e585e3331e4f665c2 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 16:17:49 +0100 Subject: [PATCH 18/22] Add background reaper for stalled DLC worker Introduce STOP_WORKER_TIMEOUT and a background reaper to handle worker threads that don't terminate within the stop timeout. Increase join timeout to STOP_WORKER_TIMEOUT, set _pending_reset when a reset is requested while the worker is alive, and schedule _schedule_reap to join the thread in the background instead of immediately marking the processor as FAULTED. The reaper performs final cleanup (clearing thread, queue, state, stop event) and applies a pending DLCLive reset once the thread is reaped, with guards to ensure only one reaper runs at a time. --- dlclivegui/services/dlc_processor.py | 41 ++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index cbae5a7..d18a766 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -21,6 +21,7 @@ from dlclivegui.temp import Engine # type: ignore # TODO use main package enum when released logger = logging.getLogger(__name__) +STOP_WORKER_TIMEOUT = 10.0 # seconds to wait for worker thread to stop before marking as faulted try: # pragma: no cover - optional dependency from dlclive import ( @@ -155,6 +156,9 @@ def __init__(self) -> None: self._lifecycle_lock = threading.Lock() self._stop_event = threading.Event() self._initialized = False + ## Worker cleanup + self._reaping = False + self._pending_reset = False # Statistics tracking self._frames_enqueued = 0 @@ -184,6 +188,8 @@ def reset(self) -> None: """Stop the worker thread and drop the current DLCLive instance.""" stopped = self._stop_worker() if not stopped: + with self._lifecycle_lock: + self._pending_reset = True logger.warning( "Reset requested but worker thread is still alive; skipping DLCLive reset to avoid potential issues." ) @@ -330,14 +336,14 @@ def _stop_worker(self) -> bool: self._state = WorkerState.STOPPING self._stop_event.set() - t.join(timeout=2.0) + t.join(timeout=STOP_WORKER_TIMEOUT) if t.is_alive(): qsize = self._queue.qsize() if self._queue is not None else -1 logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)", qsize) - with self._lifecycle_lock: - self._state = WorkerState.FAULTED + self._schedule_reap(t) return False + # Normal cleanup with self._lifecycle_lock: self._worker_thread = None self._queue = None @@ -345,6 +351,35 @@ def _stop_worker(self) -> bool: self._stop_event.clear() return True + def _schedule_reap(self, t: threading.Thread) -> None: + with self._lifecycle_lock: + if self._reaping: + return + self._reaping = True + + # ensure only one reaper + def reap(): + try: + t.join() # wait without timeout in background + with self._lifecycle_lock: + # only clean if we're still stopping this thread + if self._worker_thread is t: + self._worker_thread = None + self._queue = None + self._state = WorkerState.STOPPED + self._stop_event.clear() + + if self._pending_reset: + self._dlc = None + self._initialized = False + self._pending_reset = False + finally: + with self._lifecycle_lock: + self._reaping = False + logger.debug("[Stop worker] DLC worker thread reaped; processor is STOPPED again") + + threading.Thread(target=reap, name="DLCLiveReaper", daemon=True).start() + @contextmanager def _timed_processor(self): """ From 5ec62663699c181d89af982cef21cbbce0da2433 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 16:23:10 +0100 Subject: [PATCH 19/22] Cache _queue to local variable in VideoRecorder Cache self._queue to a local variable (q) and use it for subsequent operations to avoid races and repeated attribute access. Remove the redundant assert and simplify qsize lookup by relying on the cached reference. This makes queue checks and put/qsize calls consistent in a multithreaded context and prevents potential None/attribute changes between checks and use. --- dlclivegui/services/video_recorder.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index f346975..20f7453 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -149,7 +149,9 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: error = self._current_error() if error is not None: raise RuntimeError(f"Video encoding failed: {error}") from error - if not self.is_running or self._queue is None: + + q = self._queue + if not self.is_running or q is None: return False if self._stop_event.is_set(): return False @@ -192,12 +194,11 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: return False try: - assert self._queue is not None - self._queue.put((frame, timestamp), block=False) + q.put((frame, timestamp), block=False) except queue.Full: with self._stats_lock: self._dropped_frames += 1 - queue_size = self._queue.qsize() if self._queue is not None else -1 + queue_size = q.qsize() logger.warning( "Video recorder queue full; dropping frame. queue=%d buffer=%d", queue_size, From 6be4047b34f528a0e1e995f3434c76c4e07d3508 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 16:25:39 +0100 Subject: [PATCH 20/22] Prevent configure while processor running Add a lifecycle lock and guard in DLCLiveProcessor.configure to ensure the processor is stopped before reconfiguration. The method now acquires self._lifecycle_lock and raises a RuntimeError if the processor state is not WorkerState.STOPPED, preventing concurrent/race-condition updates to settings or processor while running. --- dlclivegui/services/dlc_processor.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index d18a766..c099c3c 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -181,6 +181,9 @@ def get_model_backend(model_path: str) -> Engine: return Engine.from_model_path(model_path) def configure(self, settings: DLCProcessorSettings, processor: Any | None = None) -> None: + with self._lifecycle_lock: + if self._state != WorkerState.STOPPED: + raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.") self._settings = settings self._processor = processor From 71bb1aa3ad3abf1a16d05f2a861f0b46afe7116b Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 16:35:18 +0100 Subject: [PATCH 21/22] Cleanup abandoned recorder and set pending reset Ensure proper cleanup and restartability when writer threads time out: VideoRecorder.stop now clears abandoned state and associated resources if the writer thread has already exited, allowing the recorder to be fully stopped and restarted. DLCLiveProcessor: moved settings/processor assignment under the lifecycle lock, and mark _pending_reset when shutdown cannot stop the worker thread so the instance can be reaped later. Minor comment tweak for STOP_WORKER_TIMEOUT semantics. Tests updated to assert abandoned is cleared and that recorder can be restarted after the writer exits. --- dlclivegui/services/dlc_processor.py | 8 +++++--- dlclivegui/services/video_recorder.py | 11 ++++++++++- tests/services/test_video_recorder.py | 5 +++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index c099c3c..46a3ac0 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -21,7 +21,7 @@ from dlclivegui.temp import Engine # type: ignore # TODO use main package enum when released logger = logging.getLogger(__name__) -STOP_WORKER_TIMEOUT = 10.0 # seconds to wait for worker thread to stop before marking as faulted +STOP_WORKER_TIMEOUT = 10.0 # seconds to wait for worker thread to stop before STOPPING state and reaping try: # pragma: no cover - optional dependency from dlclive import ( @@ -184,8 +184,8 @@ def configure(self, settings: DLCProcessorSettings, processor: Any | None = None with self._lifecycle_lock: if self._state != WorkerState.STOPPED: raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.") - self._settings = settings - self._processor = processor + self._settings = settings + self._processor = processor def reset(self) -> None: """Stop the worker thread and drop the current DLCLive instance.""" @@ -215,6 +215,8 @@ def reset(self) -> None: def shutdown(self) -> None: stopped = self._stop_worker() if not stopped: + with self._lifecycle_lock: + self._pending_reset = True logger.warning( "Shutdown requested but worker thread is still alive; DLCLive instance may not be fully released." ) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 20f7453..c3ba68e 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -212,7 +212,16 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: def stop(self) -> None: with self._lifecycle_lock: if self._writer is None and not self.is_running: - return + # If the recorder was previously marked as abandoned because the + # writer thread did not stop in time, but the thread has since + # exited, perform cleanup so the recorder can become fully stopped + # and restartable. + t = self._writer_thread + if self._abandoned and (t is None or not t.is_alive()): + self._writer_thread = None + self._queue = None + self._stop_event.clear() + self._abandoned = False self._stop_event.set() diff --git a/tests/services/test_video_recorder.py b/tests/services/test_video_recorder.py index 1ab0c10..28bb856 100644 --- a/tests/services/test_video_recorder.py +++ b/tests/services/test_video_recorder.py @@ -367,3 +367,8 @@ def test_stop_timeout_marks_abandoned_and_prevents_restart( # Call stop again to clear resources / reset flags (now it can join cleanly) rec.stop() + # After the writer thread has exited, a second stop() should fully reset state + # so that the recorder is no longer marked as abandoned and can be restarted. + assert rec._abandoned is False + rec.start() + rec.stop() From 60da106490be349c49a5b16614be715d01b9e431 Mon Sep 17 00:00:00 2001 From: Cyril Achard Date: Mon, 2 Mar 2026 17:01:28 +0100 Subject: [PATCH 22/22] Refactor VideoRecorder.stop lifecycle handling Detect and early-return when recorder is already stopped or recovered from an abandoned state, and reduce the time holding the lifecycle lock by moving queue/thread/writer shutdown work outside the lock. Ensure sentinel is enqueued, writer thread is joined with timeout (marking recorder as abandoned and recording an encode_error if it remains alive), close the writer cleanly, save timestamps, and finally clear lifecycle fields under the lock. Improves concurrency, cleanup correctness, and prevents restarting a recorder that was marked abandoned. --- dlclivegui/services/video_recorder.py | 57 +++++++++++++++------------ 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index c3ba68e..b31c8a8 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -211,7 +211,8 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: def stop(self) -> None: with self._lifecycle_lock: - if self._writer is None and not self.is_running: + already_stopped = (self._writer is None) and (not self.is_running) + if already_stopped: # If the recorder was previously marked as abandoned because the # writer thread did not stop in time, but the thread has since # exited, perform cleanup so the recorder can become fully stopped @@ -222,39 +223,43 @@ def stop(self) -> None: self._queue = None self._stop_event.clear() self._abandoned = False + return self._stop_event.set() - q = self._queue - if q is not None: - try: - q.put_nowait(_SENTINEL) - except queue.Full: - pass - t = self._writer_thread - if t is not None: - t.join(timeout=STOP_JOIN_TIMEOUT) - if t.is_alive(): - with self._stats_lock: - self._encode_error = RuntimeError( - "Failed to stop VideoRecorder within timeout; thread is still alive." - ) + writer = self._writer + + if q is not None: + try: + q.put_nowait(_SENTINEL) + except queue.Full: + pass + + if t is not None: + t.join(timeout=STOP_JOIN_TIMEOUT) + if t.is_alive(): + with self._stats_lock: + self._encode_error = RuntimeError( + "Failed to stop VideoRecorder within timeout; thread is still alive." + ) + with self._lifecycle_lock: self._abandoned = True - logger.critical( - "Failed to stop VideoRecorder within timeout; thread is still alive. " - "Marking recorder as abandoned to prevent restart." - ) - return + logger.critical( + "Failed to stop VideoRecorder within timeout; thread is still alive. " + "Marking recorder as abandoned to prevent restart." + ) + return - if self._writer is not None: - try: - self._writer.close() - except Exception: - logger.exception("Failed to close WriteGear cleanly") + if writer is not None: + try: + writer.close() + except Exception: + logger.exception("Failed to close WriteGear cleanly") - self._save_timestamps() + self._save_timestamps() + with self._lifecycle_lock: self._writer = None self._writer_thread = None self._queue = None