Skip to content

Commit 2604409

Browse files
BYKclaudesentrivana
authored
feat: Add experimental async transport (port of PR #4572) (#5646)
Add an experimental async transport using httpcore's async backend, enabled via `_experiments={"transport_async": True}`. This is a manual port of PR #4572 (originally merged into `potel-base`) onto the current `master` branch. ## Key changes - **transport.py**: Refactor `BaseHttpTransport` into `HttpTransportCore` (shared base) + `BaseHttpTransport` (sync) + `AsyncHttpTransport` (async, conditional on `httpcore[asyncio]`). Extract shared helpers: `_handle_request_error`, `_handle_response`, `_update_headers`, `_prepare_envelope`. Update `make_transport()` to detect the `transport_async` experiment. - **worker.py**: Add `Worker` ABC base class and `AsyncWorker` implementation using `asyncio.Queue` / `asyncio.Task`. - **client.py**: Add `close_async()` / `flush_async()` with async-vs-sync transport detection. Extract `_close_components()` / `_flush_components()`. - **api.py**: Expose `flush_async()` as a public API. - **integrations/asyncio.py**: Patch `loop.close` to flush pending events before shutdown. Skip span wrapping for internal Sentry tasks. - **utils.py**: Add `is_internal_task()` / `mark_sentry_task_internal()` via ContextVar for async task filtering. - **setup.py**: Add `"asyncio"` extras_require (`httpcore[asyncio]==1.*`). - **config.py / tox.ini**: Widen anyio to `>=3,<5` for httpx and FastAPI. ## Notes - `tox.ini` was manually edited (the generation script requires a free-threaded Python interpreter). A full regeneration should be done before merge. Refs: GH-4568 --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Ivana Kellyer <ivana.kellyer@sentry.io>
1 parent 49a5978 commit 2604409

File tree

16 files changed

+1694
-445
lines changed

16 files changed

+1694
-445
lines changed

scripts/populate_tox/config.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@
122122
"pytest-asyncio",
123123
"python-multipart",
124124
"requests",
125-
"anyio<4",
125+
"anyio>=3,<5",
126126
"jinja2",
127127
],
128128
# There's an incompatibility between FastAPI's TestClient, which is
@@ -133,6 +133,7 @@
133133
# FastAPI versions we use older httpx which still supports the
134134
# deprecated argument.
135135
"<0.110.1": ["httpx<0.28.0"],
136+
"<0.80": ["anyio<4"],
136137
"py3.6": ["aiocontextvars"],
137138
},
138139
},
@@ -171,7 +172,8 @@
171172
"httpx": {
172173
"package": "httpx",
173174
"deps": {
174-
"*": ["anyio<4.0.0"],
175+
"*": ["anyio>=3,<5"],
176+
"<0.24": ["anyio<4"],
175177
">=0.16,<0.17": ["pytest-httpx==0.10.0"],
176178
">=0.17,<0.19": ["pytest-httpx==0.12.0"],
177179
">=0.19,<0.21": ["pytest-httpx==0.14.0"],

scripts/populate_tox/releases.jsonl

Lines changed: 3 additions & 3 deletions
Large diffs are not rendered by default.

scripts/populate_tox/tox.jinja

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,17 @@ deps =
7878
7979
linters: -r requirements-linting.txt
8080
linters: werkzeug<2.3.0
81+
linters: httpcore[asyncio]
8182
8283
mypy: -r requirements-linting.txt
8384
mypy: werkzeug<2.3.0
85+
mypy: httpcore[asyncio]
8486
ruff: -r requirements-linting.txt
8587
8688
# === Common ===
8789
py3.8-common: hypothesis
8890
common: pytest-asyncio
91+
common: httpcore[asyncio]
8992
# See https://github.com/pytest-dev/pytest/issues/9621
9093
# and https://github.com/pytest-dev/pytest-forked/issues/67
9194
# for justification of the upper bound on pytest

sentry_sdk/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
"configure_scope",
2626
"continue_trace",
2727
"flush",
28+
"flush_async",
2829
"get_baggage",
2930
"get_client",
3031
"get_global_scope",

sentry_sdk/api.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def overload(x: "T") -> "T":
6060
"configure_scope",
6161
"continue_trace",
6262
"flush",
63+
"flush_async",
6364
"get_baggage",
6465
"get_client",
6566
"get_global_scope",
@@ -351,6 +352,14 @@ def flush(
351352
return get_client().flush(timeout=timeout, callback=callback)
352353

353354

355+
@clientmethod
356+
async def flush_async(
357+
timeout: "Optional[float]" = None,
358+
callback: "Optional[Callable[[int, float], None]]" = None,
359+
) -> None:
360+
return await get_client().flush_async(timeout=timeout, callback=callback)
361+
362+
354363
@scopemethod
355364
def start_span(
356365
**kwargs: "Any",

sentry_sdk/client.py

Lines changed: 108 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,11 @@
3131
from sentry_sdk.serializer import serialize
3232
from sentry_sdk.tracing import trace
3333
from sentry_sdk.tracing_utils import has_span_streaming_enabled
34-
from sentry_sdk.transport import BaseHttpTransport, make_transport
34+
from sentry_sdk.transport import (
35+
HttpTransportCore,
36+
make_transport,
37+
AsyncHttpTransport,
38+
)
3539
from sentry_sdk.consts import (
3640
SPANDATA,
3741
DEFAULT_MAX_VALUE_LENGTH,
@@ -251,6 +255,12 @@ def close(self, *args: "Any", **kwargs: "Any") -> None:
251255
def flush(self, *args: "Any", **kwargs: "Any") -> None:
252256
return None
253257

258+
async def close_async(self, *args: "Any", **kwargs: "Any") -> None:
259+
return None
260+
261+
async def flush_async(self, *args: "Any", **kwargs: "Any") -> None:
262+
return None
263+
254264
def __enter__(self) -> "BaseClient":
255265
return self
256266

@@ -472,7 +482,7 @@ def _record_lost_event(
472482
or self.metrics_batcher
473483
or self.span_batcher
474484
or has_profiling_enabled(self.options)
475-
or isinstance(self.transport, BaseHttpTransport)
485+
or isinstance(self.transport, HttpTransportCore)
476486
):
477487
# If we have anything on that could spawn a background thread, we
478488
# need to check if it's safe to use them.
@@ -999,6 +1009,32 @@ def get_integration(
9991009

10001010
return self.integrations.get(integration_name)
10011011

1012+
def _has_async_transport(self) -> bool:
1013+
"""Check if the current transport is async."""
1014+
return isinstance(self.transport, AsyncHttpTransport)
1015+
1016+
@property
1017+
def _batchers(self) -> "tuple[Any, ...]":
1018+
return tuple(
1019+
b
1020+
for b in (self.log_batcher, self.metrics_batcher, self.span_batcher)
1021+
if b is not None
1022+
)
1023+
1024+
def _close_components(self) -> None:
1025+
"""Kill all client components in the correct order."""
1026+
self.session_flusher.kill()
1027+
for b in self._batchers:
1028+
b.kill()
1029+
if self.monitor:
1030+
self.monitor.kill()
1031+
1032+
def _flush_components(self) -> None:
1033+
"""Flush all client components."""
1034+
self.session_flusher.flush()
1035+
for b in self._batchers:
1036+
b.flush()
1037+
10021038
def close(
10031039
self,
10041040
timeout: "Optional[float]" = None,
@@ -1009,19 +1045,40 @@ def close(
10091045
semantics as :py:meth:`Client.flush`.
10101046
"""
10111047
if self.transport is not None:
1012-
self.flush(timeout=timeout, callback=callback)
1013-
self.session_flusher.kill()
1014-
if self.log_batcher is not None:
1015-
self.log_batcher.kill()
1016-
if self.metrics_batcher is not None:
1017-
self.metrics_batcher.kill()
1018-
if self.span_batcher is not None:
1019-
self.span_batcher.kill()
1020-
if self.monitor:
1021-
self.monitor.kill()
1048+
if self._has_async_transport():
1049+
warnings.warn(
1050+
"close() used with AsyncHttpTransport. Use close_async() instead.",
1051+
stacklevel=2,
1052+
)
1053+
self._flush_components()
1054+
else:
1055+
self.flush(timeout=timeout, callback=callback)
1056+
self._close_components()
10221057
self.transport.kill()
10231058
self.transport = None
10241059

1060+
async def close_async(
1061+
self,
1062+
timeout: "Optional[float]" = None,
1063+
callback: "Optional[Callable[[int, float], None]]" = None,
1064+
) -> None:
1065+
"""
1066+
Asynchronously close the client and shut down the transport. Arguments have the same
1067+
semantics as :py:meth:`Client.flush_async`.
1068+
"""
1069+
if self.transport is not None:
1070+
if not self._has_async_transport():
1071+
logger.debug(
1072+
"close_async() used with non-async transport, aborting. Please use close() instead."
1073+
)
1074+
return
1075+
await self.flush_async(timeout=timeout, callback=callback)
1076+
self._close_components()
1077+
kill_task = self.transport.kill() # type: ignore
1078+
if kill_task is not None:
1079+
await kill_task
1080+
self.transport = None
1081+
10251082
def flush(
10261083
self,
10271084
timeout: "Optional[float]" = None,
@@ -1035,23 +1092,55 @@ def flush(
10351092
:param callback: Is invoked with the number of pending events and the configured timeout.
10361093
"""
10371094
if self.transport is not None:
1095+
if self._has_async_transport():
1096+
warnings.warn(
1097+
"flush() used with AsyncHttpTransport. Use flush_async() instead.",
1098+
stacklevel=2,
1099+
)
1100+
return
10381101
if timeout is None:
10391102
timeout = self.options["shutdown_timeout"]
1040-
self.session_flusher.flush()
1041-
if self.log_batcher is not None:
1042-
self.log_batcher.flush()
1043-
if self.metrics_batcher is not None:
1044-
self.metrics_batcher.flush()
1045-
if self.span_batcher is not None:
1046-
self.span_batcher.flush()
1103+
self._flush_components()
1104+
10471105
self.transport.flush(timeout=timeout, callback=callback)
10481106

1107+
async def flush_async(
1108+
self,
1109+
timeout: "Optional[float]" = None,
1110+
callback: "Optional[Callable[[int, float], None]]" = None,
1111+
) -> None:
1112+
"""
1113+
Asynchronously wait for the current events to be sent.
1114+
1115+
:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.
1116+
1117+
:param callback: Is invoked with the number of pending events and the configured timeout.
1118+
"""
1119+
if self.transport is not None:
1120+
if not self._has_async_transport():
1121+
logger.debug(
1122+
"flush_async() used with non-async transport, aborting. Please use flush() instead."
1123+
)
1124+
return
1125+
if timeout is None:
1126+
timeout = self.options["shutdown_timeout"]
1127+
self._flush_components()
1128+
flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore
1129+
if flush_task is not None:
1130+
await flush_task
1131+
10491132
def __enter__(self) -> "_Client":
10501133
return self
10511134

10521135
def __exit__(self, exc_type: "Any", exc_value: "Any", tb: "Any") -> None:
10531136
self.close()
10541137

1138+
async def __aenter__(self) -> "_Client":
1139+
return self
1140+
1141+
async def __aexit__(self, exc_type: "Any", exc_value: "Any", tb: "Any") -> None:
1142+
await self.close_async()
1143+
10551144

10561145
from typing import TYPE_CHECKING
10571146

sentry_sdk/consts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class CompressionAlgo(Enum):
7878
"transport_compression_algo": Optional[CompressionAlgo],
7979
"transport_num_pools": Optional[int],
8080
"transport_http2": Optional[bool],
81+
"transport_async": Optional[bool],
8182
"enable_logs": Optional[bool],
8283
"before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
8384
"enable_metrics": Optional[bool],

0 commit comments

Comments
 (0)