-
Notifications
You must be signed in to change notification settings - Fork 602
feat: Add experimental async transport (port of PR #4572) #5646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4f8a00c
82c0094
4b77519
c46fb6f
5ea3aac
ff85a58
156f32b
cb932d2
d19271e
299947d
71007ec
183e83b
8883b78
86d6e36
e74f4a7
91072bb
d64517f
94b6c73
38f97c2
1ac4196
b990610
b392bc4
36ad606
025714e
ff3e9a0
9b0a712
f59c38c
94666e1
a2a9588
a8823a9
ed040a1
ad2ceae
4a2a6a3
c88848e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,7 +31,11 @@ | |
| from sentry_sdk.serializer import serialize | ||
| from sentry_sdk.tracing import trace | ||
| from sentry_sdk.tracing_utils import has_span_streaming_enabled | ||
| from sentry_sdk.transport import BaseHttpTransport, make_transport | ||
| from sentry_sdk.transport import ( | ||
| HttpTransportCore, | ||
| make_transport, | ||
| AsyncHttpTransport, | ||
| ) | ||
| from sentry_sdk.consts import ( | ||
| SPANDATA, | ||
| DEFAULT_MAX_VALUE_LENGTH, | ||
|
|
@@ -251,6 +255,12 @@ | |
| def flush(self, *args: "Any", **kwargs: "Any") -> None: | ||
| return None | ||
|
|
||
| async def close_async(self, *args: "Any", **kwargs: "Any") -> None: | ||
| return None | ||
|
|
||
| async def flush_async(self, *args: "Any", **kwargs: "Any") -> None: | ||
| return None | ||
|
|
||
| def __enter__(self) -> "BaseClient": | ||
| return self | ||
|
|
||
|
|
@@ -472,7 +482,7 @@ | |
| or self.metrics_batcher | ||
| or self.span_batcher | ||
| or has_profiling_enabled(self.options) | ||
| or isinstance(self.transport, BaseHttpTransport) | ||
| or isinstance(self.transport, HttpTransportCore) | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. uwsgi thread check incorrectly triggered for async transportLow Severity The guard condition was changed from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. uWSGI thread check falsely triggers for async transportLow Severity The |
||
| ): | ||
| # If we have anything on that could spawn a background thread, we | ||
| # need to check if it's safe to use them. | ||
|
|
@@ -999,6 +1009,32 @@ | |
|
|
||
| return self.integrations.get(integration_name) | ||
|
|
||
| def _has_async_transport(self) -> bool: | ||
| """Check if the current transport is async.""" | ||
| return isinstance(self.transport, AsyncHttpTransport) | ||
|
|
||
| @property | ||
| def _batchers(self) -> "tuple[Any, ...]": | ||
| return tuple( | ||
| b | ||
| for b in (self.log_batcher, self.metrics_batcher, self.span_batcher) | ||
| if b is not None | ||
| ) | ||
|
|
||
| def _close_components(self) -> None: | ||
| """Kill all client components in the correct order.""" | ||
| self.session_flusher.kill() | ||
| for b in self._batchers: | ||
| b.kill() | ||
| if self.monitor: | ||
| self.monitor.kill() | ||
|
|
||
| def _flush_components(self) -> None: | ||
| """Flush all client components.""" | ||
| self.session_flusher.flush() | ||
| for b in self._batchers: | ||
| b.flush() | ||
|
|
||
| def close( | ||
| self, | ||
| timeout: "Optional[float]" = None, | ||
|
|
@@ -1009,19 +1045,40 @@ | |
| semantics as :py:meth:`Client.flush`. | ||
| """ | ||
| if self.transport is not None: | ||
| self.flush(timeout=timeout, callback=callback) | ||
| self.session_flusher.kill() | ||
| if self.log_batcher is not None: | ||
| self.log_batcher.kill() | ||
| if self.metrics_batcher is not None: | ||
| self.metrics_batcher.kill() | ||
| if self.span_batcher is not None: | ||
| self.span_batcher.kill() | ||
| if self.monitor: | ||
| self.monitor.kill() | ||
| if self._has_async_transport(): | ||
| warnings.warn( | ||
| "close() used with AsyncHttpTransport. Use close_async() instead.", | ||
| stacklevel=2, | ||
| ) | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self._flush_components() | ||
| else: | ||
| self.flush(timeout=timeout, callback=callback) | ||
| self._close_components() | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+1048
to
+1056
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Calling the synchronous Suggested FixThe synchronous Prompt for AI Agent |
||
| self.transport.kill() | ||
|
Check warning on line 1057 in sentry_sdk/client.py
|
||
| self.transport = None | ||
|
|
||
| async def close_async( | ||
| self, | ||
| timeout: "Optional[float]" = None, | ||
| callback: "Optional[Callable[[int, float], None]]" = None, | ||
| ) -> None: | ||
| """ | ||
| Asynchronously close the client and shut down the transport. Arguments have the same | ||
| semantics as :py:meth:`Client.flush_async`. | ||
| """ | ||
| if self.transport is not None: | ||
| if not self._has_async_transport(): | ||
| logger.debug( | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "close_async() used with non-async transport, aborting. Please use close() instead." | ||
| ) | ||
| return | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| await self.flush_async(timeout=timeout, callback=callback) | ||
| self._close_components() | ||
github-actions[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| kill_task = self.transport.kill() # type: ignore | ||
| if kill_task is not None: | ||
BYK marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| await kill_task | ||
| self.transport = None | ||
|
|
||
| def flush( | ||
| self, | ||
| timeout: "Optional[float]" = None, | ||
|
|
@@ -1035,23 +1092,55 @@ | |
| :param callback: Is invoked with the number of pending events and the configured timeout. | ||
| """ | ||
| if self.transport is not None: | ||
| if self._has_async_transport(): | ||
| warnings.warn( | ||
| "flush() used with AsyncHttpTransport. Use flush_async() instead.", | ||
| stacklevel=2, | ||
| ) | ||
| return | ||
| if timeout is None: | ||
| timeout = self.options["shutdown_timeout"] | ||
| self.session_flusher.flush() | ||
| if self.log_batcher is not None: | ||
| self.log_batcher.flush() | ||
| if self.metrics_batcher is not None: | ||
| self.metrics_batcher.flush() | ||
| if self.span_batcher is not None: | ||
| self.span_batcher.flush() | ||
| self._flush_components() | ||
|
|
||
| self.transport.flush(timeout=timeout, callback=callback) | ||
|
|
||
| async def flush_async( | ||
| self, | ||
| timeout: "Optional[float]" = None, | ||
| callback: "Optional[Callable[[int, float], None]]" = None, | ||
| ) -> None: | ||
| """ | ||
| Asynchronously wait for the current events to be sent. | ||
|
|
||
| :param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used. | ||
|
|
||
| :param callback: Is invoked with the number of pending events and the configured timeout. | ||
| """ | ||
| if self.transport is not None: | ||
| if not self._has_async_transport(): | ||
| logger.debug( | ||
| "flush_async() used with non-async transport, aborting. Please use flush() instead." | ||
| ) | ||
| return | ||
| if timeout is None: | ||
| timeout = self.options["shutdown_timeout"] | ||
| self._flush_components() | ||
| flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore | ||
| if flush_task is not None: | ||
| await flush_task | ||
|
|
||
| def __enter__(self) -> "_Client": | ||
| return self | ||
|
|
||
| def __exit__(self, exc_type: "Any", exc_value: "Any", tb: "Any") -> None: | ||
| self.close() | ||
|
|
||
| async def __aenter__(self) -> "_Client": | ||
| return self | ||
|
|
||
| async def __aexit__(self, exc_type: "Any", exc_value: "Any", tb: "Any") -> None: | ||
| await self.close_async() | ||
sentry-warden[bot] marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Async context manager silently skips cleanup for sync transportMedium Severity
Additional Locations (1) |
||
|
|
||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
|
|
||


Uh oh!
There was an error while loading. Please reload this page.