Handle exceptions during draining pending calls / streams #12737

Open
kannanjgithub wants to merge 5 commits into grpc:master from kannanjgithub:framer-exception-stream-hang

@kannanjgithub kannanjgithub commented Mar 30, 2026

The reported hang in issue #12109 in blockingUnaryCall was caused by an unhandled framing exception during the draining of DelayedStream. When MessageFramer throws an exception (e.g., RESOURCE_EXHAUSTED), it bubbles up through DelayedStream.drainPendingCalls and is eventually caught and swallowed by ThreadlessExecutor.runQuietly. This leaves the DelayedStream in an inconsistent state (where passThrough is still false), and the responseFuture never completes, causing the blocking call to hang forever.

This fix adds proper exception handling to the draining loops in both DelayedStream and DelayedClientCall. When an exception occurs during draining:

  1. The realStream (or realCall) is explicitly cancelled with the error.
  2. The pending calls are cleared.
  3. The stream/call transitions to passThrough = true to prevent getting stuck.
  4. The listener's pending callbacks are drained, ensuring that any closure notifications are delivered to the application.
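Steps 1 to 3 can be sketched with a small stand-alone model. This is not the grpc-java code: `DrainSketch`, `RealStream`, and `isPassThrough` are hypothetical names chosen for illustration, and step 4 (draining the listener's callbacks) is left out to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for DelayedStream; not the grpc-java class. */
final class DrainSketch {
  /** Minimal stand-in for the real stream: only cancellation is modeled. */
  interface RealStream {
    void cancel(Throwable cause);
  }

  private final RealStream realStream;
  private List<Runnable> pendingCalls = new ArrayList<>();
  private boolean passThrough;

  DrainSketch(RealStream realStream) {
    this.realStream = realStream;
  }

  /** Buffers the call while draining is incomplete; afterwards runs it directly. */
  void delayOrExecute(Runnable call) {
    synchronized (this) {
      if (!passThrough) {
        pendingCalls.add(call);
        return;
      }
    }
    call.run();
  }

  /** Runs the buffered calls; must be invoked at most once. */
  void drainPendingCalls() {
    List<Runnable> toRun = new ArrayList<>();
    try {
      while (true) {
        synchronized (this) {
          if (pendingCalls.isEmpty()) {
            pendingCalls = null;
            passThrough = true;
            return;
          }
          // Swap the lists so the calls run outside the lock.
          List<Runnable> tmp = toRun;
          toRun = pendingCalls;
          pendingCalls = tmp;
        }
        for (Runnable call : toRun) {
          call.run();
        }
        toRun.clear();
      }
    } catch (Throwable t) {
      // Step 1: cancel the real stream with the error.
      realStream.cancel(t);
      synchronized (this) {
        // Step 2: drop whatever is still buffered.
        pendingCalls = null;
        // Step 3: leave the delayed state so later calls are not stuck.
        passThrough = true;
      }
    }
  }

  synchronized boolean isPassThrough() {
    return passThrough;
  }
}
```

In this model, a buffered call that throws leaves the stream cancelled and in pass-through mode, and calls buffered after the failing one are dropped rather than run.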

Fixes #12109

@kannanjgithub kannanjgithub changed the title from "Handle exceptions during draining pending calls" to "Handle exceptions during draining pending calls / streams" on Mar 30, 2026

@ejona86 ejona86 left a comment

I think this isn't a great way to fix #12109. The problem is "we shouldn't be throwing." Maybe we should be throwing for Error, like out-of-memory. But definitely not for this self-created exception. I do wonder why people are using withMaxOutboundMessageSize(), as it exists only as a hack to support the option in service config (it really shouldn't have even been a public API). That said, it can still happen with service config, so we do need to handle it (and it exists in service config as a hack for lack of a server-side service config).

Not to say this PR is useless. We should indeed handle exceptions from the client better in some of these cases. But I don't think we should rely on the client's exception handling at all in the withMaxOutboundMessageSize() case.

I'm thinking maybe something closer to this instead. On server-side it would definitely be safer to use cancel(), but I think this plumbing has a hope of working:

diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
index bce1820b4..b286ea1e6 100644
--- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
@@ -194,6 +194,11 @@ public abstract class AbstractClientStream extends AbstractStream
     }
   }
 
+  @Override
+  protected void framingFailed(Throwable cause) {
+    cancel(Status.CANCELLED.withDescription("Unable to frame message").withCause(cause));
+  }
+
   @Override
   public final void cancel(Status reason) {
     Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java
index c468cba97..fe669fb25 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java
@@ -22,6 +22,8 @@ import io.grpc.Decompressor;
 import io.grpc.InternalStatus;
 import io.grpc.Metadata;
 import io.grpc.Status;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import javax.annotation.Nullable;
 
 /**
@@ -70,6 +72,8 @@ public abstract class AbstractServerStream extends AbstractStream
     void cancel(Status status);
   }
 
+  private static final Logger log = Logger.getLogger(AbstractServerStream.class.getName());
+
   private final MessageFramer framer;
   private final StatsTraceContext statsTraceCtx;
   private boolean outboundClosed;
@@ -119,6 +123,18 @@ public abstract class AbstractServerStream extends AbstractStream
     abstractServerStreamSink().writeFrame(frame, flush, numMessages);
   }
 
+  @Override
+  protected void framingFailed(Throwable t) {
+    Status status = Status.fromThrowable(t);
+    if (status.getCause() != null) {
+      // The status description may be mostly useless (e.g., with UNKNOWN or INTERNAL), with most
+      // details actually in the cause. We can't propagate those details to the client, so log
+      // instead.
+      log.log(Level.SEVERE, "Failed framing message. Closing RPC: " + status, status.getCause());
+    }
+    close(status, new Metadata());
+  }
+
   @Override
   public final void close(Status status, Metadata trailers) {
     Preconditions.checkNotNull(status, "status");
diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java
index 9f5fb035d..2b53d3cf3 100644
--- a/core/src/main/java/io/grpc/internal/AbstractStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractStream.java
@@ -47,6 +47,13 @@ public abstract class AbstractStream implements Stream {
    */
   protected abstract TransportState transportState();
 
+  /**
+   * Called when a message framing operation failed, to kill the RPC and report the failure.
+   *
+   * @param cause the actual failure
+   */
+  protected abstract void framingFailed(Throwable cause);
+
   @Override
   public void optimizeForDirectExecutor() {
     transportState().optimizeForDirectExecutor();
@@ -69,6 +76,9 @@ public abstract class AbstractStream implements Stream {
       if (!framer().isClosed()) {
         framer().writePayload(message);
       }
+    } catch (Throwable t) {
+      framer().dispose();
+      framingFailed(t);
     } finally {
       GrpcUtil.closeQuietly(message);
     }
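
The shape of this proposal, where the shared write path catches the framing failure and each side decides how to end the RPC, can be tried out in isolation with a small model. Everything below is a hypothetical stand-in (`FramingSketchStream`, its nested `Framer`, and the two subclasses are not grpc-java classes), and the string events replace the real cancel() and close() calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the proposed hook; none of these are grpc-java classes. */
abstract class FramingSketchStream {
  /** Stand-in for a message framer with an outbound size limit. */
  static final class Framer {
    private final int maxBytes;
    private boolean disposed;

    Framer(int maxBytes) {
      this.maxBytes = maxBytes;
    }

    void writePayload(byte[] message) {
      if (message.length > maxBytes) {
        throw new IllegalArgumentException(
            "message too large: " + message.length + " > " + maxBytes);
      }
    }

    void dispose() {
      disposed = true;
    }

    boolean isDisposed() {
      return disposed;
    }
  }

  final Framer framer;
  final List<String> events = new ArrayList<>();

  FramingSketchStream(int maxBytes) {
    this.framer = new Framer(maxBytes);
  }

  /** Called when framing fails, to end the RPC and report the failure. */
  protected abstract void framingFailed(Throwable cause);

  /** Shared write path: a framing failure is routed to the hook, never thrown. */
  final void writeMessage(byte[] message) {
    try {
      if (!framer.isDisposed()) {
        framer.writePayload(message);
        events.add("wrote " + message.length);
      }
    } catch (Throwable t) {
      framer.dispose();
      framingFailed(t);
    }
  }
}

/** Client side of the model: records a cancellation. */
final class ClientSketchStream extends FramingSketchStream {
  ClientSketchStream(int maxBytes) {
    super(maxBytes);
  }

  @Override
  protected void framingFailed(Throwable cause) {
    events.add("cancel: Unable to frame message");
  }
}

/** Server side of the model: records a close carrying the failure message. */
final class ServerSketchStream extends FramingSketchStream {
  ServerSketchStream(int maxBytes) {
    super(maxBytes);
  }

  @Override
  protected void framingFailed(Throwable cause) {
    events.add("close: " + cause.getMessage());
  }
}
```

With this structure the caller of writeMessage() never sees the exception, so the draining loops do not need to recover from it.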

void drainPendingCallbacks() {
assert !passThrough;
List<Runnable> toRun = new ArrayList<>();
drainOut:

Unused.

Comment on lines +318 to +319
Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
realCall.cancel(status.getDescription(), status.getCause());

fromThrowable() will unpack things, and then they'll get thrown away, since the description is overwritten and the status code is discarded. Don't do that.

Instead, do it the straightforward way:

Suggested change
- Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
- realCall.cancel(status.getDescription(), status.getCause());
+ realCall.cancel("Failed to drain pending calls", t);

That will produce a CANCELLED status with all the information known at this point.

Comment on lines +196 to +197
Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
realStream.cancel(status);

Suggested change
- Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
- realStream.cancel(status);
+ Status status = Status.CANCELLED.withDescription("Failed to drain pending calls").withCause(t);
+ realStream.cancel(status);

} catch (Throwable t) {
synchronized (this) {
pendingCallbacks = null;
passThrough = true;

I'm pretty certain this is wrong. You can't just delete a few events and then start delivering all the others. ClientCallImpl has the code to handle this, but we don't know if onClose() was part of pendingCallbacks or has already been called.

We can keep track of whether onClose() has been enqueued and whether realListener.onClose() has been called:

  1. If realListener.onClose() has been called, then just throw up the stack and let it log (passThrough = true doesn't matter, because no more callbacks will be issued)
  2. If onClose() has been enqueued, then clear pendingCallbacks and deliver a fabricated onClose() callback about cancelling because of the exception (passThrough = true doesn't matter, because no more callbacks will be issued)
  3. If onClose() has been neither enqueued nor called... we don't actually have a good option. We could try calling call.cancel(), but it isn't thread-safe to call from here and can cause exceptions in the application. It may be the least-bad option though. I think a real solution requires #7868 ("Ability to get callExecutor in ClientInterceptor implementation"), so we can run callbacks within the context of ClientCallImpl.
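
A minimal single-threaded sketch of that bookkeeping, assuming two flags are enough to tell the three cases apart. `CloseTrackingSketch`, `Recovery`, and the string-based events are hypothetical; the real listener would also need synchronization, which is omitted here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a delayed listener that tracks onClose(); not grpc-java code. */
final class CloseTrackingSketch {
  enum Recovery { NONE, FABRICATE_ON_CLOSE, CANCEL_CALL }

  private final List<Runnable> pendingCallbacks = new ArrayList<>();
  private final List<String> delivered = new ArrayList<>();
  private boolean closeEnqueued;
  private boolean closeDelivered;

  /** Buffers an arbitrary callback (for example a message delivery). */
  void enqueue(Runnable callback) {
    pendingCallbacks.add(callback);
  }

  /** Buffers the close callback and remembers that it is in the queue. */
  void enqueueClose(String status) {
    closeEnqueued = true;
    pendingCallbacks.add(() -> deliverClose(status));
  }

  private void deliverClose(String status) {
    closeDelivered = true;
    delivered.add("onClose " + status);
  }

  /** Runs the buffered callbacks and reports which recovery was taken on failure. */
  Recovery drain() {
    try {
      for (Runnable callback : pendingCallbacks) {
        callback.run();
      }
      pendingCallbacks.clear();
      return Recovery.NONE;
    } catch (RuntimeException e) {
      pendingCallbacks.clear();
      if (closeDelivered) {
        // Case 1: the close already reached the listener; let the caller log it.
        throw e;
      }
      if (closeEnqueued) {
        // Case 2: replace the buffered close with a fabricated one.
        deliverClose("CANCELLED: " + e.getMessage());
        return Recovery.FABRICATE_ON_CLOSE;
      }
      // Case 3: no close is known; the caller has to cancel the call itself.
      return Recovery.CANCEL_CALL;
    }
  }

  List<String> delivered() {
    return delivered;
  }
}
```

The model only decides which case applies; it does not address the thread-safety concern raised for case 3.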

} catch (Throwable t) {
synchronized (this) {
pendingCallbacks = null;
passThrough = true;

This has similar problems to what I commented in DelayedClientCall. But I think the solution here is "this should never happen." SerializingExecutor protects against RuntimeException when directExecutor is used. For Errors... "don't care" could be okay; everything is broken anyway on an error. Yeah, I'd be willing to leave the code as it was and only change it once we know the situation that's a problem.

We do have the option of calling stream.cancel() from here, but I think it doesn't make sense to spend much thought on it because it really shouldn't be happening.
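
To illustrate the distinction drawn here between RuntimeException and Error, the following is a simplified single-threaded model of an executor that contains RuntimeExceptions from its tasks and lets Errors propagate. `QuietSerialSketch` is a hypothetical class, not SerializingExecutor itself, and it counts the contained exceptions where real code would presumably log them.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

/** Hypothetical single-threaded model; not the grpc-java SerializingExecutor. */
final class QuietSerialSketch implements Executor {
  private final Queue<Runnable> queue = new ArrayDeque<>();
  private boolean draining;
  private int swallowed;

  @Override
  public void execute(Runnable task) {
    queue.add(task);
    if (draining) {
      // Re-entrant submission: the outer loop below will pick the task up.
      return;
    }
    draining = true;
    try {
      Runnable next;
      while ((next = queue.poll()) != null) {
        try {
          next.run();
        } catch (RuntimeException e) {
          // Contained here, so the submitter never sees it.
          swallowed++;
        }
        // An Error is deliberately not caught and propagates to the submitter.
      }
    } finally {
      draining = false;
    }
  }

  int swallowedCount() {
    return swallowed;
  }
}
```

In this model a throwing callback cannot interrupt a drain loop that submitted it, which matches the "this should never happen" reasoning for RuntimeException; only an Error would reach the catch block under discussion.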

Development

Successfully merging this pull request may close these issues.

Blocking stub blocks indefinitely if client exceeds max outbound message size

2 participants