diff --git a/.vscode/settings.json b/.vscode/settings.json index 24a9e3428..6f30b8006 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -16,10 +16,6 @@ "source.organizeImports": "explicit" } }, - "workbench.colorCustomizations": { - "titleBar.activeBackground": "#0099cc", - "titleBar.inactiveBackground": "#0099cc" - }, "python.testing.pytestArgs": [ "tests" ], diff --git a/pyproject.toml b/pyproject.toml index fc5ca1893..f5c8b5da5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-langchain" -version = "0.8.26" +version = "0.8.27" description = "Python SDK that enables developers to build and deploy LangGraph agents to the UiPath Cloud Platform" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" @@ -22,6 +22,7 @@ dependencies = [ "jsonpath-ng>=1.7.0", "mcp==1.26.0", "langchain-mcp-adapters==0.2.1", + "a2a-sdk>=0.3.0", ] classifiers = [ diff --git a/src/uipath_langchain/agent/tools/__init__.py b/src/uipath_langchain/agent/tools/__init__.py index d06e191b2..e42030329 100644 --- a/src/uipath_langchain/agent/tools/__init__.py +++ b/src/uipath_langchain/agent/tools/__init__.py @@ -1,5 +1,6 @@ """Tool creation and management for LowCode agents.""" +from .a2a import create_a2a_agent_tools from .context_tool import create_context_tool from .escalation_tool import create_escalation_tool from .extraction_tool import create_ixp_extraction_tool @@ -13,6 +14,7 @@ from .tool_node import ToolWrapperMixin, UiPathToolNode, create_tool_node __all__ = [ + "create_a2a_agent_tools", "create_tools_from_resources", "create_tool_node", "create_context_tool", diff --git a/src/uipath_langchain/agent/tools/a2a/__init__.py b/src/uipath_langchain/agent/tools/a2a/__init__.py new file mode 100644 index 000000000..b94e123fc --- /dev/null +++ b/src/uipath_langchain/agent/tools/a2a/__init__.py @@ -0,0 +1,7 @@ +"""A2A (Agent-to-Agent) tools.""" + +from .a2a_tool import create_a2a_agent_tools + +__all__ = [ + "create_a2a_agent_tools", +] diff --git a/src/uipath_langchain/agent/tools/a2a/a2a_tool.py b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py new file mode 100644 index 000000000..1a33193c6 --- /dev/null +++ b/src/uipath_langchain/agent/tools/a2a/a2a_tool.py @@ -0,0 +1,447 @@ +"""A2A singleton tool — one tool per remote agent. + +Each tool maintains conversation context (task_id/context_id) across calls. +The persistence strategy is controlled by the advancedPersistance flag: +- True: deterministic persistence via graph state (tools_storage) +- False/absent: LLM passthrough (IDs in tool schema and response) + +Authentication uses the UiPath SDK Bearer token, resolved lazily on first call. +""" + +from __future__ import annotations + +import asyncio +import json +from logging import getLogger +from typing import Any +from uuid import uuid4 + +import httpx +from a2a.client import Client +from a2a.types import ( + AgentCard, + Message, + Part, + Role, + Task, + TaskArtifactUpdateEvent, + TaskState, + TextPart, +) +from langchain_core.messages import ToolCall, ToolMessage +from langchain_core.tools import BaseTool +from langgraph.types import Command +from pydantic import BaseModel, Field +from uipath.agent.models.agent import AgentA2aResourceConfig + +from uipath_langchain.agent.react.types import AgentGraphState +from uipath_langchain.agent.tools.base_uipath_structured_tool import ( + BaseUiPathStructuredTool, +) +from uipath_langchain.agent.tools.tool_node import ( + ToolWrapperMixin, + ToolWrapperReturnType, +) +from uipath_langchain.agent.tools.utils import sanitize_tool_name + +logger = getLogger(__name__) + + +class A2aToolInput(BaseModel): + """Input schema for A2A agent tool with graph state persistence.""" + + message: str = Field(description="The message to send to the remote agent.") + + +class A2aToolPassthroughInput(BaseModel): + """Input schema for A2A agent tool with LLM passthrough.""" + + message: str = Field(description="The message to send to the remote agent.") + task_id: str | None = Field( + default=None, + description="Task ID from a previous call to this tool. " + "Pass it to continue an existing conversation.", + ) + context_id: str | None = Field( + default=None, + description="Context ID from a previous call to this tool. " + "Pass it to continue an existing conversation.", + ) + + + +class A2aStructuredToolWithWrapper(BaseUiPathStructuredTool, ToolWrapperMixin): + pass + + + +def _extract_text(obj: Task | Message) -> str: + """Extract text content from a Task or Message response.""" + parts: list[Part] = [] + + if isinstance(obj, Message): + parts = obj.parts or [] + elif isinstance(obj, Task): + if obj.status and obj.status.state == TaskState.input_required: + if obj.status.message: + parts = obj.status.message.parts or [] + else: + if obj.artifacts: + for artifact in obj.artifacts: + parts.extend(artifact.parts or []) + if not parts and obj.status and obj.status.message: + parts = obj.status.message.parts or [] + if not parts and obj.history: + for msg in reversed(obj.history): + if msg.role == Role.agent: + parts = msg.parts or [] + break + + texts = [] + for part in parts: + if isinstance(part.root, TextPart): + texts.append(part.root.text) + return "\n".join(texts) if texts else "" + + +def _format_response( + text: str, + state: str, + *, + task_id: str | None = None, + context_id: str | None = None, + include_ids: bool = False, +) -> str: + """Build a structured tool response the LLM can act on.""" + result: dict[str, Any] = {"agent_response": text, "task_state": state} + if include_ids: + result["task_id"] = task_id + result["context_id"] = context_id + return json.dumps(result) + + +def _build_description(card: AgentCard) -> str: + """Build a tool description from an agent card.""" + parts = [] + if card.description: + parts.append(card.description) + if card.skills: + for skill in card.skills: + skill_desc = skill.name or "" + if skill.description: + skill_desc += f": {skill.description}" + if skill_desc: + parts.append(f"Skill: {skill_desc}") + return " | ".join(parts) if parts else f"Remote A2A agent at {card.url}" + + +def fetch_agent_card( + agent_card_url: str, headers: dict[str, str] | None = None +) -> AgentCard: + """Fetch the agent card from a remote A2A endpoint (synchronous).""" + response = httpx.get(agent_card_url, headers=headers or {}, timeout=30) + response.raise_for_status() + return AgentCard(**response.json()) + + +def _resolve_a2a_url(config: AgentA2aResourceConfig) -> str: + """Resolve the A2A endpoint URL from config. + + Uses a2aUrl if available, otherwise derives from agentCardUrl + by stripping the .well-known path. + """ + a2a_url = getattr(config, "a2a_url", None) + if a2a_url: + return a2a_url + return config.agent_card_url.replace("/.well-known/agent-card.json", "") + + +async def create_a2a_agent_tools( + resources: list[AgentA2aResourceConfig], +) -> list[BaseTool]: + """Create A2A tools from a list of A2A resource configurations. + + Each enabled A2A resource becomes a single tool representing the remote agent. + The persistence strategy is determined by the advancedPersistance flag on each resource. + + Args: + resources: List of A2A resource configurations from agent.json. + + Returns: + List of BaseTool instances, one per enabled A2A resource. + """ + tools: list[BaseTool] = [] + + for resource in resources: + if resource.is_enabled is False: + logger.info("Skipping disabled A2A resource '%s'", resource.name) + continue + if resource.is_active is False: + logger.info("Skipping inactive A2A resource '%s'", resource.name) + continue + + logger.info( + "Creating A2A tool for resource '%s' (advancedPersistance=%s)", + resource.name, + resource.advanced_persistance, + ) + tool = _create_a2a_singleton_tool(resource) + tools.append(tool) + + return tools + + + +async def _send_a2a_message( + client: Client, + a2a_url: str, + *, + message: str, + task_id: str | None, + context_id: str | None, +) -> tuple[str, str, str | None, str | None]: + """Send a message to a remote A2A agent and return the response. + + Args: + client: The A2A protocol client. + a2a_url: The remote agent URL (for logging). + message: The user message text. + task_id: Prior task ID for conversation continuity. + context_id: Prior context ID for conversation continuity. + + Returns: + Tuple of (response_text, task_state, new_task_id, new_context_id). + """ + if task_id or context_id: + logger.info( + "A2A continue task=%s context=%s to %s", task_id, context_id, a2a_url + ) + else: + logger.info("A2A new message to %s", a2a_url) + + a2a_message = Message( + role=Role.user, + parts=[Part(root=TextPart(text=message))], + message_id=str(uuid4()), + task_id=task_id, + context_id=context_id, + ) + + try: + text = "" + state = "unknown" + new_task_id = task_id + new_context_id = context_id + + async for event in client.send_message(a2a_message): + if isinstance(event, Message): + text = _extract_text(event) + new_context_id = event.context_id + state = "completed" + break + else: + task, update = event + new_task_id = task.id + new_context_id = task.context_id + state = task.status.state.value if task.status else "unknown" + if update is None: + text = _extract_text(task) + break + elif isinstance(update, TaskArtifactUpdateEvent): + for part in update.artifact.parts or []: + if isinstance(part.root, TextPart): + text += part.root.text + + return (text or "No response received.", state, new_task_id, new_context_id) + + except Exception as e: + logger.exception("A2A request to %s failed", a2a_url) + return (f"Error: {e}", "error", task_id, context_id) + + + + +def _create_a2a_singleton_tool(config: AgentA2aResourceConfig) -> BaseTool: + """Create a single LangChain tool for A2A communication. + + Branches on config.advanced_persistance: + - True: graph state via tools_storage + - False: LLM passthrough with task_id/context_id in schema + + Args: + config: A2A resource configuration from agent.json. + + Returns: + A BaseTool that sends messages to the remote A2A agent. + """ + if config.cached_agent_card: + agent_card = AgentCard(**config.cached_agent_card) + else: + agent_card = fetch_agent_card(config.agent_card_url) + + raw_name = agent_card.name or config.name + tool_name = sanitize_tool_name(raw_name) + tool_description = _build_description(agent_card) + a2a_url = _resolve_a2a_url(config) + + _lock = asyncio.Lock() + _client: Client | None = None + _http_client: httpx.AsyncClient | None = None + + async def _ensure_client() -> Client: + nonlocal _client, _http_client + if _client is None: + async with _lock: + if _client is None: + from a2a.client import ClientConfig, ClientFactory + from uipath.platform import UiPath + + sdk = UiPath() + _http_client = httpx.AsyncClient( + timeout=120, + headers={"Authorization": f"Bearer {sdk._config.secret}"}, + ) + _client = await ClientFactory.connect( + a2a_url, + client_config=ClientConfig( + httpx_client=_http_client, + streaming=False, + ), + ) + return _client # type: ignore[return-value] + + metadata = { + "tool_type": "a2a", + "display_name": raw_name, + "slug": config.slug, + } + + if config.advanced_persistance: + return _create_graph_state_tool( + tool_name, tool_description, a2a_url, _ensure_client, metadata + ) + else: + return _create_passthrough_tool( + tool_name, tool_description, a2a_url, _ensure_client, metadata + ) + + + + +def _create_graph_state_tool( + tool_name: str, + tool_description: str, + a2a_url: str, + ensure_client: Any, + metadata: dict[str, Any], +) -> BaseTool: + """Create an A2A tool that persists task_id/context_id in graph state.""" + + async def _send(*, message: str) -> str: + client = await ensure_client() + text, state, _, _ = await _send_a2a_message( + client, a2a_url, message=message, task_id=None, context_id=None + ) + return _format_response(text, state) + + async def _a2a_wrapper( + tool: BaseTool, + call: ToolCall, + state: AgentGraphState, + ) -> ToolWrapperReturnType: + prior = state.inner_state.tools_storage.get(tool.name) or {} + task_id = prior.get("task_id") + context_id = prior.get("context_id") + + logger.info( + "A2A wrapper read from tools_storage: task_id=%s context_id=%s", + task_id, + context_id, + ) + + client = await ensure_client() + text, task_state, new_task_id, new_context_id = await _send_a2a_message( + client, + a2a_url, + message=call["args"]["message"], + task_id=task_id, + context_id=context_id, + ) + + logger.info( + "A2A wrapper writing to tools_storage: task_id=%s context_id=%s", + new_task_id, + new_context_id, + ) + + return Command( + update={ + "messages": [ + ToolMessage( + content=_format_response(text, task_state), + name=call["name"], + tool_call_id=call["id"], + ) + ], + "inner_state": { + "tools_storage": { + tool.name: { + "task_id": new_task_id, + "context_id": new_context_id, + } + } + }, + } + ) + + tool = A2aStructuredToolWithWrapper( + name=tool_name, + description=tool_description, + coroutine=_send, + args_schema=A2aToolInput, + metadata=metadata, + ) + tool.set_tool_wrappers(awrapper=_a2a_wrapper) + return tool + + + + +def _create_passthrough_tool( + tool_name: str, + tool_description: str, + a2a_url: str, + ensure_client: Any, + metadata: dict[str, Any], +) -> BaseTool: + """Create an A2A tool where the LLM manages task_id/context_id passthrough.""" + + async def _send( + *, + message: str, + task_id: str | None = None, + context_id: str | None = None, + ) -> str: + client = await ensure_client() + text, state, new_task_id, new_context_id = await _send_a2a_message( + client, + a2a_url, + message=message, + task_id=task_id, + context_id=context_id, + ) + return _format_response( + text, + state, + task_id=new_task_id, + context_id=new_context_id, + include_ids=True, + ) + + return BaseUiPathStructuredTool( + name=tool_name, + description=tool_description, + coroutine=_send, + args_schema=A2aToolPassthroughInput, + metadata=metadata, + ) diff --git a/uv.lock b/uv.lock index 0e6d82c19..5b265d74d 100644 --- a/uv.lock +++ b/uv.lock @@ -7,6 +7,23 @@ resolution-markers = [ "python_full_version < '3.13'", ] +[[package]] +name = "a2a-sdk" +version = "0.3.25" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "google-api-core", version = "2.25.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.14'" }, + { name = "google-api-core", version = "2.29.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.14'" }, + { name = "httpx" }, + { name = "httpx-sse" }, + { name = "protobuf" }, + { name = "pydantic" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/55/83/3c99b276d09656cce039464509f05bf385e5600d6dc046a131bbcf686930/a2a_sdk-0.3.25.tar.gz", hash = "sha256:afda85bab8d6af0c5d15e82f326c94190f6be8a901ce562d045a338b7127242f", size = 270638, upload-time = "2026-03-10T13:08:46.417Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bd/f9/6a62520b7ecb945188a6e1192275f4732ff9341cd4629bc975a6c146aeab/a2a_sdk-0.3.25-py3-none-any.whl", hash = "sha256:2fce38faea82eb0b6f9f9c2bcf761b0d78612c80ef0e599b50d566db1b2654b5", size = 149609, upload-time = "2026-03-10T13:08:44.7Z" }, +] + [[package]] name = "aiohappyeyeballs" version = "2.6.1" @@ -3333,9 +3350,10 @@ wheels = [ [[package]] name = "uipath-langchain" -version = "0.8.26" +version = "0.8.27" source = { editable = "." } dependencies = [ + { name = "a2a-sdk" }, { name = "httpx" }, { name = "jsonpath-ng" }, { name = "jsonschema-pydantic-converter" }, @@ -3383,6 +3401,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "a2a-sdk", specifier = ">=0.3.0" }, { name = "boto3-stubs", marker = "extra == 'bedrock'", specifier = ">=1.41.4" }, { name = "google-generativeai", marker = "extra == 'vertex'", specifier = ">=0.8.0" }, { name = "httpx", specifier = ">=0.27.0" },