diff --git a/dlclivegui/gui/main_window.py b/dlclivegui/gui/main_window.py index df99e83..cf4ac41 100644 --- a/dlclivegui/gui/main_window.py +++ b/dlclivegui/gui/main_window.py @@ -1378,7 +1378,7 @@ def _on_multi_frame_ready(self, frame_data: MultiFrameData) -> None: dlc_cam_id = selected_id else: dlc_cam_id = available_ids[0] if available_ids else "" - if dlc_cam_id is not None: + if dlc_cam_id: self._inference_camera_id = dlc_cam_id self._set_dlc_combo_to_id(dlc_cam_id) self.statusBar().showMessage( diff --git a/dlclivegui/gui/recording_manager.py b/dlclivegui/gui/recording_manager.py index 22491c6..49ac993 100644 --- a/dlclivegui/gui/recording_manager.py +++ b/dlclivegui/gui/recording_manager.py @@ -139,7 +139,7 @@ def write_frame(self, cam_id: str, frame: np.ndarray, timestamp: float | None = if not rec or not rec.is_running: return try: - rec.write(frame, timestamp=timestamp or time.time()) + rec.write(frame, timestamp=timestamp if timestamp is not None else time.time()) except Exception as exc: log.warning("Failed to write frame for %s: %s", cam_id, exc) try: diff --git a/dlclivegui/services/dlc_processor.py b/dlclivegui/services/dlc_processor.py index fecad15..46a3ac0 100644 --- a/dlclivegui/services/dlc_processor.py +++ b/dlclivegui/services/dlc_processor.py @@ -21,9 +21,7 @@ from dlclivegui.temp import Engine # type: ignore # TODO use main package enum when released logger = logging.getLogger(__name__) - -# Enable profiling -ENABLE_PROFILING = True +STOP_WORKER_TIMEOUT = 10.0 # seconds to wait for worker thread to stop before STOPPING state and reaping try: # pragma: no cover - optional dependency from dlclive import ( @@ -34,10 +32,22 @@ DLCLive = None # type: ignore[assignment] +# Enable profiling to get more detailed timing metrics for debugging and optimization. +ENABLE_PROFILING = True + + class PoseBackends(Enum): DLC_LIVE = auto() +class WorkerState(Enum): + STOPPED = auto() + STARTING = auto() + RUNNING = auto() + STOPPING = auto() + FAULTED = auto() + + @dataclass class PoseResult: pose: np.ndarray | None @@ -135,13 +145,20 @@ class DLCLiveProcessor(QObject): def __init__(self) -> None: super().__init__() + # DLCLive instance and config self._settings = DLCProcessorSettings() self._dlc: Any | None = None self._processor: Any | None = None + # Worker thread and queue self._queue: queue.Queue[Any] | None = None self._worker_thread: threading.Thread | None = None + self._state = WorkerState.STOPPED + self._lifecycle_lock = threading.Lock() self._stop_event = threading.Event() self._initialized = False + ## Worker cleanup + self._reaping = False + self._pending_reset = False # Statistics tracking self._frames_enqueued = 0 @@ -164,12 +181,22 @@ def get_model_backend(model_path: str) -> Engine: return Engine.from_model_path(model_path) def configure(self, settings: DLCProcessorSettings, processor: Any | None = None) -> None: - self._settings = settings - self._processor = processor + with self._lifecycle_lock: + if self._state != WorkerState.STOPPED: + raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.") + self._settings = settings + self._processor = processor def reset(self) -> None: """Stop the worker thread and drop the current DLCLive instance.""" - self._stop_worker() + stopped = self._stop_worker() + if not stopped: + with self._lifecycle_lock: + self._pending_reset = True + logger.warning( + "Reset requested but worker thread is still alive; skipping DLCLive reset to avoid potential issues." + ) + return self._dlc = None self._initialized = False with self._stats_lock: @@ -186,22 +213,48 @@ def reset(self) -> None: self._processor_overhead_times.clear() def shutdown(self) -> None: - self._stop_worker() + stopped = self._stop_worker() + if not stopped: + with self._lifecycle_lock: + self._pending_reset = True + logger.warning( + "Shutdown requested but worker thread is still alive; DLCLive instance may not be fully released." + ) + return self._dlc = None self._initialized = False def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None: - # Start worker on first frame - if self._worker_thread is None: - self._start_worker(frame.copy(), timestamp) - return - - # As long as worker and queue are ready, ALWAYS enqueue - if self._queue is None: + # Keep lifecycle lock held only for quick state checks and snapshots. + with self._lifecycle_lock: + if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): + return + t = self._worker_thread + q = self._queue + should_start = t is None or not t.is_alive() + + frame_c = frame.copy() + enq_time = time.perf_counter() + + if should_start: + # Re-acquire the lifecycle lock to safely (re)start the worker if needed. + with self._lifecycle_lock: + # Re-check state in case it changed while we were copying the frame. + if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set(): + return + t = self._worker_thread + if t is None or not t.is_alive(): + # _start_worker_locked expects the lifecycle lock to be held. + self._start_worker_locked(frame_c, timestamp) + return + # Worker is now running; refresh queue snapshot. + q = self._queue + + if q is None: return try: - self._queue.put_nowait((frame.copy(), timestamp, time.perf_counter())) + q.put_nowait((frame_c, timestamp, enq_time)) with self._stats_lock: self._frames_enqueued += 1 except queue.Full: @@ -259,12 +312,13 @@ def get_stats(self) -> ProcessorStats: avg_processor_overhead=avg_proc_overhead, ) - def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: + def _start_worker_locked(self, init_frame: np.ndarray, init_timestamp: float) -> None: + # lifecycle_lock must already be held if self._worker_thread is not None and self._worker_thread.is_alive(): return - self._queue = queue.Queue(maxsize=1) self._stop_event.clear() + self._state = WorkerState.STARTING self._worker_thread = threading.Thread( target=self._worker_loop, args=(init_frame, init_timestamp), @@ -273,19 +327,63 @@ def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: ) self._worker_thread.start() - def _stop_worker(self) -> None: - if self._worker_thread is None: - return + def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None: + with self._lifecycle_lock: + self._start_worker_locked(init_frame, init_timestamp) + + def _stop_worker(self) -> bool: + with self._lifecycle_lock: + t = self._worker_thread + if t is None: + self._state = WorkerState.STOPPED + self._stop_event.clear() + return True + self._state = WorkerState.STOPPING + self._stop_event.set() + + t.join(timeout=STOP_WORKER_TIMEOUT) + if t.is_alive(): + qsize = self._queue.qsize() if self._queue is not None else -1 + logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)", qsize) + self._schedule_reap(t) + return False + + # Normal cleanup + with self._lifecycle_lock: + self._worker_thread = None + self._queue = None + self._state = WorkerState.STOPPED + self._stop_event.clear() + return True - self._stop_event.set() + def _schedule_reap(self, t: threading.Thread) -> None: + with self._lifecycle_lock: + if self._reaping: + return + self._reaping = True - # Just wait for the timed get() loop to observe the flag and drain - self._worker_thread.join(timeout=2.0) - if self._worker_thread.is_alive(): - logger.warning("DLC worker thread did not terminate cleanly") + # ensure only one reaper + def reap(): + try: + t.join() # wait without timeout in background + with self._lifecycle_lock: + # only clean if we're still stopping this thread + if self._worker_thread is t: + self._worker_thread = None + self._queue = None + self._state = WorkerState.STOPPED + self._stop_event.clear() + + if self._pending_reset: + self._dlc = None + self._initialized = False + self._pending_reset = False + finally: + with self._lifecycle_lock: + self._reaping = False + logger.debug("[Stop worker] DLC worker thread reaped; processor is STOPPED again") - self._worker_thread = None - self._queue = None + threading.Thread(target=reap, name="DLCLiveReaper", daemon=True).start() @contextmanager def _timed_processor(self): @@ -328,6 +426,8 @@ def _process_frame( Single source of truth for: inference -> (optional) processor timing -> signal emit -> stats. Updates: frames_processed, latency, processing timeline, profiling metrics. """ + if self._dlc is None: + raise RuntimeError("DLCLive instance is not initialized.") # Time GPU inference (and processor overhead when present) with self._timed_processor() as proc_holder: inference_start = time.perf_counter() @@ -377,8 +477,6 @@ def _process_frame( def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: try: # -------- Initialization (unchanged) -------- - if DLCLive is None: - raise RuntimeError("The 'dlclive' package is required for pose estimation.") if not self._settings.model_path: raise RuntimeError("No DLCLive model path configured.") @@ -403,7 +501,18 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: if self._settings.device is not None: options["device"] = self._settings.device - self._dlc = DLCLive(**options) + try: + if DLCLive is None: + raise RuntimeError( + "DLCLive class is not available. Ensure the dlclive package is installed and can be imported." + ) + self._dlc = DLCLive(**options) + except Exception as exc: + with self._lifecycle_lock: + self._state = WorkerState.FAULTED + raise RuntimeError( + f"Failed to initialize DLCLive with model '{self._settings.model_path}': {exc}" + ) from exc # First inference to initialize init_inference_start = time.perf_counter() @@ -416,6 +525,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self._initialized = True self.initialized.emit(True) + with self._lifecycle_lock: + self._state = WorkerState.RUNNING total_init_time = time.perf_counter() - init_start logger.info( @@ -431,18 +542,30 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: except Exception as exc: logger.exception("Failed to initialize DLCLive", exc_info=exc) + with self._lifecycle_lock: + self._state = WorkerState.FAULTED self.error.emit(str(exc)) self.initialized.emit(False) return + q = ( + self._queue + ) # Assign to local to avoid issues if self._queue is set to None during shutdown while loop is still running. + if q is None: + logger.warning("Worker started without a queue; exiting") + with self._lifecycle_lock: + self._state = WorkerState.FAULTED + self.error.emit("Worker started without a queue") + return + # -------- Main processing loop: stop-flag + timed get + drain -------- # NOTE: We never exit early unless _stop_event is set. while True: # If stop requested, only exit when queue is empty if self._stop_event.is_set(): - if self._queue is not None: + if q is not None: try: - frame, ts, enq = self._queue.get_nowait() + frame, ts, enq = q.get_nowait() except queue.Empty: # NOW it is safe to exit break @@ -455,7 +578,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self.error.emit(str(exc)) finally: try: - self._queue.task_done() + q.task_done() except ValueError: pass continue # check stop_event again WITHOUT breaking @@ -463,10 +586,16 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: # Normal operation: timed get try: wait_start = time.perf_counter() - item = self._queue.get(timeout=0.05) + item = q.get(timeout=0.05) queue_wait_time = time.perf_counter() - wait_start except queue.Empty: continue + except Exception as exc: + logger.exception("Error getting item from queue", exc_info=exc) + with self._lifecycle_lock: + self._state = WorkerState.FAULTED + self.error.emit(str(exc)) + break try: frame, ts, enq = item @@ -476,7 +605,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None: self.error.emit(str(exc)) finally: try: - self._queue.task_done() + q.task_done() except ValueError: pass @@ -513,6 +642,10 @@ def enqueue(self, frame, ts): self._proc.enqueue_frame(frame, ts) def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, selected_key) -> bool: + with self._proc._lifecycle_lock: + if self._proc._state != WorkerState.STOPPED: + raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.") + processor = None if selected_key is not None and scanned_processors: try: @@ -526,11 +659,9 @@ def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, se def start(self): self._proc.reset() self.active = True - self.initialized = False def stop(self): self.active = False - self.initialized = False self._proc.reset() self._last_pose = None diff --git a/dlclivegui/services/multi_camera_controller.py b/dlclivegui/services/multi_camera_controller.py index 8db469f..e72d618 100644 --- a/dlclivegui/services/multi_camera_controller.py +++ b/dlclivegui/services/multi_camera_controller.py @@ -5,6 +5,7 @@ import logging import time from dataclasses import dataclass +from functools import partial from threading import Event, Lock import cv2 @@ -20,6 +21,9 @@ LOGGER = logging.getLogger(__name__) +QUIT_WAIT_MS = 5000 # wait for cooperative quit (5s) +TERMINATE_WAIT_MS = 1000 # wait after terminate (1s) + @dataclass class MultiFrameData: @@ -143,7 +147,7 @@ def get_active_count(self) -> int: return len(self._started_cameras) def start(self, camera_settings: list[CameraSettings]) -> None: - """Start multiple cameras; accepts dataclasses, pydantic models, or dicts.""" + """Start multiple cameras.""" if self._running: LOGGER.warning("Multi-camera controller already running") return @@ -158,7 +162,9 @@ def start(self, camera_settings: list[CameraSettings]) -> None: self._timestamps.clear() self._started_cameras.clear() self._failed_cameras.clear() - self._expected_cameras = len(active_settings) + + unique_ids = {get_camera_id(s) for s in active_settings} + self._expected_cameras = len(unique_ids) for settings in active_settings: self._start_camera(settings) @@ -166,8 +172,12 @@ def start(self, camera_settings: list[CameraSettings]) -> None: def _start_camera(self, settings: CameraSettings) -> None: """Start a single camera.""" cam_id = get_camera_id(settings) + existing_thread = self._threads.get(cam_id) + if cam_id in self._workers and (existing_thread is None or not existing_thread.isRunning()): + LOGGER.warning(f"Stale camera worker/thread found for {cam_id}, cleaning up") + self._cleanup_camera(cam_id) if cam_id in self._workers: - LOGGER.warning(f"Camera {cam_id} already has a worker") + LOGGER.warning(f"Camera {cam_id} is already running, skipping start") return # Normalize and store the dataclass once @@ -186,8 +196,25 @@ def _start_camera(self, settings: CameraSettings) -> None: self._workers[cam_id] = worker self._threads[cam_id] = thread + thread.finished.connect(partial(self._cleanup_camera, cam_id)) + worker.stopped.connect(thread.quit) thread.start() + def _cleanup_camera(self, camera_id: str) -> None: + # remove stored frame data + with self._frame_lock: + self._frames.pop(camera_id, None) + self._timestamps.pop(camera_id, None) + + worker = self._workers.pop(camera_id, None) + thread = self._threads.pop(camera_id, None) + self._settings.pop(camera_id, None) + + if worker is not None: + worker.deleteLater() + if thread is not None: + thread.deleteLater() + def stop(self, wait: bool = True) -> None: """Stop all cameras.""" if not self._running: @@ -201,17 +228,51 @@ def stop(self, wait: bool = True) -> None: # Wait for threads to finish if wait: - for thread in self._threads.values(): - if thread.isRunning(): - thread.quit() - thread.wait(5000) - - self._workers.clear() - self._threads.clear() - self._settings.clear() + still_running: list[str] = [] + for cam_id, thread in list(self._threads.items()): + if thread is None: + self._cleanup_camera(cam_id) + continue + if not thread.isRunning(): + self._cleanup_camera(cam_id) + continue + + thread.quit() + if thread.wait(QUIT_WAIT_MS): + self._cleanup_camera(cam_id) + continue # Clean exit + + LOGGER.error( + "Camera thread %s did not quit within %dms; forcing terminate()", + cam_id, + QUIT_WAIT_MS, + ) + + thread.terminate() + if thread.wait(TERMINATE_WAIT_MS): + self._cleanup_camera(cam_id) + continue # Terminated successfully + + LOGGER.critical( + "Camera thread %s refused to terminate after terminate()+wait(%dms). " + "Keeping references to avoid use-after-free/segfaults. " + "Application restart may be required.", + cam_id, + TERMINATE_WAIT_MS, + ) + still_running.append(cam_id) + + if still_running: + self._started_cameras.clear() + return + self._started_cameras.clear() self._failed_cameras.clear() + with self._frame_lock: + self._frames.clear() + self._timestamps.clear() self._expected_cameras = 0 + self.all_stopped.emit() def _on_frame_captured(self, camera_id: str, frame: np.ndarray, timestamp: float) -> None: @@ -430,14 +491,9 @@ def _on_camera_stopped(self, camera_id: str) -> None: # Cleanup thread if camera_id in self._threads: - thread = self._threads[camera_id] - if thread.isRunning(): + thread = self._threads.get(camera_id) + if thread is not None and thread.isRunning(): thread.quit() - thread.wait(1000) - del self._threads[camera_id] - - if camera_id in self._workers: - del self._workers[camera_id] # Remove frame data with self._frame_lock: @@ -456,7 +512,11 @@ def _on_camera_stopped(self, camera_id: str) -> None: return # Check if all running cameras have stopped (normal shutdown) - if not self._started_cameras and self._running and not self._workers: + if ( + not self._started_cameras + and self._running + and all(not t.isRunning() for t in self._threads.values() if t is not None) + ): self._running = False self.all_stopped.emit() diff --git a/dlclivegui/services/video_recorder.py b/dlclivegui/services/video_recorder.py index 1addbca..b31c8a8 100644 --- a/dlclivegui/services/video_recorder.py +++ b/dlclivegui/services/video_recorder.py @@ -23,6 +23,8 @@ logger = logging.getLogger(__name__) +STOP_JOIN_TIMEOUT = 5.0 # seconds + @dataclass class RecorderStats: @@ -53,6 +55,7 @@ def __init__( crf: int = 23, buffer_size: int = 240, ): + # Config self._output = Path(output) self._writer: Any | None = None self._frame_size = frame_size @@ -60,10 +63,14 @@ def __init__( self._codec = codec self._crf = int(crf) self._buffer_size = max(1, int(buffer_size)) + # Worker state self._queue: queue.Queue[Any] | None = None self._writer_thread: threading.Thread | None = None self._stop_event = threading.Event() self._stats_lock = threading.Lock() + self._lifecycle_lock = threading.Lock() + self._abandoned = False + # Stats self._frames_enqueued = 0 self._frames_written = 0 self._dropped_frames = 0 @@ -81,49 +88,74 @@ def is_running(self) -> bool: def start(self) -> None: if WriteGear is None: raise RuntimeError("vidgear is required for video recording. Install it with 'pip install vidgear'.") - if self._writer is not None: - return - fps_value = float(self._frame_rate) if self._frame_rate else 30.0 - - writer_kwargs: dict[str, Any] = { - "compression_mode": True, - "logging": False, - "-input_framerate": fps_value, - "-vcodec": (self._codec or "libx264").strip() or "libx264", - "-crf": int(self._crf), - } - # TODO deal with pixel format - - self._output.parent.mkdir(parents=True, exist_ok=True) - self._writer = WriteGear(output=str(self._output), **writer_kwargs) - self._queue = queue.Queue(maxsize=self._buffer_size) - self._frames_enqueued = 0 - self._frames_written = 0 - self._dropped_frames = 0 - self._total_latency = 0.0 - self._last_latency = 0.0 - self._written_times.clear() - self._frame_timestamps.clear() - self._encode_error = None - self._stop_event.clear() - self._writer_thread = threading.Thread( - target=self._writer_loop, - name="VideoRecorderWriter", - daemon=True, - ) - self._writer_thread.start() + + with self._lifecycle_lock: + if self._abandoned: + raise RuntimeError("Cannot restart VideoRecorder, as a leftover thread is still running.") + if self.is_running: + return + if self._writer is not None: + # Best-effort cleanup of a stale writer to avoid leaking resources. + logger.warning( + "VideoRecorder.start() found an existing writer while not running; " + "attempting to close the stale writer before restarting." + ) + try: + close_method = getattr(self._writer, "close", None) + if callable(close_method): + close_method() + except Exception: + logger.exception("Error while closing stale video writer in start().") + finally: + self._writer = None + self._queue = None + self._writer_thread = None + + fps_value = float(self._frame_rate) if self._frame_rate else 30.0 + + writer_kwargs: dict[str, Any] = { + "compression_mode": True, + "logging": False, + "-input_framerate": fps_value, + "-vcodec": (self._codec or "libx264").strip() or "libx264", + "-crf": int(self._crf), + } + # TODO deal with pixel format + + self._output.parent.mkdir(parents=True, exist_ok=True) + self._writer = WriteGear(output=str(self._output), **writer_kwargs) + self._queue = queue.Queue(maxsize=self._buffer_size) + self._frames_enqueued = 0 + self._frames_written = 0 + self._dropped_frames = 0 + self._total_latency = 0.0 + self._last_latency = 0.0 + self._written_times.clear() + self._frame_timestamps.clear() + self._encode_error = None + self._stop_event.clear() + self._writer_thread = threading.Thread( + target=self._writer_loop, + name="VideoRecorderWriter", + daemon=True, + ) + self._writer_thread.start() def configure_stream(self, frame_size: tuple[int, int], frame_rate: float | None) -> None: self._frame_size = frame_size self._frame_rate = frame_rate def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: - if not self.is_running or self._queue is None: - return False error = self._current_error() if error is not None: raise RuntimeError(f"Video encoding failed: {error}") from error + q = self._queue + if not self.is_running or q is None: + return False + if self._stop_event.is_set(): + return False + # Capture timestamp now, but only record it if frame is successfully enqueued if timestamp is None: timestamp = time.time() @@ -162,12 +194,11 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: return False try: - assert self._queue is not None - self._queue.put((frame, timestamp), block=False) + q.put((frame, timestamp), block=False) except queue.Full: with self._stats_lock: self._dropped_frames += 1 - queue_size = self._queue.qsize() if self._queue is not None else -1 + queue_size = q.qsize() logger.warning( "Video recorder queue full; dropping frame. queue=%d buffer=%d", queue_size, @@ -179,31 +210,61 @@ def write(self, frame: np.ndarray, timestamp: float | None = None) -> bool: return True def stop(self) -> None: - if self._writer is None and not self.is_running: - return - self._stop_event.set() - if self._queue is not None: + with self._lifecycle_lock: + already_stopped = (self._writer is None) and (not self.is_running) + if already_stopped: + # If the recorder was previously marked as abandoned because the + # writer thread did not stop in time, but the thread has since + # exited, perform cleanup so the recorder can become fully stopped + # and restartable. + t = self._writer_thread + if self._abandoned and (t is None or not t.is_alive()): + self._writer_thread = None + self._queue = None + self._stop_event.clear() + self._abandoned = False + return + + self._stop_event.set() + q = self._queue + t = self._writer_thread + writer = self._writer + + if q is not None: try: - self._queue.put_nowait(_SENTINEL) + q.put_nowait(_SENTINEL) except queue.Full: pass - # self._queue.put(_SENTINEL) - if self._writer_thread is not None: - self._writer_thread.join(timeout=5.0) - if self._writer_thread.is_alive(): - logger.warning("Video recorder thread did not terminate cleanly") - if self._writer is not None: + + if t is not None: + t.join(timeout=STOP_JOIN_TIMEOUT) + if t.is_alive(): + with self._stats_lock: + self._encode_error = RuntimeError( + "Failed to stop VideoRecorder within timeout; thread is still alive." + ) + with self._lifecycle_lock: + self._abandoned = True + logger.critical( + "Failed to stop VideoRecorder within timeout; thread is still alive. " + "Marking recorder as abandoned to prevent restart." + ) + return + + if writer is not None: try: - self._writer.close() + writer.close() except Exception: logger.exception("Failed to close WriteGear cleanly") - # Save timestamps to JSON file self._save_timestamps() - self._writer = None - self._writer_thread = None - self._queue = None + with self._lifecycle_lock: + self._writer = None + self._writer_thread = None + self._queue = None + self._stop_event.clear() + self._abandoned = False def get_stats(self) -> RecorderStats | None: if ( @@ -236,43 +297,67 @@ def get_stats(self) -> RecorderStats | None: ) def _writer_loop(self) -> None: - assert self._queue is not None + q = self._queue + if q is None: + with self._stats_lock: + self._encode_error = RuntimeError("Writer loop started without a queue") + logger.error("Writer loop started without a queue; exiting") + return + try: while True: try: - item = self._queue.get(timeout=0.1) + item = q.get(timeout=0.1) except queue.Empty: if self._stop_event.is_set(): break continue - if item is _SENTINEL: - self._queue.task_done() - break - frame, timestamp = item - start = time.perf_counter() - try: - assert self._writer is not None - self._writer.write(frame) - except OSError as exc: + except Exception as exc: with self._stats_lock: self._encode_error = exc - logger.exception("Video encoding failed while writing frame") - self._queue.task_done() + logger.exception("Could not retrieve item from queue", exc_info=exc) self._stop_event.set() break - elapsed = time.perf_counter() - start - now = time.perf_counter() - with self._stats_lock: - self._frames_written += 1 - self._total_latency += elapsed - self._last_latency = elapsed - self._written_times.append(now) - self._frame_timestamps.append(timestamp) - if now - self._last_log_time >= 1.0: - self._compute_write_fps_locked() - self._queue.qsize() - self._last_log_time = now - self._queue.task_done() + + try: + if item is _SENTINEL: + break + else: + frame, timestamp = item + start = time.perf_counter() + + try: + writer = self._writer + if writer is None: + raise RuntimeError("WriteGear writer is not initialized") + writer.write(frame) + except Exception as exc: + with self._stats_lock: + self._encode_error = exc + logger.exception("Video encoding failed while writing frame", exc_info=exc) + self._stop_event.set() + break + else: + elapsed = time.perf_counter() - start + now = time.perf_counter() + with self._stats_lock: + self._frames_written += 1 + self._total_latency += elapsed + self._last_latency = elapsed + self._written_times.append(now) + self._frame_timestamps.append(timestamp) + if now - self._last_log_time >= 1.0: + self._compute_write_fps_locked() + self._last_log_time = now + + finally: + # Ensure queue accounting is correct for every item pulled from q + try: + q.task_done() + except ValueError: + logger.warning("Queue task_done() called too many times in writer loop") + pass + finally: self._finalize_writer() diff --git a/tests/services/test_video_recorder.py b/tests/services/test_video_recorder.py index 7665ae3..28bb856 100644 --- a/tests/services/test_video_recorder.py +++ b/tests/services/test_video_recorder.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import threading import time from pathlib import Path @@ -75,6 +76,38 @@ def gray_frame(): return np.zeros((48, 64), dtype=np.uint8) +class BlockingWriteGear(FakeWriteGear): + """ + Fake WriteGear that blocks inside write() to simulate a hung encoder/IO stall. + """ + + instances = [] + + def __init__(self, output: str, **kwargs): + super().__init__(output, **kwargs) + self.entered_write = threading.Event() + self.release_write = threading.Event() + BlockingWriteGear.instances.append(self) + + def write(self, frame): + # Signal that the writer thread is now stuck inside write() + self.entered_write.set() + # Block until the test releases us (long timeout as safety, but test releases explicitly) + self.release_write.wait(timeout=60.0) + # If released, behave like normal FakeWriteGear + return super().write(frame) + + +@pytest.fixture +def patch_blocking_writegear(monkeypatch): + """ + Patch module-level WriteGear to BlockingWriteGear for the hung-thread test. + """ + BlockingWriteGear.instances.clear() + monkeypatch.setattr(vr_mod, "WriteGear", BlockingWriteGear) + return BlockingWriteGear + + # ---------------------------- # Tests # ---------------------------- @@ -295,3 +328,47 @@ def test_overlay_frame_size_mismatch_still_detected(patch_writegear, output_path rec.write(np.zeros((48, 64, 3), dtype=np.uint8), timestamp=2.0) rec.stop() + + +def test_stop_timeout_marks_abandoned_and_prevents_restart( + patch_blocking_writegear, monkeypatch, output_path, rgb_frame +): + # Make stop timeout tiny so the test is fast. + monkeypatch.setattr(vr_mod, "STOP_JOIN_TIMEOUT", 0.01) + + rec = vr_mod.VideoRecorder(output_path, frame_size=(48, 64), buffer_size=10) + rec.start() + assert rec.is_running is True + + # Enqueue one frame so writer thread enters BlockingWriteGear.write() and blocks. + assert rec.write(rgb_frame, timestamp=1.0) is True + + wg = patch_blocking_writegear.instances[0] + wait_until(lambda: wg.entered_write.is_set(), timeout=1.0) + + # Now stop: should time out and mark abandoned + rec.stop() + + assert rec._abandoned is True + err = rec._current_error() + assert err is not None + assert "Failed to stop VideoRecorder within timeout" in str(err) + + # Thread should still be alive (blocked in write) + assert rec.is_running is True + + # Restart should be prevented while abandoned + with pytest.raises(RuntimeError, match="Cannot restart VideoRecorder"): + rec.start() + + # ---- Cleanup to avoid leaking a blocked daemon thread into other tests ---- + wg.release_write.set() # let write() return + wait_until(lambda: not rec.is_running, timeout=2.0) + + # Call stop again to clear resources / reset flags (now it can join cleanly) + rec.stop() + # After the writer thread has exited, a second stop() should fully reset state + # so that the recorder is no longer marked as abandoned and can be restarted. + assert rec._abandoned is False + rec.start() + rec.stop()