Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2491,7 +2491,7 @@ public void testAgentToAgentDelegation() throws Exception {
BiConsumer<ClientEvent, AgentCard> delegationConsumer =
AgentToAgentClientFactory.createTaskCaptureConsumer(delegationResultRef, delegationLatch);

getClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> {
getNonStreamingClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> {
delegationErrorRef.set(error);
delegationLatch.countDown();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import static io.a2a.server.ServerCallContext.TRANSPORT_KEY;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
Expand Down Expand Up @@ -176,49 +173,56 @@ private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEm

/**
* Handles delegation by forwarding to another agent via client.
* <p>
* Uses blocking client call (streaming=false) which should return the final task state
* synchronously without requiring async callbacks and latches. This simplified approach
* avoids race conditions between event consumption and callback invocation.
*/
private void handleDelegation(String userInput, TransportProtocol transportProtocol,
AgentEmitter agentEmitter) {
// Strip "delegate:" prefix
String delegatedContent = userInput.substring("delegate:".length()).trim();

// Create client for same transport
// Create client for same transport (streaming=false for blocking behavior)
try (Client client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol)) {
agentEmitter.startWork();

// Set up consumer to capture task result
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Task> resultRef = new AtomicReference<>();
AtomicReference<Throwable> errorRef = new AtomicReference<>();

BiConsumer<ClientEvent, AgentCard> consumer =
AgentToAgentClientFactory.createTaskCaptureConsumer(resultRef, latch);
// Store the result task from blocking call
AtomicReference<Task> taskRef = new AtomicReference<>();

// Delegate to another agent (new task on same server)
// Add a marker so the receiving agent knows to complete the task
Message delegatedMessage = A2A.toUserMessage("#a2a-delegated#" + delegatedContent);
client.sendMessage(delegatedMessage, List.of(consumer), error -> {
errorRef.set(error);
latch.countDown();
});

// Wait for response
if (!latch.await(30, TimeUnit.SECONDS)) {
agentEmitter.fail(new InternalError("Timeout waiting for delegated response"));
return;
}
// Blocking call should return final task synchronously
client.sendMessage(delegatedMessage, List.of((event, card) -> {
if (event instanceof TaskEvent te) {
taskRef.set(te.getTask());
} else if (event instanceof TaskUpdateEvent tue) {
taskRef.set(tue.getTask());
}
Comment on lines +199 to +203
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The consumer lambda for sendMessage includes a check for TaskUpdateEvent. However, since this sendMessage call is on a non-streaming client (streaming=false), the client will only invoke the consumer with a final TaskEvent or MessageEvent after the blocking call completes. It will not receive intermediate TaskUpdateEvents. Therefore, the else if (event instanceof TaskUpdateEvent tue) branch is effectively dead code in this context. Removing it would make the code clearer and more accurately reflect the behavior of a non-streaming client call.

                        if (event instanceof TaskEvent te) {
                            taskRef.set(te.getTask());
                        }

}), null);

Task delegatedResult = resultRef.get();
// Blocking call should have completed before returning
Task delegatedResult = taskRef.get();

// Check for error only if we didn't get a successful result
// (errors can occur after completion due to stream cleanup)
if (delegatedResult == null && errorRef.get() != null) {
agentEmitter.fail(new InternalError("Delegation failed: " + errorRef.get().getMessage()));
if (delegatedResult == null) {
agentEmitter.fail(new InternalError("No result received from blocking delegation call"));
return;
}

if (delegatedResult == null) {
agentEmitter.fail(new InternalError("No result received from delegation"));
// DIAGNOSTIC: Check if task is actually final
// If blocking call returns non-final task, it indicates a server-side race condition
if (!delegatedResult.status().state().isFinal()) {
String diagnostic = String.format(
"RACE CONDITION DETECTED: Blocking call returned non-final task! " +
"State: %s, TaskId: %s, Artifacts: %d. " +
"This indicates DefaultRequestHandler wait logic failed to synchronize with MainEventBusProcessor.",
delegatedResult.status().state(),
delegatedResult.id(),
delegatedResult.artifacts() != null ? delegatedResult.artifacts().size() : 0);
System.err.println(diagnostic); // Also print to stderr for CI visibility
agentEmitter.fail(new InternalError(diagnostic));
return;
}

Expand All @@ -234,9 +238,6 @@ private void handleDelegation(String userInput, TransportProtocol transportProto
agentEmitter.complete();
} catch (A2AClientException e) {
agentEmitter.fail(new InternalError("Failed to create client: " + e.getMessage()));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
agentEmitter.fail(new InternalError("Interrupted while waiting for response"));
}
}

Expand Down