Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion temporalio/worker/workflow_sandbox/_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,58 @@

logger = logging.getLogger(__name__)


class _SuspendMonitoring:
"""Suspend sys.monitoring events while the sandbox importer is active.

sys.monitoring callbacks are global (fire on all threads). If a callback
(e.g. coverage's branch tracer) triggers a lazy import while the sandbox
importer is active on this thread, the sandbox intercepts it and the
workflow fails. Suspending monitoring prevents this.

Uses reference counting so concurrent sandbox activations (from multiple
worker threads) correctly share the global monitoring state.
"""

def __init__(self) -> None:
self._lock = threading.Lock()
self._count = 0
self._saved: dict[int, int] = {}

def __enter__(self) -> _SuspendMonitoring:
monitoring = getattr(sys, "monitoring", None)
if monitoring is None:
return self
with self._lock:
self._count += 1
if self._count == 1:
for tool_id in range(6):
try:
events = monitoring.get_events(tool_id)
if events:
self._saved[tool_id] = events
monitoring.set_events(tool_id, 0)
except ValueError:
pass
return self

def __exit__(self, *args: object) -> None:
monitoring = getattr(sys, "monitoring", None)
if monitoring is None:
return
with self._lock:
self._count -= 1
if self._count == 0:
for tool_id, events in self._saved.items():
try:
monitoring.set_events(tool_id, events)
except ValueError:
pass
self._saved.clear()


_suspend_monitoring = _SuspendMonitoring()

# Set to true to log lots of sandbox details
LOG_TRACE = False
_trace_depth = 0
Expand Down Expand Up @@ -147,7 +199,8 @@ def applied(self) -> Iterator[None]:
self.import_func, # type: ignore[reportArgumentType]
):
with self._builtins_restricted():
yield None
with _suspend_monitoring:
yield None
finally:
Importer._thread_local_current.importer = orig_importer

Expand Down