Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
6214033
Add OpenTelemetry v2 interceptor with enhanced features
tconley1428 Jan 20, 2026
b748c4b
Enhance OpenTelemetry v2 integration with comprehensive testing and l…
tconley1428 Feb 3, 2026
066438b
Add debugging to comprehensive test
tconley1428 Feb 4, 2026
68f17fd
Fix formatting
tconley1428 Feb 4, 2026
fa25bbc
Skip test on timeskipping
tconley1428 Feb 4, 2026
809b22d
Merge opentelemetry contribs
tconley1428 Feb 5, 2026
0518e77
Switch to batch processor
tconley1428 Feb 5, 2026
15e77b4
Merge remote-tracking branch 'origin/main' into opentelemetryv2-impro…
tconley1428 Feb 5, 2026
a838aeb
Use new workflow random functionality for id generation
tconley1428 Feb 5, 2026
0628859
Update to remove global state modification from plugin, and span proc…
tconley1428 Feb 6, 2026
94eda7a
Remove inaccurate comment
tconley1428 Feb 6, 2026
ae1e7f5
Move otel tests
tconley1428 Feb 6, 2026
3678a18
Fix test import
tconley1428 Feb 9, 2026
b93af2d
PR Feedback
tconley1428 Feb 10, 2026
9fc296d
Rely on the global and pass it through. All uses of a global tracer p…
tconley1428 Feb 10, 2026
1e675ef
Merge remote-tracking branch 'origin/main' into opentelemetryv2-impro…
tconley1428 Feb 11, 2026
92aa182
Fix rebase issue with standalone activities context
tconley1428 Feb 11, 2026
ce3e421
Clean up some unused code paths
tconley1428 Feb 11, 2026
9db654b
Change tracerprovider return type
tconley1428 Feb 11, 2026
f4117e5
Remove is read only
tconley1428 Feb 11, 2026
8416666
Clean up test prints
tconley1428 Feb 11, 2026
de676c5
Return ReplaySafeTracerProvider
tconley1428 Feb 11, 2026
bc8eb16
Fix lint
tconley1428 Feb 11, 2026
f01c03d
Add readme
tconley1428 Feb 11, 2026
d25cb3f
Debugging
tconley1428 Feb 11, 2026
e0f26a2
More debugging
tconley1428 Feb 11, 2026
de5a329
More debugging logs
tconley1428 Feb 11, 2026
efda2f0
Change debugging
tconley1428 Feb 11, 2026
d6e9442
Switch to is_read_only because updatevalidators are not technically d…
tconley1428 Feb 11, 2026
af2d637
Linting
tconley1428 Feb 11, 2026
d1d6630
Remove interceptor tracer members
tconley1428 Feb 11, 2026
d81d859
Change plugin interceptor logic to propagate client/worker combined i…
tconley1428 Feb 12, 2026
0cb5267
Pass through all of otel
tconley1428 Feb 12, 2026
222c503
Remove debug file
tconley1428 Feb 12, 2026
9a1b79d
Merge branch 'main' into opentelemetryv2-improvements
tconley1428 Feb 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 259 additions & 0 deletions temporalio/contrib/opentelemetry/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
# OpenTelemetry Integration for Temporal Python SDK

This package provides OpenTelemetry tracing integration for Temporal workflows, activities, and other operations. It includes automatic span creation and propagation for distributed tracing across your Temporal applications.

## Overview

There are **two different approaches** for integrating OpenTelemetry with the Temporal Python SDK:

1. **🆕 New Approach (Recommended)**: `OpenTelemetryPlugin` - Provides accurate duration spans and direct OpenTelemetry usage within workflows
2. **📊 Legacy Approach**: `TracingInterceptor` - Provides immediate span visibility but with zero-duration workflow spans

## Quick Start

### New Approach (OpenTelemetryPlugin)

```python
import opentelemetry.trace
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider

# Create a replay-safe tracer provider
provider = create_tracer_provider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
opentelemetry.trace.set_tracer_provider(provider)

# Register plugin on CLIENT (automatically applies to workers using this client)
client = await Client.connect(
"localhost:7233",
plugins=[OpenTelemetryPlugin()]
)

# Workers created with this client automatically get the plugin
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
# NO NEED to specify plugins here - they come from the client
)
```

### Legacy Approach (TracingInterceptor)

```python
from temporalio.contrib.opentelemetry import TracingInterceptor

# Register interceptor on CLIENT (automatically applies to workers using this client)
client = await Client.connect(
"localhost:7233",
interceptors=[TracingInterceptor()]
)

# Workers created with this client automatically get the interceptor
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
# NO NEED to specify interceptors here - they come from the client
)
```

## Detailed Comparison

### New Approach: OpenTelemetryPlugin

