Handle exceptions during draining pending calls / streams #12737
kannanjgithub wants to merge 5 commits into grpc:master
…handled framing exception during the draining of DelayedStream. When MessageFramer throws an exception (e.g., RESOURCE_EXHAUSTED), it bubbles up through DelayedStream.drainPendingCalls and is eventually caught and swallowed by ThreadlessExecutor.runQuietly. This leaves the DelayedStream in an inconsistent state (where passThrough is still false), and the responseFuture never completes, causing the blocking call to hang forever.

I have implemented a fix that adds proper exception handling to the draining loops in both DelayedStream and DelayedClientCall. When an exception occurs during draining:
1. The realStream (or realCall) is explicitly cancelled with the error.
2. The pending calls are cleared.
3. The stream/call transitions to passThrough = true to prevent getting stuck.
4. The listener's pending callbacks are drained, ensuring that any closure notifications are delivered to the application.
…Stream and DelayedClientCall
ejona86
left a comment
I think this isn't a great way to fix #12109. The problem is "we shouldn't be throwing." Maybe we should be throwing for Error, like out-of-memory. But definitely not for this self-created exception. I do wonder why people are using withMaxOutboundMessageSize(), as it exists only as a hack to support the option in service config (it really shouldn't have even been a public API). That said, it can still happen with service config, so we do need to handle it (and it exists in service config as a hack for lack of a server-side service config).
Not to say this PR is useless. We should indeed handle exceptions from the client better in some of these cases. But I don't think we should rely on the client's exception handling at all in the withMaxOutboundMessageSize() case.
I'm thinking maybe something closer to this instead. On server-side it would definitely be safer to use cancel(), but I think this plumbing has a hope of working:
diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
index bce1820b4..b286ea1e6 100644
--- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractClientStream.java
@@ -194,6 +194,11 @@ public abstract class AbstractClientStream extends AbstractStream
     }
   }

+  @Override
+  protected void framingFailed(Throwable cause) {
+    cancel(Status.CANCELLED.withDescription("Unable to frame message").withCause(cause));
+  }
+
   @Override
   public final void cancel(Status reason) {
     Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerStream.java b/core/src/main/java/io/grpc/internal/AbstractServerStream.java
index c468cba97..fe669fb25 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerStream.java
@@ -22,6 +22,8 @@ import io.grpc.Decompressor;
 import io.grpc.InternalStatus;
 import io.grpc.Metadata;
 import io.grpc.Status;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import javax.annotation.Nullable;

 /**
@@ -70,6 +72,8 @@ public abstract class AbstractServerStream extends AbstractStream
     void cancel(Status status);
   }

+  private static final Logger log = Logger.getLogger(AbstractServerStream.class.getName());
+
   private final MessageFramer framer;
   private final StatsTraceContext statsTraceCtx;
   private boolean outboundClosed;
@@ -119,6 +123,18 @@ public abstract class AbstractServerStream extends AbstractStream
     abstractServerStreamSink().writeFrame(frame, flush, numMessages);
   }

+  @Override
+  protected void framingFailed(Throwable t) {
+    Status status = Status.fromThrowable(t);
+    if (status.getCause() != null) {
+      // The status description may be mostly useless (e.g., with UNKNOWN or INTERNAL), with most
+      // details actually in the cause. We can't propagate those details to the client, so log
+      // instead.
+      log.log(Level.SEVERE, "Failed framing message. Closing RPC: " + status, status.getCause());
+    }
+    close(status, new Metadata());
+  }
+
   @Override
   public final void close(Status status, Metadata trailers) {
     Preconditions.checkNotNull(status, "status");
diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java
index 9f5fb035d..2b53d3cf3 100644
--- a/core/src/main/java/io/grpc/internal/AbstractStream.java
+++ b/core/src/main/java/io/grpc/internal/AbstractStream.java
@@ -47,6 +47,13 @@ public abstract class AbstractStream implements Stream {
    */
   protected abstract TransportState transportState();

+  /**
+   * Called when a message framing operation failed, to kill the RPC and report the failure.
+   *
+   * @param cause the actual failure
+   */
+  protected abstract void framingFailed(Throwable cause);
+
   @Override
   public void optimizeForDirectExecutor() {
     transportState().optimizeForDirectExecutor();
@@ -69,6 +76,9 @@ public abstract class AbstractStream implements Stream {
       if (!framer().isClosed()) {
         framer().writePayload(message);
       }
+    } catch (Throwable t) {
+      framer().dispose();
+      framingFailed(t);
     } finally {
       GrpcUtil.closeQuietly(message);
     }

void drainPendingCallbacks() {
  assert !passThrough;
  List<Runnable> toRun = new ArrayList<>();
  drainOut:

Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
realCall.cancel(status.getDescription(), status.getCause());
fromThrowable() will unpack things, and then they'll get thrown away, since the description is overwritten and the status code is discarded. Don't do that.
Instead, do it the straightforward way:
- Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
- realCall.cancel(status.getDescription(), status.getCause());
+ realCall.cancel("Failed to drain pending calls", t);
That will produce a CANCELLED status with all the information known at this point.
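To make the information loss concrete, here is a minimal sketch that does not use grpc-java at all. `MiniStatus`, `MiniStatusException`, and `cancel(...)` are hypothetical stand-ins that only imitate the documented behavior of `Status.fromThrowable()` (return the status carried in the cause chain, else UNKNOWN with the throwable as cause) and `ClientCall.cancel(message, cause)` (always CANCELLED). Under those assumptions, the original code ends up with neither the original status code nor the original exception:

```java
final class StatusLossSketch {
  enum Code { CANCELLED, UNKNOWN, RESOURCE_EXHAUSTED }

  /** Hypothetical stand-in for io.grpc.Status: an immutable (code, description, cause) triple. */
  static final class MiniStatus {
    final Code code;
    final String description;
    final Throwable cause;

    MiniStatus(Code code, String description, Throwable cause) {
      this.code = code;
      this.description = description;
      this.cause = cause;
    }

    MiniStatus withDescription(String newDescription) {
      return new MiniStatus(code, newDescription, cause);
    }

    /** Imitates fromThrowable(): return the status found in the cause chain, else UNKNOWN. */
    static MiniStatus fromThrowable(Throwable t) {
      for (Throwable cur = t; cur != null; cur = cur.getCause()) {
        if (cur instanceof MiniStatusException) {
          return ((MiniStatusException) cur).status;
        }
      }
      return new MiniStatus(Code.UNKNOWN, null, t);
    }
  }

  /** Hypothetical stand-in for StatusRuntimeException. */
  static final class MiniStatusException extends RuntimeException {
    final MiniStatus status;

    MiniStatusException(MiniStatus status) {
      super(status.code + ": " + status.description, status.cause);
      this.status = status;
    }
  }

  /** Imitates ClientCall.cancel(message, cause): the resulting status is always CANCELLED. */
  static MiniStatus cancel(String message, Throwable cause) {
    return new MiniStatus(Code.CANCELLED, message, cause);
  }

  /** The pattern from the PR: unpack, overwrite the description, then drop the code. */
  static MiniStatus lossyCancel(Throwable t) {
    MiniStatus status =
        MiniStatus.fromThrowable(t).withDescription("Failed to drain pending calls");
    return cancel(status.description, status.cause);
  }

  /** The suggested pattern: pass the throwable through untouched. */
  static MiniStatus directCancel(Throwable t) {
    return cancel("Failed to drain pending calls", t);
  }

  public static void main(String[] args) {
    Throwable t = new MiniStatusException(
        new MiniStatus(Code.RESOURCE_EXHAUSTED, "message too large", null));
    System.out.println("lossy cause:  " + lossyCancel(t).cause);
    System.out.println("direct cause: " + directCancel(t).cause);
  }
}
```

In this model the lossy variant reports a CANCELLED status with no cause, because the unpacked status had none of its own, while the direct variant keeps the original exception (and therefore its code and message) as the cause.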
Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
realStream.cancel(status);
- Status status = Status.fromThrowable(t).withDescription("Failed to drain pending calls");
- realStream.cancel(status);
+ Status status = Status.CANCELLED.withDescription("Failed to drain pending calls").withCause(t);
+ realStream.cancel(status);
} catch (Throwable t) {
  synchronized (this) {
    pendingCallbacks = null;
    passThrough = true;
I'm pretty certain this is wrong. You can't just delete a few events and then start delivering all the others. ClientCallImpl has the code to handle this, but we don't know if onClose() was part of pendingCallbacks() or has already been called.
We can keep track of whether onClose() has been enqueued and whether realListener.onClose() has been called:
- If realListener.onClose() has been called, then just throw up the stack and let it log (passThrough = true doesn't matter, because no more callbacks will be issued)
- If onClose() has been enqueued, then clear pendingCallbacks and deliver a fabricated onClose() callback about cancelling because of the exception (passThrough = true doesn't matter, because no more callbacks will be issued)
- If onClose() has not been called... we don't actually have a good option. We could try to call.cancel(), but it isn't thread-safe to call from here and can cause exceptions in the application. It may be the least-bad option though. I think a real solution requires "Ability to get callExecutor in ClientInterceptor implementation" #7868, so we can run callbacks within the context of ClientCallImpl
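The three cases above could be sketched roughly as follows. This is a simplified model, not the DelayedClientCall code: `Listener`, `Recovery`, `enqueueMessage`, `enqueueClose`, and the `cancelCall` hook are hypothetical names, statuses are plain strings, and the real drain loop's repeat-until-empty and passThrough handling are left out. The `cancelCall` hook only marks where call.cancel() would go, with the thread-safety caveat from the last bullet still unresolved.

```java
import java.util.ArrayList;
import java.util.List;

final class DelayedListenerSketch {
  /** Hypothetical minimal listener; stands in for ClientCall.Listener. */
  interface Listener {
    void onMessage(String message);
    void onClose(String status);
  }

  enum Recovery { NONE, FABRICATED_ON_CLOSE, CANCELLED_CALL }

  private final Listener realListener;
  private final Runnable cancelCall; // hypothetical hook standing in for call.cancel()
  private final List<Runnable> pendingCallbacks = new ArrayList<>();
  private boolean closeEnqueued;
  private boolean closeDelivered;

  DelayedListenerSketch(Listener realListener, Runnable cancelCall) {
    this.realListener = realListener;
    this.cancelCall = cancelCall;
  }

  synchronized void enqueueMessage(String message) {
    pendingCallbacks.add(() -> realListener.onMessage(message));
  }

  synchronized void enqueueClose(String status) {
    closeEnqueued = true;
    pendingCallbacks.add(() -> {
      // Mark first, so a throwing onClose() still counts as "called".
      markCloseDelivered();
      realListener.onClose(status);
    });
  }

  private synchronized void markCloseDelivered() {
    closeDelivered = true;
  }

  /** Drains one batch of callbacks and applies the three-way recovery on failure. */
  Recovery drain() {
    List<Runnable> toRun;
    synchronized (this) {
      toRun = new ArrayList<>(pendingCallbacks);
      pendingCallbacks.clear();
    }
    try {
      for (Runnable callback : toRun) {
        callback.run();
      }
      return Recovery.NONE;
    } catch (RuntimeException | Error t) {
      boolean delivered;
      boolean enqueued;
      synchronized (this) {
        delivered = closeDelivered;
        enqueued = closeEnqueued;
        pendingCallbacks.clear();
      }
      if (delivered) {
        // Case 1: onClose() already ran; no more callbacks are coming, so just rethrow.
        throw t;
      }
      if (enqueued) {
        // Case 2: replace the queued onClose() with a fabricated cancellation.
        markCloseDelivered();
        realListener.onClose("CANCELLED: listener callback failed: " + t);
        return Recovery.FABRICATED_ON_CLOSE;
      }
      // Case 3: the call is still open; cancelling it is the least-bad option.
      cancelCall.run();
      return Recovery.CANCELLED_CALL;
    }
  }
}
```

The point of the two flags is that the recovery path never has to guess whether the application has already seen, or is still owed, its single onClose() notification.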
} catch (Throwable t) {
  synchronized (this) {
    pendingCallbacks = null;
    passThrough = true;
This has similar problems to what I commented in DelayedClientCall. But I think the solution here is "this should never happen." SerializingExecutor protects against RuntimeException when directExecutor is used. For Errors... "don't care" could be okay; everything is broken anyway on an error. Yeah, I'd be willing to leave the code as it was and only change it once we know the situation that's a problem.
We do have the option of calling stream.cancel() from here, but I think it doesn't make sense to spend much thought on it because it really shouldn't be happening.
The reported hang in issue #12109 in blockingUnaryCall was caused by an unhandled framing exception during the draining of DelayedStream. When MessageFramer throws an exception (e.g., RESOURCE_EXHAUSTED), it bubbles up through DelayedStream.drainPendingCalls and is eventually caught and swallowed by ThreadlessExecutor.runQuietly. This leaves the DelayedStream in an inconsistent state (where passThrough is still false), and the responseFuture never completes, causing the blocking call to hang forever.

This fix adds proper exception handling to the draining loops in both DelayedStream and DelayedClientCall. When an exception occurs during draining:
1. The realStream (or realCall) is explicitly cancelled with the error.
2. The pending calls are cleared.
3. The stream/call transitions to passThrough = true to prevent getting stuck.
4. The listener's pending callbacks are drained, ensuring that any closure notifications are delivered to the application.

Fixes #12109
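The hang mechanism described above can be reproduced in miniature with plain JDK classes. The sketch below is not grpc-java code: `runQuietly` here is a hypothetical helper that merely swallows failures in the same spirit, and the "drain" is a lambda that throws before anyone completes the future. It assumes only `java.util.concurrent.CompletableFuture` and uses a short timeout in place of the indefinite blocking wait.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SwallowedExceptionHang {
  /** Hypothetical helper: runs a task and drops any failure instead of propagating it. */
  static void runQuietly(Runnable task) {
    try {
      task.run();
    } catch (Throwable t) {
      // A real executor would log here; nothing else learns about the failure.
    }
  }

  /**
   * Simulates a drain step that fails while a caller waits on a response future.
   * Returns true if the future was still incomplete when the timeout expired.
   */
  static boolean hangsWhenDrainThrows(boolean completeOnFailure) throws InterruptedException {
    CompletableFuture<String> responseFuture = new CompletableFuture<>();
    runQuietly(() -> {
      try {
        // Stand-in for a framing failure during draining.
        throw new IllegalStateException("RESOURCE_EXHAUSTED: message too large");
      } catch (RuntimeException e) {
        if (completeOnFailure) {
          // Routing the failure to the waiter is what unblocks the caller.
          responseFuture.completeExceptionally(e);
        }
        throw e;
      }
    });
    try {
      responseFuture.get(100, TimeUnit.MILLISECONDS);
      return false;
    } catch (TimeoutException e) {
      return true; // nobody completed the future: the blocking caller would wait forever
    } catch (ExecutionException e) {
      return false; // the failure reached the waiter
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("swallowed only:      hangs = " + hangsWhenDrainThrows(false));
    System.out.println("failure propagated:  hangs = " + hangsWhenDrainThrows(true));
  }
}
```

Whichever fix is chosen, the property to preserve is the one the second call shows: some path must always terminate the waiter when the drain fails.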