Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4e8a0e5
Fix worker stop, timestamp handling, and imports
C-Achard Mar 2, 2026
ec86f28
Add WorkerState enum for worker lifecycle
C-Achard Mar 2, 2026
cad824c
Improve DLCLive processor lifecycle and safety
C-Achard Mar 2, 2026
d5ea95a
Restart worker if dead and clear stop event
C-Achard Mar 2, 2026
a4d21f7
Mark worker FAULTED on queue errors
C-Achard Mar 2, 2026
76d9a30
Force-terminate frozen camera threads on stop
C-Achard Mar 2, 2026
16b0d2a
Count unique cameras when starting multi-camera
C-Achard Mar 2, 2026
ae1754c
Improve VideoRecorder shutdown and writer loop
C-Achard Mar 2, 2026
29f2604
Refactor writer thread error handling and stats
C-Achard Mar 2, 2026
64f820e
Revert DLCLive imports due to unguarded torch
C-Achard Mar 2, 2026
9b6e1c8
Improve camera thread cleanup and recorder finalization
C-Achard Mar 2, 2026
acd15da
Update dlc_processor.py
C-Achard Mar 2, 2026
369908e
Add lifecycle management & robust writer shutdown
C-Achard Mar 2, 2026
91dff83
Improve lifecycle handling and stale-writer cleanup
C-Achard Mar 2, 2026
2b0a359
Cleanup stale camera workers on start/shutdown
C-Achard Mar 2, 2026
d0999a7
Make stop join timeout configurable and add test
C-Achard Mar 2, 2026
5c06b5c
Fix locking, camera check, and writer finalization
C-Achard Mar 2, 2026
b4e57d6
Add background reaper for stalled DLC worker
C-Achard Mar 2, 2026
5ec6266
Cache _queue to local variable in VideoRecorder
C-Achard Mar 2, 2026
6be4047
Prevent configure while processor running
C-Achard Mar 2, 2026
71bb1aa
Cleanup abandoned recorder and set pending reset
C-Achard Mar 2, 2026
60da106
Refactor VideoRecorder.stop lifecycle handling
C-Achard Mar 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dlclivegui/gui/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,7 +1378,7 @@ def _on_multi_frame_ready(self, frame_data: MultiFrameData) -> None:
dlc_cam_id = selected_id
else:
dlc_cam_id = available_ids[0] if available_ids else ""
if dlc_cam_id is not None:
if dlc_cam_id:
self._inference_camera_id = dlc_cam_id
self._set_dlc_combo_to_id(dlc_cam_id)
self.statusBar().showMessage(
Expand Down
2 changes: 1 addition & 1 deletion dlclivegui/gui/recording_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def write_frame(self, cam_id: str, frame: np.ndarray, timestamp: float | None =
if not rec or not rec.is_running:
return
try:
rec.write(frame, timestamp=timestamp or time.time())
rec.write(frame, timestamp=timestamp if timestamp is not None else time.time())
except Exception as exc:
log.warning("Failed to write frame for %s: %s", cam_id, exc)
try:
Expand Down
205 changes: 168 additions & 37 deletions dlclivegui/services/dlc_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
from dlclivegui.temp import Engine # type: ignore # TODO use main package enum when released

logger = logging.getLogger(__name__)

# Enable profiling
ENABLE_PROFILING = True
STOP_WORKER_TIMEOUT = 10.0 # seconds to wait for worker thread to stop before STOPPING state and reaping

try: # pragma: no cover - optional dependency
from dlclive import (
Expand All @@ -34,10 +32,22 @@
DLCLive = None # type: ignore[assignment]


# Enable profiling to get more detailed timing metrics for debugging and optimization.
ENABLE_PROFILING = True


class PoseBackends(Enum):
    """Enumeration of supported pose-estimation backends.

    Only DeepLabCut-Live is listed here; the surrounding processor code
    presumably dispatches on this when more backends are added — TODO confirm.
    """

    DLC_LIVE = auto()

class WorkerState(Enum):
    """Lifecycle states of the DLCLive worker thread.

    STOPPED:  no worker thread exists, or it has been fully cleaned up.
    STARTING: a worker thread was created and DLCLive initialization is
              in progress.
    RUNNING:  initialization succeeded; frames are being processed.
    STOPPING: a stop was requested; the worker is draining its queue
              and shutting down.
    FAULTED:  the worker hit an unrecoverable error (DLCLive init
              failure or a queue error) and will not process frames.
    """

    STOPPED = auto()
    STARTING = auto()
    RUNNING = auto()
    STOPPING = auto()
    FAULTED = auto()


@dataclass
class PoseResult:
pose: np.ndarray | None
Expand Down Expand Up @@ -135,13 +145,20 @@ class DLCLiveProcessor(QObject):

def __init__(self) -> None:
super().__init__()
# DLCLive instance and config
self._settings = DLCProcessorSettings()
self._dlc: Any | None = None
self._processor: Any | None = None
# Worker thread and queue
self._queue: queue.Queue[Any] | None = None
self._worker_thread: threading.Thread | None = None
self._state = WorkerState.STOPPED
self._lifecycle_lock = threading.Lock()
self._stop_event = threading.Event()
self._initialized = False
## Worker cleanup
self._reaping = False
self._pending_reset = False

# Statistics tracking
self._frames_enqueued = 0
Expand All @@ -164,12 +181,22 @@ def get_model_backend(model_path: str) -> Engine:
return Engine.from_model_path(model_path)

def configure(self, settings: DLCProcessorSettings, processor: Any | None = None) -> None:
self._settings = settings
self._processor = processor
with self._lifecycle_lock:
if self._state != WorkerState.STOPPED:
raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.")
self._settings = settings
self._processor = processor

def reset(self) -> None:
"""Stop the worker thread and drop the current DLCLive instance."""
self._stop_worker()
stopped = self._stop_worker()
if not stopped:
with self._lifecycle_lock:
self._pending_reset = True
logger.warning(
"Reset requested but worker thread is still alive; skipping DLCLive reset to avoid potential issues."
)
return
self._dlc = None
self._initialized = False
with self._stats_lock:
Expand All @@ -186,22 +213,48 @@ def reset(self) -> None:
self._processor_overhead_times.clear()

def shutdown(self) -> None:
self._stop_worker()
stopped = self._stop_worker()
if not stopped:
with self._lifecycle_lock:
self._pending_reset = True
logger.warning(
"Shutdown requested but worker thread is still alive; DLCLive instance may not be fully released."
)
return
self._dlc = None
self._initialized = False

def enqueue_frame(self, frame: np.ndarray, timestamp: float) -> None:
# Start worker on first frame
if self._worker_thread is None:
self._start_worker(frame.copy(), timestamp)
return

# As long as worker and queue are ready, ALWAYS enqueue
if self._queue is None:
# Keep lifecycle lock held only for quick state checks and snapshots.
with self._lifecycle_lock:
if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set():
return
t = self._worker_thread
q = self._queue
should_start = t is None or not t.is_alive()

frame_c = frame.copy()
enq_time = time.perf_counter()

if should_start:
# Re-acquire the lifecycle lock to safely (re)start the worker if needed.
with self._lifecycle_lock:
# Re-check state in case it changed while we were copying the frame.
if self._state in (WorkerState.STOPPING, WorkerState.FAULTED) or self._stop_event.is_set():
return
t = self._worker_thread
if t is None or not t.is_alive():
# _start_worker_locked expects the lifecycle lock to be held.
self._start_worker_locked(frame_c, timestamp)
return
# Worker is now running; refresh queue snapshot.
q = self._queue

if q is None:
return

try:
self._queue.put_nowait((frame.copy(), timestamp, time.perf_counter()))
q.put_nowait((frame_c, timestamp, enq_time))
with self._stats_lock:
self._frames_enqueued += 1
except queue.Full:
Expand Down Expand Up @@ -259,12 +312,13 @@ def get_stats(self) -> ProcessorStats:
avg_processor_overhead=avg_proc_overhead,
)

def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None:
def _start_worker_locked(self, init_frame: np.ndarray, init_timestamp: float) -> None:
# lifecycle_lock must already be held
if self._worker_thread is not None and self._worker_thread.is_alive():
return

self._queue = queue.Queue(maxsize=1)
self._stop_event.clear()
self._state = WorkerState.STARTING
self._worker_thread = threading.Thread(
target=self._worker_loop,
args=(init_frame, init_timestamp),
Expand All @@ -273,19 +327,63 @@ def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None:
)
self._worker_thread.start()

def _stop_worker(self) -> None:
if self._worker_thread is None:
return
def _start_worker(self, init_frame: np.ndarray, init_timestamp: float) -> None:
    """Start the worker thread, acquiring the lifecycle lock first.

    Thin locking wrapper around ``_start_worker_locked``, which requires
    ``self._lifecycle_lock`` to already be held by the caller.

    Args:
        init_frame: first frame, used for DLCLive initialization.
        init_timestamp: capture timestamp of ``init_frame``.
    """
    with self._lifecycle_lock:
        self._start_worker_locked(init_frame, init_timestamp)

def _stop_worker(self) -> bool:
    """Signal the worker thread to stop and wait for it to exit.

    Returns:
        True if the worker is fully stopped (or no worker existed);
        False if the thread did not terminate within
        ``STOP_WORKER_TIMEOUT`` and was handed off to the background
        reaper — in that case shared state (queue, DLCLive instance)
        must not be assumed released yet.
    """
    with self._lifecycle_lock:
        t = self._worker_thread
        if t is None:
            # Nothing to stop; normalize state and flags anyway so a
            # previous STOPPING/FAULTED state does not linger.
            self._state = WorkerState.STOPPED
            self._stop_event.clear()
            return True
        self._state = WorkerState.STOPPING
        self._stop_event.set()

    # Join outside the lock so the exiting worker can still acquire
    # the lifecycle lock during its own shutdown path.
    t.join(timeout=STOP_WORKER_TIMEOUT)
    if t.is_alive():
        # Snapshot queue depth for diagnostics; -1 means the queue was
        # already dropped.
        qsize = self._queue.qsize() if self._queue is not None else -1
        logger.warning("DLC worker thread did not terminate cleanly (qsize=%s)", qsize)
        # Delegate cleanup of the stalled thread to a background reaper.
        self._schedule_reap(t)
        return False

    # Normal cleanup
    with self._lifecycle_lock:
        self._worker_thread = None
        self._queue = None
        self._state = WorkerState.STOPPED
        self._stop_event.clear()
    return True

self._stop_event.set()
def _schedule_reap(self, t: threading.Thread) -> None:
with self._lifecycle_lock:
if self._reaping:
return
self._reaping = True

# Just wait for the timed get() loop to observe the flag and drain
self._worker_thread.join(timeout=2.0)
if self._worker_thread.is_alive():
logger.warning("DLC worker thread did not terminate cleanly")
# ensure only one reaper
def reap():
try:
t.join() # wait without timeout in background
with self._lifecycle_lock:
# only clean if we're still stopping this thread
if self._worker_thread is t:
self._worker_thread = None
self._queue = None
self._state = WorkerState.STOPPED
self._stop_event.clear()

if self._pending_reset:
self._dlc = None
self._initialized = False
self._pending_reset = False
finally:
with self._lifecycle_lock:
self._reaping = False
logger.debug("[Stop worker] DLC worker thread reaped; processor is STOPPED again")

self._worker_thread = None
self._queue = None
threading.Thread(target=reap, name="DLCLiveReaper", daemon=True).start()

@contextmanager
def _timed_processor(self):
Expand Down Expand Up @@ -328,6 +426,8 @@ def _process_frame(
Single source of truth for: inference -> (optional) processor timing -> signal emit -> stats.
Updates: frames_processed, latency, processing timeline, profiling metrics.
"""
if self._dlc is None:
raise RuntimeError("DLCLive instance is not initialized.")
# Time GPU inference (and processor overhead when present)
with self._timed_processor() as proc_holder:
inference_start = time.perf_counter()
Expand Down Expand Up @@ -377,8 +477,6 @@ def _process_frame(
def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:
try:
# -------- Initialization (unchanged) --------
if DLCLive is None:
raise RuntimeError("The 'dlclive' package is required for pose estimation.")
if not self._settings.model_path:
raise RuntimeError("No DLCLive model path configured.")

Expand All @@ -403,7 +501,18 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:
if self._settings.device is not None:
options["device"] = self._settings.device

self._dlc = DLCLive(**options)
try:
if DLCLive is None:
raise RuntimeError(
"DLCLive class is not available. Ensure the dlclive package is installed and can be imported."
)
self._dlc = DLCLive(**options)
except Exception as exc:
with self._lifecycle_lock:
self._state = WorkerState.FAULTED
raise RuntimeError(
f"Failed to initialize DLCLive with model '{self._settings.model_path}': {exc}"
) from exc

# First inference to initialize
init_inference_start = time.perf_counter()
Expand All @@ -416,6 +525,8 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:

self._initialized = True
self.initialized.emit(True)
with self._lifecycle_lock:
self._state = WorkerState.RUNNING

total_init_time = time.perf_counter() - init_start
logger.info(
Expand All @@ -431,18 +542,30 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:

except Exception as exc:
logger.exception("Failed to initialize DLCLive", exc_info=exc)
with self._lifecycle_lock:
self._state = WorkerState.FAULTED
self.error.emit(str(exc))
self.initialized.emit(False)
return

q = (
self._queue
) # Assign to local to avoid issues if self._queue is set to None during shutdown while loop is still running.
if q is None:
logger.warning("Worker started without a queue; exiting")
with self._lifecycle_lock:
self._state = WorkerState.FAULTED
self.error.emit("Worker started without a queue")
return

# -------- Main processing loop: stop-flag + timed get + drain --------
# NOTE: We never exit early unless _stop_event is set.
while True:
# If stop requested, only exit when queue is empty
if self._stop_event.is_set():
if self._queue is not None:
if q is not None:
try:
frame, ts, enq = self._queue.get_nowait()
frame, ts, enq = q.get_nowait()
except queue.Empty:
# NOW it is safe to exit
break
Expand All @@ -455,18 +578,24 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:
self.error.emit(str(exc))
finally:
try:
self._queue.task_done()
q.task_done()
except ValueError:
pass
continue # check stop_event again WITHOUT breaking

# Normal operation: timed get
try:
wait_start = time.perf_counter()
item = self._queue.get(timeout=0.05)
item = q.get(timeout=0.05)
queue_wait_time = time.perf_counter() - wait_start
except queue.Empty:
continue
except Exception as exc:
logger.exception("Error getting item from queue", exc_info=exc)
with self._lifecycle_lock:
self._state = WorkerState.FAULTED
self.error.emit(str(exc))
break

try:
frame, ts, enq = item
Expand All @@ -476,7 +605,7 @@ def _worker_loop(self, init_frame: np.ndarray, init_timestamp: float) -> None:
self.error.emit(str(exc))
finally:
try:
self._queue.task_done()
q.task_done()
except ValueError:
pass

Expand Down Expand Up @@ -513,6 +642,10 @@ def enqueue(self, frame, ts):
self._proc.enqueue_frame(frame, ts)

def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, selected_key) -> bool:
with self._proc._lifecycle_lock:
if self._proc._state != WorkerState.STOPPED:
raise RuntimeError("Cannot configure DLCLiveProcessor while it is running. Please stop it first.")

processor = None
if selected_key is not None and scanned_processors:
try:
Expand All @@ -526,11 +659,9 @@ def configure(self, settings: DLCProcessorSettings, scanned_processors: dict, se
def start(self):
self._proc.reset()
self.active = True
self.initialized = False

def stop(self):
self.active = False
self.initialized = False
self._proc.reset()
self._last_pose = None

Expand Down
Loading