#### ✅ Advantages:
- **Accurate Duration Spans**: Workflow spans have real durations reflecting actual execution time
- **Direct OpenTelemetry Usage**: Use `opentelemetry.trace.get_tracer()` directly within workflows
- **Better Span Hierarchy**: More accurate parent-child relationships within workflows
- **Workflow Context Access**: Access spans within workflows using `temporalio.contrib.opentelemetry.workflow.tracer()`

#### ⚠️ Considerations:
- **Experimental Status**: Subject to breaking changes in future versions
- **Delayed Span Visibility**: Workflow spans only appear after workflow completion
- **Different Trace Structure**: Migration from legacy approach may break dependencies on specific trace structures

#### Usage Example:
```python
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self):
# Direct OpenTelemetry usage works correctly
tracer = get_tracer(__name__)
with tracer.start_as_current_span("workflow-operation"):
# This span will have accurate duration
await workflow.execute_activity(
my_activity,
start_to_close_timeout=timedelta(seconds=30)
)
```

### Legacy Approach: TracingInterceptor

**File**: `temporalio/contrib/opentelemetry/_interceptor.py`

#### ✅ Advantages:
- **Immediate Span Visibility**: Spans appear as soon as they're created
- **Stable API**: Well-established interface, not subject to experimental changes
- **Workflow Progress Tracking**: Can see workflow spans even before workflow completes

#### ⚠️ Limitations:
- **Zero-Duration Workflow Spans**: All workflow spans are immediately ended with 0ms duration
- **No Direct OpenTelemetry Usage**: Cannot use standard OpenTelemetry APIs within workflows
- **Limited Workflow Span Creation**: Must use `temporalio.contrib.opentelemetry.workflow.completed_span()`

#### Usage Example:
```python
@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self):
# Must use Temporal-specific span creation
temporalio.contrib.opentelemetry.workflow.completed_span(
"workflow-operation",
attributes={"custom": "attribute"}
)
# Standard OpenTelemetry APIs don't work properly here
```

## When to Use Each Approach

### Choose OpenTelemetryPlugin When:
- You need accurate span durations for performance analysis
- You want to use standard OpenTelemetry APIs within workflows
- You're building new applications
- You can tolerate experimental API changes

### Choose TracingInterceptor When:
- You need immediate visibility into workflow progress
- You have existing dependencies on the current trace structure
- You require a stable, non-experimental API
- You primarily need basic tracing without complex workflow span hierarchies

## Configuration Options

### OpenTelemetryPlugin Options

```python
plugin = OpenTelemetryPlugin(
add_temporal_spans=False # Whether to add additional Temporal-specific spans
)
```

### TracingInterceptor Options

```python
interceptor = TracingInterceptor(
tracer=None, # Custom tracer (defaults to global tracer)
always_create_workflow_spans=False # Create spans even without parent context
)
```

## Migration Guide

### From TracingInterceptor to OpenTelemetryPlugin

1. **Replace interceptor with plugin on client**:
```python
# Old
client = await Client.connect(
"localhost:7233",
interceptors=[TracingInterceptor()]
)

# New
provider = create_tracer_provider()
opentelemetry.trace.set_tracer_provider(provider)
client = await Client.connect(
"localhost:7233",
plugins=[OpenTelemetryPlugin()]
)
```

2. **Update workflow span creation**:
```python
# Old
temporalio.contrib.opentelemetry.workflow.completed_span("my-span")

# New - use standard OpenTelemetry
tracer = get_tracer(__name__)
with tracer.start_as_current_span("my-span"):
# Your workflow logic
pass
```

3. **Test trace structure changes**: Verify that any monitoring or analysis tools still work with the new trace structure.

## Advanced Usage

### Creating Custom Spans in Workflows (New Approach)

```python
from opentelemetry.trace import get_tracer

@workflow.defn
class MyWorkflow:
@workflow.run
async def run(self):
tracer = get_tracer(__name__)

# Create spans with accurate durations
with tracer.start_as_current_span("business-logic") as span:
span.set_attribute("workflow.step", "processing")

# Nested spans work correctly
with tracer.start_as_current_span("data-validation"):
await self.validate_input()

await workflow.execute_activity(
process_data,
start_to_close_timeout=timedelta(seconds=60)
)
```

### Custom Span Attributes

Both approaches support adding custom attributes to spans:

```python
# Legacy approach
temporalio.contrib.opentelemetry.workflow.completed_span(
"my-operation",
attributes={
"business.unit": "payments",
"request.id": "req-123"
}
)

# New approach
with tracer.start_as_current_span("my-operation") as span:
span.set_attributes({
"business.unit": "payments",
"request.id": "req-123"
})
```

## Best Practices

1. **Register on Client**: Always register plugins/interceptors on the client, not the worker, to ensure proper context propagation

2. **Use create_tracer_provider()**: Always use the provided function to create replay-safe tracer providers when using the new approach

