diff --git a/temporalio/contrib/opentelemetry/README.md b/temporalio/contrib/opentelemetry/README.md
new file mode 100644
index 000000000..9f1e1303b
--- /dev/null
+++ b/temporalio/contrib/opentelemetry/README.md
@@ -0,0 +1,259 @@
+# OpenTelemetry Integration for Temporal Python SDK
+
+This package provides OpenTelemetry tracing integration for Temporal workflows, activities, and other operations. It includes automatic span creation and propagation for distributed tracing across your Temporal applications.
+
+## Overview
+
+There are **two different approaches** for integrating OpenTelemetry with the Temporal Python SDK:
+
+1. **🆕 New Approach (Recommended)**: `OpenTelemetryPlugin` - Provides accurate duration spans and direct OpenTelemetry usage within workflows
+2. **📊 Legacy Approach**: `TracingInterceptor` - Provides immediate span visibility but with zero-duration workflow spans
+
+## Quick Start
+
+### New Approach (OpenTelemetryPlugin)
+
+```python
+import opentelemetry.trace
+from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
+from temporalio.client import Client
+from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider
+from temporalio.worker import Worker
+
+# Create a replay-safe tracer provider
+provider = create_tracer_provider()
+provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
+opentelemetry.trace.set_tracer_provider(provider)
+
+# Register plugin on CLIENT (automatically applies to workers using this client)
+client = await Client.connect(
+    "localhost:7233",
+    plugins=[OpenTelemetryPlugin()]
+)
+
+# Workers created with this client automatically get the plugin
+worker = Worker(
+    client,
+    task_queue="my-task-queue",
+    workflows=[MyWorkflow],
+    activities=[my_activity]
+    # NO NEED to specify plugins here - they come from the client
+)
+```
+
+### Legacy Approach (TracingInterceptor)
+
+```python
+from temporalio.client import Client
+from temporalio.contrib.opentelemetry import TracingInterceptor
+from temporalio.worker import Worker
+
+# Register interceptor on CLIENT (automatically applies to workers using this client)
+client = await Client.connect(
+    "localhost:7233",
+    interceptors=[TracingInterceptor()]
+)
+
+# Workers created with this client automatically get the interceptor
+worker = Worker(
+    client,
+    task_queue="my-task-queue",
+    workflows=[MyWorkflow],
+    activities=[my_activity]
+    # NO NEED to specify interceptors here - they come from the client
+)
+```
+
+## Detailed Comparison
+
+### New Approach: OpenTelemetryPlugin
+
+#### ✅ Advantages:
+- **Accurate Duration Spans**: Workflow spans have real durations reflecting actual execution time
+- **Direct OpenTelemetry Usage**: Use `opentelemetry.trace.get_tracer()` directly within workflows
+- **Better Span Hierarchy**: More accurate parent-child relationships within workflows
+- **Replay Safety**: Spans created inside workflows are replay-safe when the tracer provider comes from `create_tracer_provider()`
+
+#### ⚠️ Considerations:
+- **Experimental Status**: Subject to breaking changes in future versions
+- **Delayed Span Visibility**: Workflow spans only appear after workflow completion
+- **Different Trace Structure**: Migrating from the legacy approach may break dependencies on the specific trace structure it produced
+
+#### Usage Example:
+```python
+from datetime import timedelta
+
+from opentelemetry.trace import get_tracer
+from temporalio import workflow
+
+@workflow.defn
+class MyWorkflow:
+    @workflow.run
+    async def run(self):
+        # Direct OpenTelemetry usage works correctly
+        tracer = get_tracer(__name__)
+        with tracer.start_as_current_span("workflow-operation"):
+            # This span will have accurate duration
+            await workflow.execute_activity(
+                my_activity,
+                start_to_close_timeout=timedelta(seconds=30)
+            )
+``` + +### Legacy Approach: TracingInterceptor + +**File**: `temporalio/contrib/opentelemetry/_interceptor.py` + +#### ✅ Advantages: +- **Immediate Span Visibility**: Spans appear as soon as they're created +- **Stable API**: Well-established interface, not subject to experimental changes +- **Workflow Progress Tracking**: Can see workflow spans even before workflow completes + +#### ⚠️ Limitations: +- **Zero-Duration Workflow Spans**: All workflow spans are immediately ended with 0ms duration +- **No Direct OpenTelemetry Usage**: Cannot use standard OpenTelemetry APIs within workflows +- **Limited Workflow Span Creation**: Must use `temporalio.contrib.opentelemetry.workflow.completed_span()` + +#### Usage Example: +```python +@workflow.defn +class MyWorkflow: + @workflow.run + async def run(self): + # Must use Temporal-specific span creation + temporalio.contrib.opentelemetry.workflow.completed_span( + "workflow-operation", + attributes={"custom": "attribute"} + ) + # Standard OpenTelemetry APIs don't work properly here +``` + +## When to Use Each Approach + +### Choose OpenTelemetryPlugin When: +- You need accurate span durations for performance analysis +- You want to use standard OpenTelemetry APIs within workflows +- You're building new applications +- You can tolerate experimental API changes + +### Choose TracingInterceptor When: +- You need immediate visibility into workflow progress +- You have existing dependencies on the current trace structure +- You require a stable, non-experimental API +- You primarily need basic tracing without complex workflow span hierarchies + +## Configuration Options + +### OpenTelemetryPlugin Options + +```python +plugin = OpenTelemetryPlugin( + add_temporal_spans=False # Whether to add additional Temporal-specific spans +) +``` + +### TracingInterceptor Options + +```python +interceptor = TracingInterceptor( + tracer=None, # Custom tracer (defaults to global tracer) + always_create_workflow_spans=False # Create spans even without parent context +) +``` + +## Migration Guide + +### From TracingInterceptor to OpenTelemetryPlugin + +1. **Replace interceptor with plugin on client**: + ```python + # Old + client = await Client.connect( + "localhost:7233", + interceptors=[TracingInterceptor()] + ) + + # New + provider = create_tracer_provider() + opentelemetry.trace.set_tracer_provider(provider) + client = await Client.connect( + "localhost:7233", + plugins=[OpenTelemetryPlugin()] + ) + ``` + +2. **Update workflow span creation**: + ```python + # Old + temporalio.contrib.opentelemetry.workflow.completed_span("my-span") + + # New - use standard OpenTelemetry + tracer = get_tracer(__name__) + with tracer.start_as_current_span("my-span"): + # Your workflow logic + pass + ``` + +3. **Test trace structure changes**: Verify that any monitoring or analysis tools still work with the new trace structure. 
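+
+   A minimal sketch of step 3, assuming a test that runs a workflow against a client configured as in step 1: capture spans with OpenTelemetry's in-memory exporter and assert on span names rather than exact ordering. The `"localhost:7233"` address, the elided worker setup, and the `"my-span"` name (from step 2) are placeholders for your own code.
+
+   ```python
+   import opentelemetry.trace
+   from opentelemetry.sdk.trace.export import SimpleSpanProcessor
+   from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
+       InMemorySpanExporter,
+   )
+
+   from temporalio.client import Client
+   from temporalio.contrib.opentelemetry import (
+       OpenTelemetryPlugin,
+       create_tracer_provider,
+   )
+
+   async def verify_trace_structure() -> None:
+       # Route all spans to an in-memory exporter so the test can inspect them
+       exporter = InMemorySpanExporter()
+       provider = create_tracer_provider()
+       provider.add_span_processor(SimpleSpanProcessor(exporter))
+       opentelemetry.trace.set_tracer_provider(provider)
+
+       client = await Client.connect(
+           "localhost:7233", plugins=[OpenTelemetryPlugin()]
+       )
+       # ... start a worker and run the workflow under test here ...
+
+       # With the new approach, workflow spans are exported after completion
+       names = [span.name for span in exporter.get_finished_spans()]
+       assert "my-span" in names
+   ```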
+
+## Advanced Usage
+
+### Creating Custom Spans in Workflows (New Approach)
+
+```python
+from datetime import timedelta
+
+from opentelemetry.trace import get_tracer
+from temporalio import workflow
+
+@workflow.defn
+class MyWorkflow:
+    @workflow.run
+    async def run(self):
+        tracer = get_tracer(__name__)
+
+        # Create spans with accurate durations
+        with tracer.start_as_current_span("business-logic") as span:
+            span.set_attribute("workflow.step", "processing")
+
+            # Nested spans work correctly
+            with tracer.start_as_current_span("data-validation"):
+                await self.validate_input()
+
+            await workflow.execute_activity(
+                process_data,
+                start_to_close_timeout=timedelta(seconds=60)
+            )
+```
+
+### Custom Span Attributes
+
+Both approaches support adding custom attributes to spans:
+
+```python
+# Legacy approach
+temporalio.contrib.opentelemetry.workflow.completed_span(
+    "my-operation",
+    attributes={
+        "business.unit": "payments",
+        "request.id": "req-123"
+    }
+)
+
+# New approach
+with tracer.start_as_current_span("my-operation") as span:
+    span.set_attributes({
+        "business.unit": "payments",
+        "request.id": "req-123"
+    })
+```
+
+## Best Practices
+
+1. **Register on Client**: Always register plugins/interceptors on the client, not the worker, to ensure proper context propagation
+
+2. **Use create_tracer_provider()**: Always use the provided function to create replay-safe tracer providers when using the new approach
+
+3. **Set Global Tracer Provider**: Ensure the tracer provider is set globally before creating clients
+
+4. **Avoid Duplication**: Never register the same plugin/interceptor on both client and worker
+
+## Troubleshooting
+
+### Common Issues
+
+1. **"ReplaySafeTracerProvider required" error**: Make sure you're using `create_tracer_provider()` when using OpenTelemetryPlugin
+
+2. **Missing spans**: Verify that the tracer provider is set before creating clients, and that plugins/interceptors are registered on the client
+
+3. **Duplicate spans**: Check that you haven't registered the same plugin/interceptor on both client and worker
+
+4. **Zero-duration spans**: This is expected behavior with TracingInterceptor for workflow spans
diff --git a/temporalio/contrib/opentelemetry/__init__.py b/temporalio/contrib/opentelemetry/__init__.py
new file mode 100644
index 000000000..74f069322
--- /dev/null
+++ b/temporalio/contrib/opentelemetry/__init__.py
@@ -0,0 +1,22 @@
+"""OpenTelemetry v2 integration for Temporal SDK.
+
+This package provides OpenTelemetry tracing integration for Temporal workflows,
+activities, and other operations. It includes automatic span creation and
+propagation for distributed tracing.
+""" + +from temporalio.contrib.opentelemetry._interceptor import ( + TracingInterceptor, + TracingWorkflowInboundInterceptor, +) +from temporalio.contrib.opentelemetry._otel_interceptor import OpenTelemetryInterceptor +from temporalio.contrib.opentelemetry._plugin import OpenTelemetryPlugin +from temporalio.contrib.opentelemetry._tracer_provider import create_tracer_provider + +__all__ = [ + "TracingInterceptor", + "TracingWorkflowInboundInterceptor", + "OpenTelemetryInterceptor", + "OpenTelemetryPlugin", + "create_tracer_provider", +] diff --git a/temporalio/contrib/opentelemetry/_id_generator.py b/temporalio/contrib/opentelemetry/_id_generator.py new file mode 100644 index 000000000..9aaa00ef1 --- /dev/null +++ b/temporalio/contrib/opentelemetry/_id_generator.py @@ -0,0 +1,72 @@ +import random + +from opentelemetry.sdk.trace.id_generator import IdGenerator +from opentelemetry.trace import ( + INVALID_SPAN_ID, + INVALID_TRACE_ID, +) + +import temporalio.workflow + + +def _get_workflow_random() -> random.Random | None: + if ( + temporalio.workflow.in_workflow() + and not temporalio.workflow.unsafe.is_read_only() + ): + if ( + getattr(temporalio.workflow.instance(), "__temporal_otel_id_random", None) + is None + ): + setattr( + temporalio.workflow.instance(), + "__temporal_otel_id_random", + temporalio.workflow.new_random(), + ) + return getattr(temporalio.workflow.instance(), "__temporal_otel_id_random") + + return None + + +class TemporalIdGenerator(IdGenerator): + """OpenTelemetry ID generator that uses Temporal's deterministic random generator. + + .. warning:: + This class is experimental and may change in future versions. + Use with caution in production environments. + + This generator uses Temporal's workflow-safe random number generator when + inside a workflow execution, ensuring deterministic span and trace IDs + across workflow replays. Falls back to standard random generation outside + of workflows. + """ + + def __init__(self, id_generator: IdGenerator): + """Initialize a TemporalIdGenerator.""" + self._id_generator = id_generator + + def generate_span_id(self) -> int: + """Generate a span ID using Temporal's deterministic random when in workflow. + + Returns: + A 64-bit span ID. + """ + if workflow_random := _get_workflow_random(): + span_id = workflow_random.getrandbits(64) + while span_id == INVALID_SPAN_ID: + span_id = workflow_random.getrandbits(64) + return span_id + return self._id_generator.generate_span_id() + + def generate_trace_id(self) -> int: + """Generate a trace ID using Temporal's deterministic random when in workflow. + + Returns: + A 128-bit trace ID. + """ + if workflow_random := _get_workflow_random(): + trace_id = workflow_random.getrandbits(128) + while trace_id == INVALID_TRACE_ID: + trace_id = workflow_random.getrandbits(128) + return trace_id + return self._id_generator.generate_trace_id() diff --git a/temporalio/contrib/opentelemetry.py b/temporalio/contrib/opentelemetry/_interceptor.py similarity index 96% rename from temporalio/contrib/opentelemetry.py rename to temporalio/contrib/opentelemetry/_interceptor.py index 3e08ea68d..69a2cfb0c 100644 --- a/temporalio/contrib/opentelemetry.py +++ b/temporalio/contrib/opentelemetry/_interceptor.py @@ -827,43 +827,3 @@ def _carrier_to_nexus_headers( else: out[k] = v return out - - -class workflow: - """Contains static methods that are safe to call from within a workflow. - - .. warning:: - Using any other ``opentelemetry`` API could cause non-determinism. 
- """ - - def __init__(self) -> None: # noqa: D107 - raise NotImplementedError - - @staticmethod - def completed_span( - name: str, - *, - attributes: opentelemetry.util.types.Attributes = None, - exception: Exception | None = None, - ) -> None: - """Create and end an OpenTelemetry span. - - Note, this will only create and record when the workflow is not - replaying and if there is a current span (meaning the client started a - span and this interceptor is configured on the worker and the span is on - the context). - - There is currently no way to create a long-running span or to create a - span that actually spans other code. - - Args: - name: Name of the span. - attributes: Attributes to set on the span if any. Workflow ID and - run ID are automatically added. - exception: Optional exception to record on the span. - """ - interceptor = TracingWorkflowInboundInterceptor._from_context() - if interceptor: - interceptor._completed_span( - name, additional_attributes=attributes, exception=exception - ) diff --git a/temporalio/contrib/opentelemetry/_otel_interceptor.py b/temporalio/contrib/opentelemetry/_otel_interceptor.py new file mode 100644 index 000000000..1756f93e1 --- /dev/null +++ b/temporalio/contrib/opentelemetry/_otel_interceptor.py @@ -0,0 +1,586 @@ +"""OpenTelemetry interceptor that creates/propagates spans.""" + +from __future__ import annotations + +from collections.abc import Iterator, Mapping +from contextlib import contextmanager +from typing import ( + Any, + NoReturn, + TypeAlias, +) + +import nexusrpc.handler +import opentelemetry.baggage.propagation +import opentelemetry.context +import opentelemetry.propagators.composite +import opentelemetry.propagators.textmap +import opentelemetry.trace +import opentelemetry.trace.propagation.tracecontext +import opentelemetry.util.types +from opentelemetry.context import Context +from opentelemetry.trace import ( + Status, + StatusCode, + Tracer, + get_tracer, + get_tracer_provider, +) +from typing_extensions import Protocol + +import temporalio.activity +import temporalio.api.common.v1 +import temporalio.client +import temporalio.converter +import temporalio.worker +import temporalio.workflow +from temporalio.contrib.opentelemetry._tracer_provider import ( + ReplaySafeTracerProvider, +) +from temporalio.exceptions import ApplicationError, ApplicationErrorCategory + +# OpenTelemetry dynamically, lazily chooses its context implementation at +# runtime. When first accessed, they use pkg_resources.iter_entry_points + load. +# The load uses built-in open() which we don't allow in sandbox mode at runtime, +# only import time. Therefore if the first use of a OTel context is inside the +# sandbox, which it may be for a workflow worker, this will fail. So instead we +# eagerly reference it here to force loading at import time instead of lazily. 
+opentelemetry.context.get_current() + +default_text_map_propagator = opentelemetry.propagators.composite.CompositePropagator( + [ + opentelemetry.trace.propagation.tracecontext.TraceContextTextMapPropagator(), + opentelemetry.baggage.propagation.W3CBaggagePropagator(), + ] +) +"""Default text map propagator used by :py:class:`TracingInterceptor`.""" + +_CarrierDict: TypeAlias = dict[str, opentelemetry.propagators.textmap.CarrierValT] + + +def _context_to_headers( + headers: Mapping[str, temporalio.api.common.v1.Payload], +) -> Mapping[str, temporalio.api.common.v1.Payload]: + carrier: _CarrierDict = {} + default_text_map_propagator.inject(carrier) + if carrier: + headers = { + **headers, + "_tracer-data": temporalio.converter.PayloadConverter.default.to_payloads( + [carrier] + )[0], + } + return headers + + +def _context_to_nexus_headers(headers: Mapping[str, str]) -> Mapping[str, str]: + carrier: _CarrierDict = {} + default_text_map_propagator.inject(carrier) + if carrier: + out = {**headers} if headers else {} + for k, v in carrier.items(): + if isinstance(v, list): + out[k] = ",".join(v) + else: + out[k] = v + return out + else: + return headers + + +def _headers_to_context( + headers: Mapping[str, temporalio.api.common.v1.Payload], +) -> Context: + context_header = headers.get("_tracer-data") + if context_header: + context_carrier: _CarrierDict = ( + temporalio.converter.PayloadConverter.default.from_payloads( + [context_header] + )[0] + ) + + context = default_text_map_propagator.extract(context_carrier) + else: + context = opentelemetry.context.Context() + return context + + +def _nexus_headers_to_context(headers: Mapping[str, str]) -> Context: + context = default_text_map_propagator.extract(headers) + return context + + +@contextmanager +def _maybe_span( + tracer: Tracer, + name: str, + *, + add_temporal_spans: bool, + attributes: opentelemetry.util.types.Attributes, + kind: opentelemetry.trace.SpanKind, + context: Context | None = None, +) -> Iterator[None]: + if not add_temporal_spans: + yield + return + + token = opentelemetry.context.attach(context) if context else None + try: + with tracer.start_as_current_span( + name, + attributes=attributes, + kind=kind, + context=context, + set_status_on_exception=False, + ) as span: + try: + yield + except Exception as exc: + if ( + not isinstance(exc, ApplicationError) + or exc.category != ApplicationErrorCategory.BENIGN + ): + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) + ) + raise + finally: + if token and context is opentelemetry.context.get_current(): + opentelemetry.context.detach(token) + + +class OpenTelemetryInterceptor( + temporalio.client.Interceptor, temporalio.worker.Interceptor +): + """Interceptor that supports client and worker OpenTelemetry span creation + and propagation. + + .. warning:: + This class is experimental and may change in future versions. + Use with caution in production environments. + + This should be created and used for ``interceptors`` on the + :py:meth:`temporalio.client.Client.connect` call to apply to all client + calls and worker calls using that client. To only apply to workers, set as + worker creation option instead of in client. 
+ """ + + def __init__( # type: ignore[reportMissingSuperCall] + self, + add_temporal_spans: bool = False, + ) -> None: + """Initialize a OpenTelemetry tracing interceptor.""" + self._add_temporal_spans = add_temporal_spans + + def intercept_client( + self, next: temporalio.client.OutboundInterceptor + ) -> temporalio.client.OutboundInterceptor: + """Implementation of + :py:meth:`temporalio.client.Interceptor.intercept_client`. + """ + return _TracingClientOutboundInterceptor(next, self._add_temporal_spans) + + def intercept_activity( + self, next: temporalio.worker.ActivityInboundInterceptor + ) -> temporalio.worker.ActivityInboundInterceptor: + """Implementation of + :py:meth:`temporalio.worker.Interceptor.intercept_activity`. + """ + return _TracingActivityInboundInterceptor(next, self._add_temporal_spans) + + def workflow_interceptor_class( + self, input: temporalio.worker.WorkflowInterceptorClassInput + ) -> type[_TracingWorkflowInboundInterceptor]: + """Implementation of + :py:meth:`temporalio.worker.Interceptor.workflow_interceptor_class`. + """ + provider = get_tracer_provider() + if not isinstance(provider, ReplaySafeTracerProvider): + raise ValueError( + "When using OpenTelemetryPlugin, the global trace provider must be a ReplaySafeTracerProvider. Use init_tracer_provider to create one." + ) + + class InterceptorWithState(_TracingWorkflowInboundInterceptor): + _add_temporal_spans = self._add_temporal_spans + + return InterceptorWithState + + def intercept_nexus_operation( + self, next: temporalio.worker.NexusOperationInboundInterceptor + ) -> temporalio.worker.NexusOperationInboundInterceptor: + """Implementation of + :py:meth:`temporalio.worker.Interceptor.intercept_nexus_operation`. + """ + return _TracingNexusOperationInboundInterceptor(next, self._add_temporal_spans) + + +class _TracingClientOutboundInterceptor(temporalio.client.OutboundInterceptor): + def __init__( + self, + next: temporalio.client.OutboundInterceptor, + add_temporal_spans: bool, + ) -> None: + super().__init__(next) + self._add_temporal_spans = add_temporal_spans + + async def start_workflow( + self, input: temporalio.client.StartWorkflowInput + ) -> temporalio.client.WorkflowHandle[Any, Any]: + prefix = ( + "StartWorkflow" if not input.start_signal else "SignalWithStartWorkflow" + ) + with _maybe_span( + get_tracer(__name__), + f"{prefix}:{input.workflow}", + add_temporal_spans=self._add_temporal_spans, + attributes={"temporalWorkflowID": input.id}, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().start_workflow(input) + + async def query_workflow(self, input: temporalio.client.QueryWorkflowInput) -> Any: + with _maybe_span( + get_tracer(__name__), + f"QueryWorkflow:{input.query}", + add_temporal_spans=self._add_temporal_spans, + attributes={"temporalWorkflowID": input.id}, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().query_workflow(input) + + async def signal_workflow( + self, input: temporalio.client.SignalWorkflowInput + ) -> None: + with _maybe_span( + get_tracer(__name__), + f"SignalWorkflow:{input.signal}", + add_temporal_spans=self._add_temporal_spans, + attributes={"temporalWorkflowID": input.id}, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().signal_workflow(input) + + async def start_workflow_update( + self, input: temporalio.client.StartWorkflowUpdateInput + ) -> 
temporalio.client.WorkflowUpdateHandle[Any]: + with _maybe_span( + get_tracer(__name__), + f"StartWorkflowUpdate:{input.update}", + add_temporal_spans=self._add_temporal_spans, + attributes={"temporalWorkflowID": input.id}, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().start_workflow_update(input) + + async def start_update_with_start_workflow( + self, input: temporalio.client.StartWorkflowUpdateWithStartInput + ) -> temporalio.client.WorkflowUpdateHandle[Any]: + attrs = { + "temporalWorkflowID": input.start_workflow_input.id, + } + if input.update_workflow_input.update_id is not None: + attrs["temporalUpdateID"] = input.update_workflow_input.update_id + + with _maybe_span( + get_tracer(__name__), + f"StartUpdateWithStartWorkflow:{input.start_workflow_input.workflow}", + add_temporal_spans=self._add_temporal_spans, + attributes=attrs, + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.start_workflow_input.headers = _context_to_headers( + input.start_workflow_input.headers + ) + input.update_workflow_input.headers = _context_to_headers( + input.update_workflow_input.headers + ) + return await super().start_update_with_start_workflow(input) + + +class _TracingActivityInboundInterceptor(temporalio.worker.ActivityInboundInterceptor): + def __init__( + self, + next: temporalio.worker.ActivityInboundInterceptor, + add_temporal_spans: bool, + ) -> None: + super().__init__(next) + self._add_temporal_spans = add_temporal_spans + + async def execute_activity( + self, input: temporalio.worker.ExecuteActivityInput + ) -> Any: + context = _headers_to_context(input.headers) + token = opentelemetry.context.attach(context) + try: + info = temporalio.activity.info() + with _maybe_span( + get_tracer(__name__), + f"RunActivity:{info.activity_type}", + add_temporal_spans=self._add_temporal_spans, + attributes={ + "temporalWorkflowID": info.workflow_id or "", + "temporalRunID": info.workflow_run_id or "", + "temporalActivityID": info.activity_id, + }, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + return await super().execute_activity(input) + finally: + if context is opentelemetry.context.get_current(): + opentelemetry.context.detach(token) + + +class _TracingNexusOperationInboundInterceptor( + temporalio.worker.NexusOperationInboundInterceptor +): + def __init__( + self, + next: temporalio.worker.NexusOperationInboundInterceptor, + add_temporal_spans: bool, + ) -> None: + super().__init__(next) + self._add_temporal_spans = add_temporal_spans + + @contextmanager + def _top_level_context(self, headers: Mapping[str, str]) -> Iterator[None]: + context = _nexus_headers_to_context(headers) + token = opentelemetry.context.attach(context) + try: + yield + finally: + if context is opentelemetry.context.get_current(): + opentelemetry.context.detach(token) + + async def execute_nexus_operation_start( + self, input: temporalio.worker.ExecuteNexusOperationStartInput + ) -> ( + nexusrpc.handler.StartOperationResultSync[Any] + | nexusrpc.handler.StartOperationResultAsync + ): + with self._top_level_context(input.ctx.headers): + with _maybe_span( + get_tracer(__name__), + f"RunStartNexusOperationHandler:{input.ctx.service}/{input.ctx.operation}", + add_temporal_spans=self._add_temporal_spans, + attributes={}, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + return await self.next.execute_nexus_operation_start(input) + + async def execute_nexus_operation_cancel( + self, input: temporalio.worker.ExecuteNexusOperationCancelInput + ) -> 
None: + with self._top_level_context(input.ctx.headers): + with _maybe_span( + get_tracer(__name__), + f"RunCancelNexusOperationHandler:{input.ctx.service}/{input.ctx.operation}", + add_temporal_spans=self._add_temporal_spans, + attributes={}, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + return await self.next.execute_nexus_operation_cancel(input) + + +class _InputWithHeaders(Protocol): + headers: Mapping[str, temporalio.api.common.v1.Payload] + + +class _TracingWorkflowInboundInterceptor(temporalio.worker.WorkflowInboundInterceptor): + """Tracing interceptor for workflow calls.""" + + _add_temporal_spans: bool = False + + def __init__(self, next: temporalio.worker.WorkflowInboundInterceptor) -> None: + """Initialize a tracing workflow interceptor.""" + super().__init__(next) + + def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.init`. + """ + super().init( + _TracingWorkflowOutboundInterceptor(outbound, self._add_temporal_spans) + ) + + @contextmanager + def _workflow_maybe_span(self, name: str) -> Iterator[None]: + info = temporalio.workflow.info() + attributes: dict[str, opentelemetry.util.types.AttributeValue] = { + "temporalWorkflowID": info.workflow_id, + "temporalRunID": info.run_id, + } + with _maybe_span( + get_tracer(__name__), + name, + add_temporal_spans=self._add_temporal_spans, + attributes=attributes, + kind=opentelemetry.trace.SpanKind.SERVER, + ): + yield + + async def execute_workflow( + self, input: temporalio.worker.ExecuteWorkflowInput + ) -> Any: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.execute_workflow`. + """ + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"RunWorkflow:{temporalio.workflow.info().workflow_type}" + ): + return await super().execute_workflow(input) + + async def handle_signal(self, input: temporalio.worker.HandleSignalInput) -> None: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.handle_signal`. + """ + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"HandleSignal:{input.signal}", + ): + await super().handle_signal(input) + + async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.handle_query`. + """ + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"HandleQuery:{input.query}", + ): + return await super().handle_query(input) + + def handle_update_validator( + self, input: temporalio.worker.HandleUpdateInput + ) -> None: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.handle_update_validator`. + """ + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"ValidateUpdate:{input.update}", + ): + super().handle_update_validator(input) + + async def handle_update_handler( + self, input: temporalio.worker.HandleUpdateInput + ) -> Any: + """Implementation of + :py:meth:`temporalio.worker.WorkflowInboundInterceptor.handle_update_handler`. 
+ """ + with self._top_level_workflow_context(input): + with self._workflow_maybe_span( + f"HandleUpdate:{input.update}", + ): + return await super().handle_update_handler(input) + + @contextmanager + def _top_level_workflow_context(self, input: _InputWithHeaders) -> Iterator[None]: + context = _headers_to_context(input.headers) + token = opentelemetry.context.attach(context) + try: + yield + finally: + if context is opentelemetry.context.get_current(): + opentelemetry.context.detach(token) + + +class _TracingWorkflowOutboundInterceptor( + temporalio.worker.WorkflowOutboundInterceptor +): + def __init__( + self, + next: temporalio.worker.WorkflowOutboundInterceptor, + add_temporal_spans: bool, + ) -> None: + super().__init__(next) + self._add_temporal_spans = add_temporal_spans + + @contextmanager + def _workflow_maybe_span( + self, name: str, kind: opentelemetry.trace.SpanKind + ) -> Iterator[None]: + info = temporalio.workflow.info() + attributes: dict[str, opentelemetry.util.types.AttributeValue] = { + "temporalWorkflowID": info.workflow_id, + "temporalRunID": info.run_id, + } + with _maybe_span( + get_tracer(__name__), + name, + add_temporal_spans=self._add_temporal_spans, + attributes=attributes, + kind=kind, + ): + yield + + def continue_as_new(self, input: temporalio.worker.ContinueAsNewInput) -> NoReturn: + input.headers = _context_to_headers(input.headers) + super().continue_as_new(input) + + async def signal_child_workflow( + self, input: temporalio.worker.SignalChildWorkflowInput + ) -> None: + with self._workflow_maybe_span( + f"SignalChildWorkflow:{input.signal}", + kind=opentelemetry.trace.SpanKind.SERVER, + ): + input.headers = _context_to_headers(input.headers) + await super().signal_child_workflow(input) + + async def signal_external_workflow( + self, input: temporalio.worker.SignalExternalWorkflowInput + ) -> None: + with self._workflow_maybe_span( + f"SignalExternalWorkflow:{input.signal}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + await super().signal_external_workflow(input) + + def start_activity( + self, input: temporalio.worker.StartActivityInput + ) -> temporalio.workflow.ActivityHandle: + with self._workflow_maybe_span( + f"StartActivity:{input.activity}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return super().start_activity(input) + + async def start_child_workflow( + self, input: temporalio.worker.StartChildWorkflowInput + ) -> temporalio.workflow.ChildWorkflowHandle: + with self._workflow_maybe_span( + f"StartChildWorkflow:{input.workflow}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return await super().start_child_workflow(input) + + def start_local_activity( + self, input: temporalio.worker.StartLocalActivityInput + ) -> temporalio.workflow.ActivityHandle: + with self._workflow_maybe_span( + f"StartActivity:{input.activity}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_headers(input.headers) + return super().start_local_activity(input) + + async def start_nexus_operation( + self, input: temporalio.worker.StartNexusOperationInput[Any, Any] + ) -> temporalio.workflow.NexusOperationHandle[Any]: + with self._workflow_maybe_span( + f"StartNexusOperation:{input.service}/{input.operation_name}", + kind=opentelemetry.trace.SpanKind.CLIENT, + ): + input.headers = _context_to_nexus_headers(input.headers or {}) + return await 
super().start_nexus_operation(input)
diff --git a/temporalio/contrib/opentelemetry/_plugin.py b/temporalio/contrib/opentelemetry/_plugin.py
new file mode 100644
index 000000000..c88474e77
--- /dev/null
+++ b/temporalio/contrib/opentelemetry/_plugin.py
@@ -0,0 +1,53 @@
+import dataclasses
+
+from temporalio.contrib.opentelemetry import OpenTelemetryInterceptor
+from temporalio.plugin import SimplePlugin
+from temporalio.worker import WorkflowRunner
+from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner
+
+
+class OpenTelemetryPlugin(SimplePlugin):
+    """OpenTelemetry plugin for Temporal SDK.
+
+    .. warning::
+        This class is experimental and may change in future versions.
+        Use with caution in production environments.
+
+    This plugin integrates OpenTelemetry tracing with the Temporal SDK, providing
+    automatic span creation for workflows, activities, and other Temporal operations.
+    It uses the new OpenTelemetryInterceptor implementation.
+
+    Unlike the prior TracingInterceptor, this allows for accurate span durations and
+    parent-child relationships for spans created inside a workflow with a standard
+    OpenTelemetry tracer.
+
+    Your tracer provider should be created with `create_tracer_provider` for it to be used within a Temporal worker.
+    """
+
+    def __init__(self, *, add_temporal_spans: bool = False):
+        """Initialize the OpenTelemetry plugin.
+
+        Args:
+            add_temporal_spans: Whether to add additional Temporal-specific spans
+                for operations like StartWorkflow, RunWorkflow, etc.
+        """
+        interceptors = [OpenTelemetryInterceptor(add_temporal_spans)]
+
+        def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner:
+            if not runner:
+                raise ValueError(
+                    "No WorkflowRunner provided to the OpenTelemetry plugin."
+                )
+
+            # If in sandbox, add additional passthrough
+            if isinstance(runner, SandboxedWorkflowRunner):
+                return dataclasses.replace(
+                    runner,
+                    restrictions=runner.restrictions.with_passthrough_modules(
+                        "opentelemetry"
+                    ),
+                )
+            return runner
+
+        super().__init__(
+            "OpenTelemetryPlugin",
+            client_interceptors=interceptors,
+            workflow_runner=workflow_runner,
+        )
diff --git a/temporalio/contrib/opentelemetry/_tracer_provider.py b/temporalio/contrib/opentelemetry/_tracer_provider.py
new file mode 100644
index 000000000..4353bc146
--- /dev/null
+++ b/temporalio/contrib/opentelemetry/_tracer_provider.py
@@ -0,0 +1,271 @@
+from collections.abc import Iterator, Mapping, Sequence
+
+import opentelemetry.sdk.trace as trace_sdk
+from opentelemetry.context import Context
+from opentelemetry.sdk.resources import Resource
+from opentelemetry.sdk.trace import (
+    ConcurrentMultiSpanProcessor,
+    SpanLimits,
+    SynchronousMultiSpanProcessor,
+    sampling,
+)
+from opentelemetry.sdk.trace.id_generator import IdGenerator, RandomIdGenerator
+from opentelemetry.trace import (
+    Link,
+    Span,
+    SpanContext,
+    SpanKind,
+    Status,
+    StatusCode,
+    Tracer,
+    TracerProvider,
+    use_span,
+)
+from opentelemetry.util import types
+from opentelemetry.util._decorator import _agnosticcontextmanager
+
+from temporalio import workflow
+from temporalio.contrib.opentelemetry._id_generator import TemporalIdGenerator
+
+
+class _ReplaySafeSpan(Span):
+    def __init__(self, span: Span):
+        self._exception: BaseException | None = None
+        self._span = span
+
+    def end(self, end_time: int | None = None) -> None:
+        if workflow.in_workflow() and workflow.unsafe.is_replaying_history_events():
+            # Skip ending spans during workflow replay to avoid duplicate telemetry
+            return
+
+        if (
+            workflow.in_workflow()
+            and self._exception is not None
+ and not workflow.is_failure_exception(self._exception) + ): + # Skip ending spans with workflow task failures. Otherwise, each failure will create its own span + # This may still occur for spans which were completed during failed workflow tasks. + return + + self._span.end(end_time=end_time) + + def get_span_context(self) -> SpanContext: + return self._span.get_span_context() + + def set_attributes(self, attributes: Mapping[str, types.AttributeValue]) -> None: + self._span.set_attributes(attributes) + + def set_attribute(self, key: str, value: types.AttributeValue) -> None: + self._span.set_attribute(key, value) + + def add_event( + self, + name: str, + attributes: types.Attributes = None, + timestamp: int | None = None, + ) -> None: + self._span.add_event(name, attributes, timestamp) + + def update_name(self, name: str) -> None: + self._span.update_name(name) + + def is_recording(self) -> bool: + return self._span.is_recording() + + def set_status( + self, status: Status | StatusCode, description: str | None = None + ) -> None: + self._status = status + self._span.set_status(status, description) + + def record_exception( + self, + exception: BaseException, + attributes: types.Attributes = None, + timestamp: int | None = None, + escaped: bool = False, + ) -> None: + self._exception = exception + self._span.record_exception(exception, attributes, timestamp, escaped) + + +class _ReplaySafeTracer(Tracer): # type: ignore[reportUnusedClass] # Used outside file + def __init__(self, tracer: Tracer): + self._tracer = tracer + + def start_span( + self, + name: str, + context: Context | None = None, + kind: SpanKind = SpanKind.INTERNAL, + attributes: types.Attributes = None, + links: Sequence[Link] | None = None, + start_time: int | None = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + ) -> "Span": + span = self._tracer.start_span( + name, + context, + kind, + attributes, + links, + start_time or workflow.time_ns(), + record_exception, + set_status_on_exception, + ) + return _ReplaySafeSpan(span) + + @_agnosticcontextmanager + def start_as_current_span( + self, + name: str, + context: Context | None = None, + kind: SpanKind = SpanKind.INTERNAL, + attributes: types.Attributes = None, + links: Sequence[Link] | None = None, + start_time: int | None = None, + record_exception: bool = True, + set_status_on_exception: bool = True, + end_on_exit: bool = True, + ) -> Iterator["Span"]: + if workflow.in_workflow() and workflow.unsafe.is_replaying_history_events(): + start_time = start_time or workflow.time_ns() + span = self._tracer.start_span( + name, + context, + kind, + attributes, + links, + start_time, + record_exception, + set_status_on_exception, + ) + span = _ReplaySafeSpan(span) + with use_span( + span, + end_on_exit=end_on_exit, + record_exception=record_exception, + set_status_on_exception=set_status_on_exception, + ) as span: + yield span + + +class ReplaySafeTracerProvider(TracerProvider): + """A tracer provider that is safe for use during workflow replay. + + .. warning:: + This class is experimental and may change in future versions. + Use with caution in production environments. + + This tracer provider wraps an OpenTelemetry TracerProvider and ensures + that telemetry operations are safe during workflow replay by using + replay-safe spans and tracers. + """ + + def __init__(self, tracer_provider: trace_sdk.TracerProvider): + """Initialize the replay-safe tracer provider. + + Args: + tracer_provider: The underlying OpenTelemetry TracerProvider to wrap. 
Must use a TemporalIdGenerator for replay safety.

        Raises:
            ValueError: If the tracer provider doesn't use a TemporalIdGenerator.
        """
        if not isinstance(tracer_provider.id_generator, TemporalIdGenerator):
            raise ValueError(
                "ReplaySafeTracerProvider should only be used with a TemporalIdGenerator for replay safety. The given TracerProvider doesn't use one."
            )
        self._tracer_provider = tracer_provider

    def add_span_processor(self, span_processor: trace_sdk.SpanProcessor) -> None:
        """Add a span processor to the underlying tracer provider.

        Args:
            span_processor: The span processor to add.
        """
        self._tracer_provider.add_span_processor(span_processor)

    def shutdown(self) -> None:
        """Shutdown the underlying tracer provider."""
        self._tracer_provider.shutdown()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Force flush the underlying tracer provider.

        Args:
            timeout_millis: Timeout in milliseconds.

        Returns:
            True if flush was successful, False otherwise.
        """
        return self._tracer_provider.force_flush(timeout_millis)

    def get_tracer(
        self,
        instrumenting_module_name: str,
        instrumenting_library_version: str | None = None,
        schema_url: str | None = None,
        attributes: types.Attributes | None = None,
    ) -> Tracer:
        """Get a replay-safe tracer from the underlying provider.

        Args:
            instrumenting_module_name: The name of the instrumenting module.
            instrumenting_library_version: The version of the instrumenting library.
            schema_url: The schema URL for the tracer.
            attributes: Additional attributes for the tracer.

        Returns:
            A replay-safe tracer instance.
        """
        tracer = self._tracer_provider.get_tracer(
            instrumenting_module_name,
            instrumenting_library_version,
            schema_url,
            attributes,
        )
        return _ReplaySafeTracer(tracer)


def create_tracer_provider(
    sampler: sampling.Sampler | None = None,
    resource: Resource | None = None,
    shutdown_on_exit: bool = True,
    active_span_processor: SynchronousMultiSpanProcessor
    | ConcurrentMultiSpanProcessor
    | None = None,
    id_generator: IdGenerator | None = None,
    span_limits: SpanLimits | None = None,
) -> ReplaySafeTracerProvider:
    """Create a replay-safe tracer provider.

    .. warning::
        This function is experimental and may change in future versions.
        Use with caution in production environments.

    Creates a new TracerProvider with a TemporalIdGenerator for replay safety
    and wraps it in a ReplaySafeTracerProvider.

    Args:
        sampler: The sampler to use for sampling spans.
        resource: The resource to associate with the tracer provider.
        shutdown_on_exit: Whether to shutdown the provider on exit.
        active_span_processor: The active span processor to use.
        id_generator: The ID generator to wrap with TemporalIdGenerator.
        span_limits: The span limits to apply.

    Returns:
        A replay-safe tracer provider instance.
    """
    generator = TemporalIdGenerator(id_generator or RandomIdGenerator())
    provider = trace_sdk.TracerProvider(
        sampler=sampler,
        resource=resource,
        shutdown_on_exit=shutdown_on_exit,
        active_span_processor=active_span_processor,
        span_limits=span_limits,
        id_generator=generator,
    )
    return ReplaySafeTracerProvider(provider)
diff --git a/temporalio/contrib/opentelemetry/workflow.py b/temporalio/contrib/opentelemetry/workflow.py
new file mode 100644
index 000000000..299e72b24
--- /dev/null
+++ b/temporalio/contrib/opentelemetry/workflow.py
@@ -0,0 +1,53 @@
+"""OpenTelemetry workflow utilities for Temporal SDK.
+
+This module provides workflow-safe OpenTelemetry span creation and context
+management utilities for use within Temporal workflows. All functions in
+this module are designed to work correctly during workflow replay.
+"""
+
+from __future__ import annotations
+
+import warnings
+
+import opentelemetry.util.types
+from opentelemetry.trace import (
+    get_tracer,
+)
+
+from temporalio.contrib.opentelemetry import TracingWorkflowInboundInterceptor
+
+
+def completed_span(
+    name: str,
+    *,
+    attributes: opentelemetry.util.types.Attributes = None,
+    exception: Exception | None = None,
+) -> None:
+    """Create and end an OpenTelemetry span.
+
+    Note, this will only create and record when the workflow is not
+    replaying and if there is a current span (meaning the client started a
+    span and this interceptor is configured on the worker and the span is on
+    the context).
+
+    To create a long-running span, or a span that actually spans other code,
+    use OpenTelemetryPlugin and create spans with a standard OpenTelemetry tracer.
+
+    Args:
+        name: Name of the span.
+        attributes: Attributes to set on the span if any. Workflow ID and
+            run ID are automatically added.
+        exception: Optional exception to record on the span.
+    """
+    if interceptor := TracingWorkflowInboundInterceptor._from_context():
+        interceptor._completed_span(
+            name, additional_attributes=attributes, exception=exception
+        )
+    else:
+        warnings.warn(
+            "When using OpenTelemetryPlugin, you should prefer using OpenTelemetry directly.",
+            DeprecationWarning,
+        )
+        span = get_tracer(__name__).start_span(name, attributes=attributes)
+        if exception:
+            span.record_exception(exception)
+        span.end()
diff --git a/temporalio/plugin.py b/temporalio/plugin.py
index db917e337..b98ec365b 100644
--- a/temporalio/plugin.py
+++ b/temporalio/plugin.py
@@ -150,11 +150,36 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
         if workflow_runner:
             config["workflow_runner"] = workflow_runner
 
-        interceptors = _resolve_append_parameter(
-            config.get("interceptors"), self.worker_interceptors
+        interceptors = list(
+            _resolve_append_parameter(
+                config.get("interceptors"), self.worker_interceptors
+            )
+            or []
         )
-        if interceptors is not None:
-            config["interceptors"] = interceptors
+
+        # Only propagate client interceptors if they are provided as a simple list (not callable)
+        if self.client_interceptors is not None and not callable(
+            self.client_interceptors
+        ):
+            client_worker_interceptors = [
+                interceptor
+                for interceptor in self.client_interceptors
+                if isinstance(interceptor, temporalio.worker.Interceptor)
+            ]
+            for interceptor in client_worker_interceptors:
+                if interceptor not in interceptors:
+                    # Check if interceptor is already in client's interceptors to avoid duplication
+                    client_config = config.get("client")
+                    if client_config is not None:
+                        client_interceptors_list = client_config.config(
+                            active_config=True
+                        ).get("interceptors", [])
+                        if interceptor not in client_interceptors_list:
+                            interceptors.append(interceptor)
+                    else:
+                        interceptors.append(interceptor)
+
+        config["interceptors"] = interceptors
 
         failure_exception_types = _resolve_append_parameter(
             config.get("workflow_failure_exception_types"),
diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py
index a10ae0fa6..26169f257 100644
--- a/temporalio/worker/_workflow_instance.py
+++ b/temporalio/worker/_workflow_instance.py
@@ -1233,6 +1233,9 @@ def workflow_is_replaying(self) -> bool:
     def workflow_is_replaying_history_events(self) -> bool:
         return self._is_replaying and not
self._in_query_or_validator + def workflow_is_read_only(self) -> bool: + return self._read_only + def workflow_memo(self) -> Mapping[str, Any]: if self._untyped_converted_memo is None: self._untyped_converted_memo = { diff --git a/temporalio/workflow.py b/temporalio/workflow.py index fb1753ed8..441dac6be 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -741,6 +741,9 @@ def workflow_is_replaying(self) -> bool: ... @abstractmethod def workflow_is_replaying_history_events(self) -> bool: ... + @abstractmethod + def workflow_is_read_only(self) -> bool: ... + @abstractmethod def workflow_memo(self) -> Mapping[str, Any]: ... @@ -1516,6 +1519,18 @@ def is_replaying_history_events() -> bool: """ return _Runtime.current().workflow_is_replaying_history_events() + @staticmethod + def is_read_only() -> bool: + """Whether the workflow is currently in read-only mode. + + Read-only mode occurs during queries and update validators where + side effects are not allowed. + + Returns: + True if the workflow is in read-only mode, False otherwise. + """ + return _Runtime.current().workflow_is_read_only() + @staticmethod def is_sandbox_unrestricted() -> bool: """Whether the current block of code is not restricted via sandbox. diff --git a/tests/contrib/test_opentelemetry.py b/tests/contrib/opentelemetry/test_opentelemetry.py similarity index 100% rename from tests/contrib/test_opentelemetry.py rename to tests/contrib/opentelemetry/test_opentelemetry.py diff --git a/tests/contrib/opentelemetry/test_opentelemetry_plugin.py b/tests/contrib/opentelemetry/test_opentelemetry_plugin.py new file mode 100644 index 000000000..64073cf9c --- /dev/null +++ b/tests/contrib/opentelemetry/test_opentelemetry_plugin.py @@ -0,0 +1,576 @@ +import logging +import uuid +from datetime import timedelta +from typing import Any + +import nexusrpc +import opentelemetry.trace +import pytest +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.trace import ( + get_tracer, +) +from opentelemetry.util._once import Once + +import temporalio.contrib.opentelemetry.workflow +from temporalio import activity, nexus, workflow +from temporalio.client import Client, WorkflowFailureError +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider +from temporalio.exceptions import ApplicationError +from temporalio.testing import WorkflowEnvironment + +# Import the dump_spans function from the original opentelemetry test +from tests.contrib.opentelemetry.test_opentelemetry import dump_spans +from tests.helpers import new_worker +from tests.helpers.nexus import create_nexus_endpoint, make_nexus_endpoint_name + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def reset_otel_tracer_provider(): + """Reset OpenTelemetry tracer provider state to allow multiple test runs.""" + opentelemetry.trace._TRACER_PROVIDER_SET_ONCE = Once() + opentelemetry.trace._TRACER_PROVIDER = None + yield + opentelemetry.trace._TRACER_PROVIDER_SET_ONCE = Once() + opentelemetry.trace._TRACER_PROVIDER = None + + +@activity.defn +async def simple_no_context_activity() -> str: + with get_tracer(__name__).start_as_current_span("Activity"): + pass + return "success" + + +@workflow.defn +class SimpleNexusWorkflow: + @workflow.run + async def run(self, input: str) -> str: + return f"nexus-result-{input}" + + +@nexusrpc.handler.service_handler +class ComprehensiveNexusService: + 
@nexus.workflow_run_operation + async def test_operation( + self, ctx: nexus.WorkflowRunOperationContext, input: str + ) -> nexus.WorkflowHandle[str]: + return await ctx.start_workflow( + SimpleNexusWorkflow.run, + input, + id=f"nexus-wf-{ctx.request_id}", + ) + + +@workflow.defn +class BasicTraceWorkflow: + @workflow.run + async def run(self): + tracer = get_tracer(__name__) + temporalio.contrib.opentelemetry.workflow.completed_span("Completed Span") + with tracer.start_as_current_span("Hello World"): + await workflow.execute_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + await workflow.execute_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + span = tracer.start_span("Not context") + with tracer.start_as_current_span("Inner"): + await workflow.execute_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + span.end() + return + + +async def test_otel_tracing_basic(client: Client, reset_otel_tracer_provider: Any): # type: ignore[reportUnusedParameter] + exporter = InMemorySpanExporter() + provider = create_tracer_provider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + opentelemetry.trace.set_tracer_provider(provider) + + new_config = client.config() + new_config["plugins"] = [OpenTelemetryPlugin()] + new_client = Client(**new_config) + + async with new_worker( + new_client, + BasicTraceWorkflow, + activities=[simple_no_context_activity], + max_cached_workflows=0, + ) as worker: + tracer = get_tracer(__name__) + + with tracer.start_as_current_span("Research workflow"): + workflow_handle = await new_client.start_workflow( + BasicTraceWorkflow.run, + id=f"research-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(seconds=120), + ) + await workflow_handle.result() + + spans = exporter.get_finished_spans() + + expected_hierarchy = [ + "Research workflow", + " Completed Span", + " Hello World", + " Activity", + " Activity", + " Inner", + " Activity", + " Not context", + ] + + # Verify the span hierarchy matches expectations + actual_hierarchy = dump_spans(spans, with_attributes=False) + assert ( + actual_hierarchy == expected_hierarchy + ), f"Span hierarchy mismatch.\nExpected:\n{expected_hierarchy}\nActual:\n{actual_hierarchy}" + + +@workflow.defn +class ComprehensiveWorkflow: + def __init__(self) -> None: + self._signal_count = 0 + self._update_completed = False + self._nexus_result: str = "" + + @workflow.run + async def run(self, actions: list[str]) -> dict[str, str]: + results = {} + tracer = get_tracer(__name__) + with tracer.start_as_current_span("MainWorkflow"): + for action in actions: + if action == "activity": + with tracer.start_as_current_span("ActivitySection"): + result = await workflow.execute_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + results["activity"] = result + + elif action == "local_activity": + with tracer.start_as_current_span("LocalActivitySection"): + result = await workflow.execute_local_activity( + simple_no_context_activity, + start_to_close_timeout=timedelta(seconds=10), + ) + results["local_activity"] = result + + elif action == "child_workflow": + with tracer.start_as_current_span("ChildWorkflowSection"): + child_handle = await workflow.start_child_workflow( + BasicTraceWorkflow.run, + id=f"child-{workflow.info().workflow_id}", + ) + + await child_handle + results["child_workflow"] = "completed" + + elif action == "timer": + with 
tracer.start_as_current_span("TimerSection"): + await workflow.sleep(0.01) + results["timer"] = "completed" + + elif action == "wait_signal": + with tracer.start_as_current_span("WaitSignalSection"): + await workflow.wait_condition(lambda: self._signal_count > 0) + results["wait_signal"] = ( + f"received_{self._signal_count}_signals" + ) + + elif action == "wait_update": + with tracer.start_as_current_span("WaitUpdateSection"): + await workflow.wait_condition(lambda: self._update_completed) + results["wait_update"] = "update_received" + + elif action == "nexus": + with tracer.start_as_current_span("NexusSection"): + nexus_client = workflow.create_nexus_client( + endpoint=make_nexus_endpoint_name( + workflow.info().task_queue + ), + service=ComprehensiveNexusService, + ) + nexus_handle = await nexus_client.start_operation( + operation=ComprehensiveNexusService.test_operation, + input="test-input", + ) + nexus_result = await nexus_handle + results["nexus"] = nexus_result + + elif action == "continue_as_new": + with tracer.start_as_current_span("ContinueAsNewSection"): + if ( + len(results) > 0 + ): # Only continue as new if we've done some work + workflow.continue_as_new( + [] + ) # Empty actions to finish quickly + results["continue_as_new"] = "prepared" + + return results + + @workflow.query + def get_status(self) -> dict[str, Any]: + return { + "signal_count": self._signal_count, + "update_completed": self._update_completed, + } + + @workflow.signal + def notify(self, message: str) -> None: # type: ignore[reportUnusedParameter] + self._signal_count += 1 + + @workflow.update + def update_status(self, status: str) -> str: + self._update_completed = True + return f"updated_to_{status}" + + @update_status.validator + def validate_update_status(self, status: str) -> None: + if not status: + raise ValueError("Status cannot be empty") + + +async def test_opentelemetry_comprehensive_tracing( + client: Client, + env: WorkflowEnvironment, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): + """Test OpenTelemetry v2 integration across all workflow operations.""" + if env.supports_time_skipping: + pytest.skip("Fails on java test server.") + + exporter = InMemorySpanExporter() + provider = create_tracer_provider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + opentelemetry.trace.set_tracer_provider(provider) + + new_config = client.config() + new_config["plugins"] = [OpenTelemetryPlugin(add_temporal_spans=True)] + new_client = Client(**new_config) + + async with new_worker( + new_client, + ComprehensiveWorkflow, + BasicTraceWorkflow, # For child workflow + SimpleNexusWorkflow, # For Nexus operation + activities=[simple_no_context_activity], + nexus_service_handlers=[ComprehensiveNexusService()], + max_cached_workflows=0, + ) as worker: + # Create Nexus endpoint for this task queue + await create_nexus_endpoint(worker.task_queue, new_client) + + with get_tracer(__name__).start_as_current_span("ComprehensiveTest") as span: + span.set_attribute("test.type", "comprehensive") + + # Start workflow with various actions + workflow_handle = await new_client.start_workflow( + ComprehensiveWorkflow.run, + [ + "activity", + "local_activity", + "child_workflow", + "timer", + "nexus", + "wait_signal", + "wait_update", + ], + id=f"comprehensive-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(seconds=120), + ) + + logger.info(f"Comprehensive workflow query") + + # Test query + status = await 
+            status = await workflow_handle.query(ComprehensiveWorkflow.get_status)
+            assert status["signal_count"] == 0
+
+            logger.info("Comprehensive workflow signal")
+
+            # Test signal
+            await workflow_handle.signal(ComprehensiveWorkflow.notify, "test-signal-1")
+            await workflow_handle.signal(ComprehensiveWorkflow.notify, "test-signal-2")
+
+            logger.info("Comprehensive workflow update")
+
+            # Test update
+            update_result = await workflow_handle.execute_update(
+                ComprehensiveWorkflow.update_status, "active"
+            )
+            assert update_result == "updated_to_active"
+
+            logger.info("Comprehensive workflow get result")
+
+            # Get final result
+            result = await workflow_handle.result()
+
+            # Verify results
+            expected_keys = {
+                "activity",
+                "local_activity",
+                "child_workflow",
+                "timer",
+                "nexus",
+                "wait_signal",
+                "wait_update",
+            }
+            assert all(key in result for key in expected_keys)
+            assert result["activity"] == "success"
+            assert result["local_activity"] == "success"
+            assert result["child_workflow"] == "completed"
+            assert result["timer"] == "completed"
+            assert result["nexus"] == "nexus-result-test-input"
+            assert result["wait_signal"] == "received_2_signals"
+            assert result["wait_update"] == "update_received"
+
+    spans = exporter.get_finished_spans()
+
+    # Note: although we signal twice, dump_spans() deduplicates signal spans,
+    # since they can duplicate in rare situations
+
+    expected_hierarchy = [
+        "ComprehensiveTest",
+        "  StartWorkflow:ComprehensiveWorkflow",
+        "    RunWorkflow:ComprehensiveWorkflow",
+        "      MainWorkflow",
+        "        ActivitySection",
+        "          StartActivity:simple_no_context_activity",
+        "            RunActivity:simple_no_context_activity",
+        "              Activity",
+        "        LocalActivitySection",
+        "          StartActivity:simple_no_context_activity",
+        "            RunActivity:simple_no_context_activity",
+        "              Activity",
+        "        ChildWorkflowSection",
+        "          StartChildWorkflow:BasicTraceWorkflow",
+        "            RunWorkflow:BasicTraceWorkflow",
+        "              Completed Span",
+        "              Hello World",
+        "                StartActivity:simple_no_context_activity",
+        "                  RunActivity:simple_no_context_activity",
+        "                    Activity",
+        "                StartActivity:simple_no_context_activity",
+        "                  RunActivity:simple_no_context_activity",
+        "                    Activity",
+        "              Inner",
+        "                StartActivity:simple_no_context_activity",
+        "                  RunActivity:simple_no_context_activity",
+        "                    Activity",
+        "              Not context",
+        "        TimerSection",
+        "        NexusSection",
+        "          StartNexusOperation:ComprehensiveNexusService/test_operation",
+        "            RunStartNexusOperationHandler:ComprehensiveNexusService/test_operation",
+        "              StartWorkflow:SimpleNexusWorkflow",
+        "                RunWorkflow:SimpleNexusWorkflow",
+        "        WaitSignalSection",
+        "        WaitUpdateSection",
+        "  QueryWorkflow:get_status",
+        "    HandleQuery:get_status",
+        "  SignalWorkflow:notify",
+        "    HandleSignal:notify",
+        "  StartWorkflowUpdate:update_status",
+        "    ValidateUpdate:update_status",
+        "    HandleUpdate:update_status",
+    ]
+
+    # Verify the span hierarchy matches expectations
+    actual_hierarchy = dump_spans(spans, with_attributes=False)
+    assert (
+        actual_hierarchy == expected_hierarchy
+    ), f"Span hierarchy mismatch.\nExpected:\n{expected_hierarchy}\nActual:\n{actual_hierarchy}"
+
+
+async def test_otel_tracing_with_added_spans(
+    client: Client,
+    reset_otel_tracer_provider: Any,  # type: ignore[reportUnusedParameter]
+):
+    exporter = InMemorySpanExporter()
+    provider = create_tracer_provider()
+    provider.add_span_processor(SimpleSpanProcessor(exporter))
+    opentelemetry.trace.set_tracer_provider(provider)
+
+    plugin = OpenTelemetryPlugin(add_temporal_spans=True)
+    new_config = client.config()
new_config["plugins"] = [plugin] + new_client = Client(**new_config) + + async with new_worker( + new_client, + BasicTraceWorkflow, + activities=[simple_no_context_activity], + max_cached_workflows=0, + ) as worker: + with get_tracer(__name__).start_as_current_span("Research workflow"): + workflow_handle = await new_client.start_workflow( + BasicTraceWorkflow.run, + id=f"research-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(seconds=120), + ) + await workflow_handle.result() + + spans = exporter.get_finished_spans() + + expected_hierarchy = [ + "Research workflow", + " StartWorkflow:BasicTraceWorkflow", + " RunWorkflow:BasicTraceWorkflow", + " Completed Span", + " Hello World", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " Inner", + " StartActivity:simple_no_context_activity", + " RunActivity:simple_no_context_activity", + " Activity", + " Not context", + ] + + # Verify the span hierarchy matches expectations + actual_hierarchy = dump_spans(spans, with_attributes=False) + assert ( + actual_hierarchy == expected_hierarchy + ), f"Span hierarchy mismatch.\nExpected:\n{expected_hierarchy}\nActual:\n{actual_hierarchy}" + + +task_fail_once_workflow_has_failed = False + + +@workflow.defn(sandboxed=False) +class FailingTaskWorkflow: + @workflow.run + async def run(self): + tracer = get_tracer(__name__) + with tracer.start_as_current_span("FailingWorkflowSpan"): + with tracer.start_as_current_span("FailingWorkflow CompletedSpan"): + pass + global task_fail_once_workflow_has_failed + if not task_fail_once_workflow_has_failed: + task_fail_once_workflow_has_failed = True + raise RuntimeError("Intentional workflow task failure") + task_fail_once_workflow_has_failed = False + + return + + +async def test_otel_tracing_workflow_task_failure( + client: Client, + reset_otel_tracer_provider: Any, # type: ignore[reportUnusedParameter] +): + """Test OpenTelemetry behavior when a workflow task fails.""" + exporter = InMemorySpanExporter() + provider = create_tracer_provider() + provider.add_span_processor(SimpleSpanProcessor(exporter)) + opentelemetry.trace.set_tracer_provider(provider) + + new_config = client.config() + new_config["plugins"] = [OpenTelemetryPlugin(add_temporal_spans=True)] + new_client = Client(**new_config) + + async with new_worker( + new_client, + FailingTaskWorkflow, + max_cached_workflows=0, + ) as worker: + with get_tracer(__name__).start_as_current_span("FailingWorkflowTest"): + workflow_handle = await new_client.start_workflow( + FailingTaskWorkflow.run, + id=f"failing-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + execution_timeout=timedelta(seconds=30), + ) + + await workflow_handle.result() + + spans = exporter.get_finished_spans() + + # Verify the span hierarchy includes the failure, but only once + # Spans which completed during the failed task will duplicate + expected_hierarchy = [ + "FailingWorkflowTest", + " StartWorkflow:FailingTaskWorkflow", + " RunWorkflow:FailingTaskWorkflow", + " FailingWorkflowSpan", + " FailingWorkflow CompletedSpan", + " FailingWorkflow CompletedSpan", + ] + + actual_hierarchy = dump_spans(spans, with_attributes=False) + assert ( + actual_hierarchy == expected_hierarchy + ), f"Span hierarchy mismatch.\nExpected:\n{expected_hierarchy}\nActual:\n{actual_hierarchy}" + + +@workflow.defn +class FailingWorkflow: + @workflow.run + async 
+    async def run(self):
+        tracer = get_tracer(__name__)
+        with tracer.start_as_current_span("FailingWorkflowSpan"):
+            raise ApplicationError("Intentional workflow failure", non_retryable=True)
+
+
+async def test_otel_tracing_workflow_failure(
+    client: Client,
+    reset_otel_tracer_provider: Any,  # type: ignore[reportUnusedParameter]
+):
+    """Test OpenTelemetry behavior when a workflow itself fails."""
+    exporter = InMemorySpanExporter()
+    provider = create_tracer_provider()
+    provider.add_span_processor(SimpleSpanProcessor(exporter))
+    opentelemetry.trace.set_tracer_provider(provider)
+
+    new_config = client.config()
+    new_config["plugins"] = [OpenTelemetryPlugin(add_temporal_spans=True)]
+    new_client = Client(**new_config)
+
+    async with new_worker(
+        new_client,
+        FailingWorkflow,
+        max_cached_workflows=0,
+    ) as worker:
+        with get_tracer(__name__).start_as_current_span("FailingWorkflowTest"):
+            workflow_handle = await new_client.start_workflow(
+                FailingWorkflow.run,
+                id=f"failing-workflow-{uuid.uuid4()}",
+                task_queue=worker.task_queue,
+                execution_timeout=timedelta(seconds=30),
+            )
+
+            with pytest.raises(WorkflowFailureError):
+                await workflow_handle.result()
+
+    spans = exporter.get_finished_spans()
+
+    # Verify the span hierarchy includes the failure when it fails the whole workflow
+    expected_hierarchy = [
+        "FailingWorkflowTest",
+        "  StartWorkflow:FailingWorkflow",
+        "    RunWorkflow:FailingWorkflow",
+        "      FailingWorkflowSpan",
+    ]
+
+    actual_hierarchy = dump_spans(spans, with_attributes=False)
+    assert (
+        actual_hierarchy == expected_hierarchy
+    ), f"Span hierarchy mismatch.\nExpected:\n{expected_hierarchy}\nActual:\n{actual_hierarchy}"
diff --git a/tests/test_plugins.py b/tests/test_plugins.py
index 399d16c7e..f6d0d224a 100644
--- a/tests/test_plugins.py
+++ b/tests/test_plugins.py
@@ -387,3 +387,232 @@ async def test_medium_plugin(client: Client) -> None:
     )
     task_queue = worker.config(active_config=True).get("task_queue")
     assert task_queue is not None and task_queue.startswith("override")
+
+
+class CombinedClientWorkerInterceptor(
+    temporalio.client.Interceptor, temporalio.worker.Interceptor
+):
+    """Test interceptor usable as both a client and a worker interceptor, with execution counting."""
+
+    def __init__(self):
+        super().__init__()
+        self.client_intercepted = False
+        self.worker_intercepted = False
+        self.call_count = {"execute_workflow": 0}
+
+    def intercept_client(
+        self, next: temporalio.client.OutboundInterceptor
+    ) -> temporalio.client.OutboundInterceptor:
+        self.client_intercepted = True
+        return super().intercept_client(next)
+
+    def intercept_activity(
+        self, next: temporalio.worker.ActivityInboundInterceptor
+    ) -> temporalio.worker.ActivityInboundInterceptor:
+        self.worker_intercepted = True
+        return super().intercept_activity(next)
+
+    def workflow_interceptor_class(
+        self, input: temporalio.worker.WorkflowInterceptorClassInput
+    ) -> type[temporalio.worker.WorkflowInboundInterceptor] | None:
+        # Called when the worker is configured with workflows; mark that the
+        # worker interceptor was used
+        self.worker_intercepted = True
+
+        # Return an interceptor class that counts workflow executions
+        call_count = self.call_count
+
+        class CountingWorkflowInterceptor(temporalio.worker.WorkflowInboundInterceptor):
+            async def execute_workflow(
+                self, input: temporalio.worker.ExecuteWorkflowInput
+            ):
+                call_count["execute_workflow"] += 1
+                return await super().execute_workflow(input)
+
+        return CountingWorkflowInterceptor
+
+
+async def 
test_simple_plugin_worker_interceptor_only_used_on_worker( + client: Client, +) -> None: + """Test that when a combined client/worker interceptor is provided by SimplePlugin + to client_interceptors, and the plugin is only used on a worker (not on the client + used to create that worker), the worker interceptor functionality is still provided.""" + + interceptor = CombinedClientWorkerInterceptor() + + # Create SimplePlugin that provides the combined interceptor as client_interceptors + plugin = SimplePlugin( + "TestCombinedPlugin", + client_interceptors=[interceptor], + ) + + # Create worker with the plugin (but don't add plugin to client) + worker = Worker( + client, + task_queue="queue" + str(uuid.uuid4()), + activities=[never_run_activity], + workflows=[ + HelloWorkflow + ], # Add workflows to trigger workflow_interceptor_class + plugins=[plugin], + ) + + # Worker creation triggers plugin configuration + assert worker is not None + + # The interceptor should NOT have been used for client interception + # since the plugin was not added to the client + assert ( + not interceptor.client_intercepted + ), "Client interceptor should not have been used" + + # The interceptor SHOULD have been used for worker interception + # even though it was specified in client_interceptors + assert interceptor.worker_intercepted, "Worker interceptor should have been used" + + +async def test_simple_plugin_interceptor_duplication_when_used_on_client_and_worker( + client: Client, +) -> None: + """Test that when a combined client/worker interceptor is provided by SimplePlugin + to client_interceptors, and the plugin is used on both client and worker, + the interceptor is not duplicated in the worker.""" + + interceptor = CombinedClientWorkerInterceptor() + + # Create SimplePlugin that provides the combined interceptor as client_interceptors + plugin = SimplePlugin( + "TestCombinedPlugin", + client_interceptors=[interceptor], + ) + + # Add plugin to client first + config = client.config() + config["plugins"] = [plugin] + new_client = Client(**config) + + # Verify client interceptor was used + assert interceptor.client_intercepted, "Client interceptor should have been used" + + # Reset the worker intercepted flag to test worker behavior + interceptor.worker_intercepted = False + + # Create worker with the same plugin-enabled client + worker = Worker( + new_client, + task_queue="queue" + str(uuid.uuid4()), + activities=[never_run_activity], + workflows=[HelloWorkflow], + ) + + # The worker interceptor functionality should still work + # (regardless of whether it comes from client propagation or worker config) + assert interceptor.worker_intercepted, "Worker interceptor should have been used" + + # Test execution-level duplication by running a workflow + async with new_worker( + new_client, + HelloWorkflow, + max_cached_workflows=0, + ) as worker: + # Start and complete a workflow + handle = await new_client.start_workflow( + HelloWorkflow.run, + "test", + id=f"counting-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + result = await handle.result() + assert result == "Hello, test!" + + # The workflow interceptor should only be called ONCE, not twice + assert ( + interceptor.call_count["execute_workflow"] == 1 + ), f"Expected execute_workflow to be called once, but was called {interceptor.call_count['execute_workflow']} times. This indicates interceptor duplication in execution." 
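+
+
+# Why deduplication matters: a plugin added to a client propagates to workers
+# created from that client, so a combined client/worker interceptor could
+# otherwise be registered twice and have its workflow interceptor run twice per
+# execution -- the OpenTelemetry issue the remaining tests pin down.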
+ + +async def test_simple_plugin_no_duplication_when_interceptor_in_both_client_and_worker_params( + client: Client, +) -> None: + """Test that when the same interceptor is provided to both client_interceptors + and worker_interceptors in a SimplePlugin, it doesn't get duplicated.""" + + interceptor = CombinedClientWorkerInterceptor() + + # Create SimplePlugin that provides the same interceptor to both client and worker + plugin = SimplePlugin( + "TestCombinedPlugin", + client_interceptors=[interceptor], + worker_interceptors=[interceptor], # Same interceptor in both places + ) + + # Create worker with plugin (not on client) + worker = Worker( + client, + task_queue="queue" + str(uuid.uuid4()), + activities=[never_run_activity], + workflows=[HelloWorkflow], + plugins=[plugin], + ) + + # The worker interceptor functionality should work + assert interceptor.worker_intercepted, "Worker interceptor should have been used" + + # Test execution-level duplication by running a workflow + async with worker: + # Start and complete a workflow + handle = await client.start_workflow( + HelloWorkflow.run, + "test", + id=f"counting-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + result = await handle.result() + assert result == "Hello, test!" + + # The workflow interceptor should only be called ONCE, not twice + assert ( + interceptor.call_count["execute_workflow"] == 1 + ), f"Expected execute_workflow to be called once, but was called {interceptor.call_count['execute_workflow']} times. This indicates interceptor duplication in execution." + + +async def test_simple_plugin_no_duplication_in_interceptor_chain( + client: Client, +) -> None: + """Test that interceptors don't get duplicated in the actual interceptor chain execution. + This catches the specific OpenTelemetry issue where the same interceptor method gets called twice.""" + + interceptor = CombinedClientWorkerInterceptor() + + # Create SimplePlugin that provides the combined interceptor as client_interceptors only + plugin = SimplePlugin( + "CountingPlugin", + client_interceptors=[interceptor], + ) + + # Add plugin to client (like OpenTelemetryPlugin does) + config = client.config() + config["plugins"] = [plugin] + new_client = Client(**config) + + # Create worker with the plugin-enabled client (plugin propagates from client) + async with new_worker( + new_client, + HelloWorkflow, + max_cached_workflows=0, + ) as worker: + # Start and complete a workflow + handle = await new_client.start_workflow( + HelloWorkflow.run, + "test", + id=f"counting-workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + result = await handle.result() + assert result == "Hello, test!" + + # The workflow interceptor should only be called ONCE, not twice + assert ( + interceptor.call_count["execute_workflow"] == 1 + ), f"Expected execute_workflow to be called once, but was called {interceptor.call_count['execute_workflow']} times. This indicates interceptor duplication in the chain."
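+
+
+# A minimal sketch (not exercised by the tests above; the class and attribute
+# names are illustrative) of how the same counting idea could be extended to
+# the client side: an outbound interceptor that counts start_workflow calls,
+# so client-side duplication would show up as more than one count per start.
+class ClientCallCountingInterceptor(temporalio.client.Interceptor):
+    def __init__(self) -> None:
+        self.start_workflow_calls = 0
+
+    def intercept_client(
+        self, next: temporalio.client.OutboundInterceptor
+    ) -> temporalio.client.OutboundInterceptor:
+        counter = self
+
+        class _CountingOutbound(temporalio.client.OutboundInterceptor):
+            async def start_workflow(self, input):
+                # Count each outbound start before delegating down the chain
+                counter.start_workflow_calls += 1
+                return await super().start_workflow(input)
+
+        return _CountingOutbound(next)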