Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/a2a/client/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
AgentCard,
CancelTaskRequest,
CreateTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigResponse,
ListTasksRequest,
ListTasksResponse,
Message,
Expand Down Expand Up @@ -247,6 +250,45 @@ async def get_task_callback(
request, context=context, extensions=extensions
)

async def list_task_callback(
self,
request: ListTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> ListTaskPushNotificationConfigResponse:
"""Lists push notification configurations for a specific task.

Args:
request: The `ListTaskPushNotificationConfigRequest` object specifying the request.
context: The client call context.
extensions: List of extensions to be activated.

Returns:
A `ListTaskPushNotificationConfigResponse` object.
"""
return await self._transport.list_task_callback(
request, context=context, extensions=extensions
)

async def delete_task_callback(
self,
request: DeleteTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> None:
"""Deletes the push notification configuration for a specific task.

Args:
request: The `DeleteTaskPushNotificationConfigRequest` object specifying the request.
context: The client call context.
extensions: List of extensions to be activated.
"""
await self._transport.delete_task_callback(
request, context=context, extensions=extensions
)

async def subscribe(
self,
request: SubscribeToTaskRequest,
Expand Down
23 changes: 23 additions & 0 deletions src/a2a/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
AgentCard,
CancelTaskRequest,
CreateTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigResponse,
ListTasksRequest,
ListTasksResponse,
Message,
Expand Down Expand Up @@ -141,48 +144,68 @@
self,
request: ListTasksRequest,
*,
context: ClientCallContext | None = None,
) -> ListTasksResponse:
"""Retrieves tasks for an agent."""

@abstractmethod
async def cancel_task(
self,
request: CancelTaskRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> Task:
"""Requests the agent to cancel a specific task."""

@abstractmethod
async def set_task_callback(
self,
request: CreateTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> TaskPushNotificationConfig:
"""Sets or updates the push notification configuration for a specific task."""

@abstractmethod
async def get_task_callback(
self,
request: GetTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> TaskPushNotificationConfig:
"""Retrieves the push notification configuration for a specific task."""

@abstractmethod
async def list_task_callback(
self,
request: ListTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> ListTaskPushNotificationConfigResponse:
"""Lists push notification configurations for a specific task."""

@abstractmethod
async def delete_task_callback(
self,
request: DeleteTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> None:
"""Deletes the push notification configuration for a specific task."""

@abstractmethod
async def subscribe(
self,
request: SubscribeToTaskRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> AsyncIterator[ClientEvent]:

Check notice on line 208 in src/a2a/client/client.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/base.py (82-143)
"""Resubscribes to a task's event stream."""
return
yield
Expand Down
23 changes: 23 additions & 0 deletions src/a2a/client/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
AgentCard,
CancelTaskRequest,
CreateTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigResponse,
ListTasksRequest,
ListTasksResponse,
SendMessageRequest,
Expand Down Expand Up @@ -76,48 +79,68 @@
request: ListTasksRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> ListTasksResponse:
"""Retrieves tasks for an agent."""

@abstractmethod
async def cancel_task(
self,
request: CancelTaskRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> Task:
"""Requests the agent to cancel a specific task."""

@abstractmethod
async def set_task_callback(
self,
request: CreateTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> TaskPushNotificationConfig:
"""Sets or updates the push notification configuration for a specific task."""

@abstractmethod
async def get_task_callback(
self,
request: GetTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> TaskPushNotificationConfig:
"""Retrieves the push notification configuration for a specific task."""

@abstractmethod
async def list_task_callback(
self,
request: ListTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> ListTaskPushNotificationConfigResponse:
"""Lists push notification configurations for a specific task."""

@abstractmethod
async def delete_task_callback(
self,
request: DeleteTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> None:
"""Deletes the push notification configuration for a specific task."""

@abstractmethod
async def subscribe(
self,
request: SubscribeToTaskRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> AsyncGenerator[StreamResponse]:

Check notice on line 143 in src/a2a/client/transports/base.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/client.py (147-208)
"""Reconnects to get task updates."""
return
yield
Expand Down
29 changes: 29 additions & 0 deletions src/a2a/client/transports/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,28 @@
AgentCard,
CancelTaskRequest,
CreateTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigResponse,
ListTasksRequest,
ListTasksResponse,
SendMessageRequest,
SendMessageResponse,
StreamResponse,
SubscribeToTaskRequest,
Task,
TaskPushNotificationConfig,
)
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)


@trace_class(kind=SpanKind.CLIENT)
class GrpcTransport(ClientTransport):

Check notice on line 47 in src/a2a/client/transports/grpc.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/jsonrpc.py (29-50)
"""A gRPC transport for the A2A client."""

def __init__(
Expand Down Expand Up @@ -198,6 +201,32 @@
metadata=self._get_grpc_metadata(extensions),
)

async def list_task_callback(
self,
request: ListTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> ListTaskPushNotificationConfigResponse:
"""Lists push notification configurations for a specific task."""
return await self.stub.ListTaskPushNotificationConfig(
request,
metadata=self._get_grpc_metadata(extensions),
)

async def delete_task_callback(
self,
request: DeleteTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> None:
"""Deletes the push notification configuration for a specific task."""
await self.stub.DeleteTaskPushNotificationConfig(
request,
metadata=self._get_grpc_metadata(extensions),
)

async def get_extended_agent_card(
self,
*,
Expand Down
66 changes: 66 additions & 0 deletions src/a2a/client/transports/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,29 @@
AgentCard,
CancelTaskRequest,
CreateTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigRequest,
GetExtendedAgentCardRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigResponse,
ListTasksRequest,
ListTasksResponse,
SendMessageRequest,
SendMessageResponse,
StreamResponse,
SubscribeToTaskRequest,
Task,
TaskPushNotificationConfig,
)
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)


@trace_class(kind=SpanKind.CLIENT)
class JsonRpcTransport(ClientTransport):

Check notice on line 50 in src/a2a/client/transports/jsonrpc.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/grpc.py (26-47)
"""A JSON-RPC transport for the A2A client."""

def __init__(
Expand Down Expand Up @@ -364,20 +367,83 @@
extensions if extensions is not None else self.extensions,
)
payload, modified_kwargs = await self._apply_interceptors(
'GetTaskPushNotificationConfig',
cast('dict[str, Any]', rpc_request.data),
modified_kwargs,
context,
)
response_data = await self._send_request(payload, modified_kwargs)
json_rpc_response = JSONRPC20Response(**response_data)
if json_rpc_response.error:
raise A2AClientJSONRPCError(json_rpc_response.error)
response: TaskPushNotificationConfig = json_format.ParseDict(
json_rpc_response.result, TaskPushNotificationConfig()
)
return response

async def list_task_callback(

Check notice on line 384 in src/a2a/client/transports/jsonrpc.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/jsonrpc.py (338-352)
self,
request: ListTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> ListTaskPushNotificationConfigResponse:
"""Lists push notification configurations for a specific task."""
rpc_request = JSONRPC20Request(
method='ListTaskPushNotificationConfig',
params=json_format.MessageToDict(request),
_id=str(uuid4()),
)
modified_kwargs = update_extension_header(
self._get_http_args(context),
extensions if extensions is not None else self.extensions,
)
payload, modified_kwargs = await self._apply_interceptors(
'ListTaskPushNotificationConfig',

Check notice on line 402 in src/a2a/client/transports/jsonrpc.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/jsonrpc.py (119-128)
cast('dict[str, Any]', rpc_request.data),
modified_kwargs,
context,
)
response_data = await self._send_request(payload, modified_kwargs)
json_rpc_response = JSONRPC20Response(**response_data)
if json_rpc_response.error:
raise A2AClientJSONRPCError(json_rpc_response.error)
response: ListTaskPushNotificationConfigResponse = (
json_format.ParseDict(
json_rpc_response.result,
ListTaskPushNotificationConfigResponse(),
)
)
return response

async def delete_task_callback(
self,
request: DeleteTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> None:
"""Deletes the push notification configuration for a specific task."""
rpc_request = JSONRPC20Request(
method='DeleteTaskPushNotificationConfig',
params=json_format.MessageToDict(request),
_id=str(uuid4()),
)
modified_kwargs = update_extension_header(
self._get_http_args(context),
extensions if extensions is not None else self.extensions,
)
payload, modified_kwargs = await self._apply_interceptors(
'DeleteTaskPushNotificationConfig',

Check notice on line 437 in src/a2a/client/transports/jsonrpc.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/jsonrpc.py (119-128)
cast('dict[str, Any]', rpc_request.data),
modified_kwargs,
context,
)
response_data = await self._send_request(payload, modified_kwargs)
json_rpc_response = JSONRPC20Response(**response_data)
if json_rpc_response.error:
raise A2AClientJSONRPCError(json_rpc_response.error)
Comment on lines +384 to +445
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These two new methods, list_task_callback and delete_task_callback, share a lot of boilerplate code for making a JSON-RPC request. This is a pattern repeated across other methods in this class as well. To improve maintainability and reduce code duplication, consider extracting this logic into a private helper method.

A helper method could handle creating the JSONRPC20Request, applying interceptors, sending the request, and handling the response, including error checking. The specific methods could then call this helper with the RPC method name and expected response type.

For example, you could have a helper like _make_rpc_call(self, method, request, response_cls, ...).


async def subscribe(
self,
request: SubscribeToTaskRequest,
Expand Down
76 changes: 76 additions & 0 deletions src/a2a/client/transports/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
AgentCard,
CancelTaskRequest,
CreateTaskPushNotificationConfigRequest,
DeleteTaskPushNotificationConfigRequest,
GetTaskPushNotificationConfigRequest,
GetTaskRequest,
ListTaskPushNotificationConfigRequest,
ListTaskPushNotificationConfigResponse,
ListTasksRequest,
ListTasksResponse,
SendMessageRequest,
Expand Down Expand Up @@ -224,6 +227,21 @@
)
)

async def _send_delete_request(
self,
target: str,
query_params: dict[str, Any],
http_kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
return await self._send_request(
self.httpx_client.build_request(
'DELETE',
f'{self.url}{target}',
params=query_params,
**(http_kwargs or {}),
)
)

async def get_task(
self,
request: GetTaskRequest,
Expand Down Expand Up @@ -363,6 +381,64 @@
)
return response

async def list_task_callback(
self,
request: ListTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> ListTaskPushNotificationConfigResponse:
"""Lists push notification configurations for a specific task."""
params = MessageToDict(request)
modified_kwargs = update_extension_header(
self._get_http_args(context),
extensions if extensions is not None else self.extensions,
)
params, modified_kwargs = await self._apply_interceptors(
params,
modified_kwargs,
context,
)
if 'task_id' in params:
del params['task_id']
response_data = await self._send_get_request(
f'/v1/tasks/{request.task_id}/pushNotificationConfigs',
params,
modified_kwargs,
)
response: ListTaskPushNotificationConfigResponse = ParseDict(
response_data, ListTaskPushNotificationConfigResponse()
)
return response

async def delete_task_callback(
self,
request: DeleteTaskPushNotificationConfigRequest,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> None:
"""Deletes the push notification configuration for a specific task."""
params = MessageToDict(request)
modified_kwargs = update_extension_header(
self._get_http_args(context),
extensions if extensions is not None else self.extensions,
)
params, modified_kwargs = await self._apply_interceptors(
params,
modified_kwargs,
context,
)
if 'id' in params:
del params['id']
if 'task_id' in params:
del params['task_id']
await self._send_delete_request(

Check notice on line 436 in src/a2a/client/transports/rest.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/rest.py (360-374)
f'/v1/tasks/{request.task_id}/pushNotificationConfigs/{request.id}',
params,
modified_kwargs,
)

async def subscribe(
self,
request: SubscribeToTaskRequest,
Expand Down
6 changes: 6 additions & 0 deletions src/a2a/server/apps/rest/rest_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ def routes(self) -> dict[tuple[str, str], Callable[[Request], Any]]:
): functools.partial(
self._handle_request, self.handler.get_push_notification
),
(
'/v1/tasks/{id}/pushNotificationConfigs/{push_id}',
'DELETE',
): functools.partial(
self._handle_request, self.handler.delete_push_notification
),
(
'/v1/tasks/{id}/pushNotificationConfigs',
'POST',
Expand Down
Loading
Loading