3. **Set Global Tracer Provider**: Ensure the tracer provider is set globally before creating clients

4. **Avoid Duplication**: Never register the same plugin/interceptor on both client and worker

## Troubleshooting

### Common Issues

1. **"ReplaySafeTracerProvider required" error**: Make sure you're using `create_tracer_provider()` when using OpenTelemetryPlugin

2. **Missing spans**: Verify that the tracer provider is set before creating clients, and that plugins/interceptors are registered on the client

3. **Duplicate spans**: Check that you haven't registered the same plugin/interceptor on both client and worker

4. **Zero-duration spans**: This is expected behavior with TracingInterceptor for workflow spans
22 changes: 22 additions & 0 deletions temporalio/contrib/opentelemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""OpenTelemetry v2 integration for Temporal SDK.

This package provides OpenTelemetry tracing integration for Temporal workflows,
activities, and other operations. It includes automatic span creation and
propagation for distributed tracing.
"""

from temporalio.contrib.opentelemetry._interceptor import (
TracingInterceptor,
TracingWorkflowInboundInterceptor,
)
from temporalio.contrib.opentelemetry._otel_interceptor import OpenTelemetryInterceptor
from temporalio.contrib.opentelemetry._plugin import OpenTelemetryPlugin
from temporalio.contrib.opentelemetry._tracer_provider import create_tracer_provider

__all__ = [
"TracingInterceptor",
"TracingWorkflowInboundInterceptor",
"OpenTelemetryInterceptor",
"OpenTelemetryPlugin",
"create_tracer_provider",
]
72 changes: 72 additions & 0 deletions temporalio/contrib/opentelemetry/_id_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import random

from opentelemetry.sdk.trace.id_generator import IdGenerator
from opentelemetry.trace import (
INVALID_SPAN_ID,
INVALID_TRACE_ID,
)

import temporalio.workflow


def _get_workflow_random() -> random.Random | None:
if (
temporalio.workflow.in_workflow()
and not temporalio.workflow.unsafe.is_read_only()
):
if (
getattr(temporalio.workflow.instance(), "__temporal_otel_id_random", None)
is None
):
setattr(
temporalio.workflow.instance(),
"__temporal_otel_id_random",
temporalio.workflow.new_random(),
)
return getattr(temporalio.workflow.instance(), "__temporal_otel_id_random")

return None


class TemporalIdGenerator(IdGenerator):
"""OpenTelemetry ID generator that uses Temporal's deterministic random generator.

.. warning::
This class is experimental and may change in future versions.
Use with caution in production environments.

This generator uses Temporal's workflow-safe random number generator when
inside a workflow execution, ensuring deterministic span and trace IDs
across workflow replays. Falls back to standard random generation outside
of workflows.
"""

def __init__(self, id_generator: IdGenerator):
"""Initialize a TemporalIdGenerator."""
self._id_generator = id_generator

def generate_span_id(self) -> int:
"""Generate a span ID using Temporal's deterministic random when in workflow.

Returns:
A 64-bit span ID.
"""
if workflow_random := _get_workflow_random():
span_id = workflow_random.getrandbits(64)
while span_id == INVALID_SPAN_ID:
span_id = workflow_random.getrandbits(64)
return span_id
return self._id_generator.generate_span_id()

def generate_trace_id(self) -> int:
"""Generate a trace ID using Temporal's deterministic random when in workflow.

Returns:
A 128-bit trace ID.
"""
if workflow_random := _get_workflow_random():
trace_id = workflow_random.getrandbits(128)
while trace_id == INVALID_TRACE_ID:
trace_id = workflow_random.getrandbits(128)
return trace_id
return self._id_generator.generate_trace_id()
Original file line number Diff line number Diff line change
Expand Up @@ -827,43 +827,3 @@ def _carrier_to_nexus_headers(
else:
out[k] = v
return out


class workflow:
"""Contains static methods that are safe to call from within a workflow.

.. warning::
Using any other ``opentelemetry`` API could cause non-determinism.
"""

def __init__(self) -> None: # noqa: D107
raise NotImplementedError

@staticmethod
def completed_span(
name: str,
*,
attributes: opentelemetry.util.types.Attributes = None,
exception: Exception | None = None,
) -> None:
"""Create and end an OpenTelemetry span.

Note, this will only create and record when the workflow is not
replaying and if there is a current span (meaning the client started a
span and this interceptor is configured on the worker and the span is on
the context).

There is currently no way to create a long-running span or to create a
span that actually spans other code.

Args:
name: Name of the span.
attributes: Attributes to set on the span if any. Workflow ID and
run ID are automatically added.
exception: Optional exception to record on the span.
"""
interceptor = TracingWorkflowInboundInterceptor._from_context()
if interceptor:
interceptor._completed_span(
name, additional_attributes=attributes, exception=exception
)
Loading