Complete API documentation for the TelemetryFlow Python SDK.
The main SDK client for recording telemetry data.
from telemetryflow import TelemetryFlowClient
from telemetryflow.domain.config import TelemetryConfigTelemetryFlowClient(config: TelemetryConfig)| Parameter | Type | Description |
|---|---|---|
config |
TelemetryConfig |
SDK configuration |
Initialize the SDK and connect to the TelemetryFlow backend.
def initialize(self) -> NoneRaises:
TelemetryFlowError: If initialization fails
Example:
client = TelemetryFlowBuilder().with_auto_configuration().build()
client.initialize()Shut down the SDK gracefully, flushing pending data.
def shutdown(self, timeout: float = 30.0) -> None| Parameter | Type | Default | Description |
|---|---|---|---|
timeout |
float |
30.0 |
Maximum wait time in seconds |
Example:
client.shutdown(timeout=60.0)Force flush all pending telemetry data.
def flush(self) -> NoneRaises:
NotInitializedError: If client is not initialized
Check if the client is initialized.
def is_initialized(self) -> boolReturns: True if initialized, False otherwise
Property to get the SDK configuration.
@property
def config(self) -> TelemetryConfigIncrement a counter metric.
def increment_counter(
self,
name: str,
value: int = 1,
attributes: dict[str, Any] | None = None,
) -> None| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
Required | Counter name |
value |
int |
1 |
Increment value |
attributes |
dict |
None |
Metric attributes |
Example:
client.increment_counter("http.requests.total", attributes={"method": "GET"})
client.increment_counter("cache.hits", value=5)Record a gauge metric value.
def record_gauge(
self,
name: str,
value: float,
attributes: dict[str, Any] | None = None,
) -> None| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
Required | Gauge name |
value |
float |
Required | Current value |
attributes |
dict |
None |
Metric attributes |
Example:
client.record_gauge("connections.active", 42)
client.record_gauge("memory.usage_bytes", 1024*1024*512)Record a histogram metric value.
def record_histogram(
self,
name: str,
value: float,
unit: str = "",
attributes: dict[str, Any] | None = None,
) -> None| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
Required | Histogram name |
value |
float |
Required | Value to record |
unit |
str |
"" |
Unit of measurement |
attributes |
dict |
None |
Metric attributes |
Example:
client.record_histogram("http.request.duration", 0.125, unit="s")
client.record_histogram("db.query.rows", 1000)Record a generic metric value.
def record_metric(
self,
name: str,
value: float,
unit: str = "",
attributes: dict[str, Any] | None = None,
) -> None| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
Required | Metric name |
value |
float |
Required | Metric value |
unit |
str |
"" |
Unit of measurement |
attributes |
dict |
None |
Metric attributes |
Emit a log entry with custom severity.
def log(
self,
message: str,
severity: SeverityLevel = SeverityLevel.INFO,
attributes: dict[str, Any] | None = None,
) -> None| Parameter | Type | Default | Description |
|---|---|---|---|
message |
str |
Required | Log message |
severity |
SeverityLevel |
INFO |
Log severity |
attributes |
dict |
None |
Log attributes |
Example:
from telemetryflow.application.commands import SeverityLevel
client.log("Custom message", SeverityLevel.WARN, {"key": "value"})Emit a debug-level log.
def log_debug(self, message: str, attributes: dict[str, Any] | None = None) -> NoneEmit an info-level log.
def log_info(self, message: str, attributes: dict[str, Any] | None = None) -> NoneExample:
client.log_info("User logged in", {"user_id": "123"})Emit a warning-level log.
def log_warn(self, message: str, attributes: dict[str, Any] | None = None) -> NoneEmit an error-level log.
def log_error(self, message: str, attributes: dict[str, Any] | None = None) -> NoneExample:
client.log_error("Database connection failed", {"database": "users_db"})Start a new trace span.
def start_span(
self,
name: str,
kind: SpanKind = SpanKind.INTERNAL,
attributes: dict[str, Any] | None = None,
) -> str| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
Required | Span name |
kind |
SpanKind |
INTERNAL |
Span kind |
attributes |
dict |
None |
Span attributes |
Returns: Span ID string
Example:
from telemetryflow.application.commands import SpanKind
span_id = client.start_span("http.request", SpanKind.SERVER)End a trace span.
def end_span(self, span_id: str, error: Exception | None = None) -> None| Parameter | Type | Default | Description |
|---|---|---|---|
span_id |
str |
Required | Span ID from start_span |
error |
Exception |
None |
Exception if span failed |
Example:
try:
# work...
except Exception as e:
client.end_span(span_id, error=e)
raise
else:
client.end_span(span_id)Add an event to an active span.
def add_span_event(
self,
span_id: str,
name: str,
attributes: dict[str, Any] | None = None,
) -> None| Parameter | Type | Default | Description |
|---|---|---|---|
span_id |
str |
Required | Span ID |
name |
str |
Required | Event name |
attributes |
dict |
None |
Event attributes |
Example:
client.add_span_event(span_id, "query_executed", {"rows": 100})Context manager for span lifecycle.
@contextmanager
def span(
self,
name: str,
kind: SpanKind = SpanKind.INTERNAL,
attributes: dict[str, Any] | None = None,
) -> Generator[str, None, None]Yields: Span ID string
Example:
with client.span("process_request", SpanKind.SERVER) as span_id:
client.add_span_event(span_id, "started")
# work...
client.add_span_event(span_id, "completed")Get the current SDK status.
def get_status(self) -> dict[str, Any]Returns: Dictionary with status information:
initialized: boolversion: strservice_name: strendpoint: strprotocol: strsignals_enabled: list[str]uptime_seconds: float | Nonemetrics_sent: intlogs_sent: intspans_sent: int
The client can be used as a context manager:
with TelemetryFlowBuilder().with_auto_configuration().build() as client:
client.increment_counter("app.started")
# Client automatically initialized and shut downFluent builder for creating TelemetryFlow clients.
from telemetryflow import TelemetryFlowBuilderTelemetryFlowBuilder()Set API key credentials.
def with_api_key(self, key_id: str, key_secret: str) -> TelemetryFlowBuilder| Parameter | Type | Description |
|---|---|---|
key_id |
str |
API key ID (must start with 'tfk_') |
key_secret |
str |
API key secret (must start with 'tfs_') |
Load API key from environment variables.
def with_api_key_from_env(self) -> TelemetryFlowBuilderUses: TELEMETRYFLOW_API_KEY_ID, TELEMETRYFLOW_API_KEY_SECRET
Set the collector endpoint.
def with_endpoint(self, endpoint: str) -> TelemetryFlowBuilder| Parameter | Type | Description |
|---|---|---|
endpoint |
str |
Endpoint address (host:port) |
Load endpoint from environment variable.
def with_endpoint_from_env(self) -> TelemetryFlowBuilderUses: TELEMETRYFLOW_ENDPOINT (default: api.telemetryflow.id:4317)
Set service name and optional version.
def with_service(self, name: str, version: str | None = None) -> TelemetryFlowBuilder| Parameter | Type | Default | Description |
|---|---|---|---|
name |
str |
Required | Service name |
version |
str |
None |
Service version |
Load service configuration from environment.
def with_service_from_env(self) -> TelemetryFlowBuilderUses: TELEMETRYFLOW_SERVICE_NAME, TELEMETRYFLOW_SERVICE_VERSION
Set service namespace.
def with_service_namespace(self, namespace: str) -> TelemetryFlowBuilderLoad service namespace from environment.
def with_service_namespace_from_env(self) -> TelemetryFlowBuilderUses: TELEMETRYFLOW_SERVICE_NAMESPACE
Set deployment environment.
def with_environment(self, environment: str) -> TelemetryFlowBuilderLoad environment from environment variable.
def with_environment_from_env(self) -> TelemetryFlowBuilderUses: TELEMETRYFLOW_ENVIRONMENT, ENV, ENVIRONMENT (in order)
Set OTLP protocol.
def with_protocol(self, protocol: Protocol) -> TelemetryFlowBuilderUse gRPC protocol (default).
def with_grpc(self) -> TelemetryFlowBuilderUse HTTP protocol.
def with_http(self) -> TelemetryFlowBuilderEnable/disable TLS verification.
def with_insecure(self, insecure: bool = True) -> TelemetryFlowBuilderConfigure which signals to enable.
def with_signals(
self,
metrics: bool = True,
logs: bool = True,
traces: bool = True,
) -> TelemetryFlowBuilderEnable only metrics.
def with_metrics_only(self) -> TelemetryFlowBuilderEnable only logs.
def with_logs_only(self) -> TelemetryFlowBuilderEnable only traces.
def with_traces_only(self) -> TelemetryFlowBuilderEnable/disable exemplars.
def with_exemplars(self, enabled: bool = True) -> TelemetryFlowBuilderSet connection timeout.
def with_timeout(self, timeout: timedelta) -> TelemetryFlowBuilderSet collector ID.
def with_collector_id(self, collector_id: str) -> TelemetryFlowBuilderLoad collector ID from environment.
def with_collector_id_from_env(self) -> TelemetryFlowBuilderUses: TELEMETRYFLOW_COLLECTOR_ID
Add custom resource attribute.
def with_custom_attribute(self, key: str, value: str) -> TelemetryFlowBuilderAdd multiple custom attributes.
def with_custom_attributes(self, attributes: dict[str, str]) -> TelemetryFlowBuilderEnable/disable compression.
def with_compression(self, enabled: bool = True) -> TelemetryFlowBuilderConfigure retry settings.
def with_retry(
self,
enabled: bool = True,
max_retries: int = 3,
backoff: timedelta | None = None,
) -> TelemetryFlowBuilderConfigure batch export settings.
def with_batch_settings(
self,
timeout: timedelta | None = None,
max_size: int | None = None,
) -> TelemetryFlowBuilderSet rate limit.
def with_rate_limit(self, rate_limit: int) -> TelemetryFlowBuilderLoad all configuration from environment variables.
def with_auto_configuration(self) -> TelemetryFlowBuilderEquivalent to calling all *_from_env() methods.
Build the TelemetryFlow client.
def build(self) -> TelemetryFlowClientRaises:
BuilderError: If configuration is invalid
Build client (alias for build()).
def must_build(self) -> TelemetryFlowClientImmutable value object for API credentials.
from telemetryflow.domain.credentials import CredentialsCredentials(key_id: str, key_secret: str)Raises:
CredentialsError: If validation fails
Factory method to create credentials.
@classmethod
def create(cls, key_id: str, key_secret: str) -> CredentialsGenerate Authorization header value.
def authorization_header(self) -> strReturns: "Bearer {key_id}:{key_secret}"
Generate all authentication headers.
def auth_headers(self) -> dict[str, str]Returns: Dictionary with Authorization, X-TelemetryFlow-Key-ID, X-TelemetryFlow-Key-Secret
Check equality with another Credentials.
def equals(self, other: Credentials | None) -> boolConfiguration aggregate root.
from telemetryflow.domain.config import TelemetryConfig, Protocol, SignalTypeTelemetryConfig(
credentials: Credentials,
endpoint: str,
service_name: str,
protocol: Protocol = Protocol.GRPC,
# ... many more options
)See ARCHITECTURE.md for full configuration options.
OTLP protocol enumeration.
class Protocol(str, Enum):
GRPC = "grpc"
HTTP = "http"Telemetry signal enumeration.
class SignalType(str, Enum):
METRICS = "metrics"
LOGS = "logs"
TRACES = "traces"Log severity levels.
from telemetryflow.application.commands import SeverityLevel
class SeverityLevel(str, Enum):
TRACE = "trace"
DEBUG = "debug"
INFO = "info"
WARN = "warn"
ERROR = "error"
FATAL = "fatal"Span kind for traces.
from telemetryflow.application.commands import SpanKind
class SpanKind(str, Enum):
INTERNAL = "internal"
SERVER = "server"
CLIENT = "client"
PRODUCER = "producer"
CONSUMER = "consumer"| Kind | Description |
|---|---|
INTERNAL |
Default, internal operations |
SERVER |
Server-side request handling |
CLIENT |
Client-side outgoing requests |
PRODUCER |
Message queue producer |
CONSUMER |
Message queue consumer |
Flask middleware for automatic request instrumentation.
from telemetryflow.middleware import FlaskTelemetryMiddleware
middleware = FlaskTelemetryMiddleware(
client,
app=None, # Optional, can call init_app later
record_request_duration=True,
record_request_count=True,
record_error_count=True,
excluded_paths=["/health", "/metrics"],
)
middleware.init_app(app)FastAPI/Starlette middleware for automatic request instrumentation.
from telemetryflow.middleware import FastAPITelemetryMiddleware
app.add_middleware(
FastAPITelemetryMiddleware,
client=client,
excluded_paths=["/health"],
)Base exception for SDK errors.
from telemetryflow.client import TelemetryFlowErrorRaised when using uninitialized client.
from telemetryflow.client import NotInitializedErrorRaised for configuration validation errors.
from telemetryflow.domain.config import ConfigErrorRaised for credential validation errors.
from telemetryflow.domain.credentials import CredentialsErrorRaised for builder configuration errors.
from telemetryflow.builder import BuilderError| Variable | Required | Default | Description |
|---|---|---|---|
TELEMETRYFLOW_API_KEY_ID |
Yes | - | API key ID (tfk_*) |
TELEMETRYFLOW_API_KEY_SECRET |
Yes | - | API key secret (tfs_*) |
TELEMETRYFLOW_ENDPOINT |
No | api.telemetryflow.id:4317 | Collector endpoint |
TELEMETRYFLOW_SERVICE_NAME |
Yes | - | Service name |
TELEMETRYFLOW_SERVICE_VERSION |
No | 1.0.0 | Service version |
TELEMETRYFLOW_SERVICE_NAMESPACE |
No | telemetryflow | Service namespace |
TELEMETRYFLOW_ENVIRONMENT |
No | production | Environment |
TELEMETRYFLOW_COLLECTOR_ID |
No | - | Collector ID |
ENV |
No | - | Fallback for environment |
ENVIRONMENT |
No | - | Fallback for environment |