Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fb52bbf
Save changes.
kannanjgithub Jan 19, 2026
f518d5e
Save changes.
kannanjgithub Jan 20, 2026
3874631
Save changes.
kannanjgithub Jan 20, 2026
0bad82f
Save changes.
kannanjgithub Jan 21, 2026
51b38e2
Fix enum test.
kannanjgithub Jan 21, 2026
4da06a4
Revert temp changes.
kannanjgithub Jan 21, 2026
df70a27
Revert temp changes.
kannanjgithub Jan 21, 2026
bee005f
Try with server streaming alone.
kannanjgithub Jan 23, 2026
efce818
Interceptor logic.
kannanjgithub Jan 31, 2026
2ac2d47
Closing the stream after the rpc is done.
kannanjgithub Feb 10, 2026
4214889
Style fixes.
kannanjgithub Feb 10, 2026
e099d1d
Fix test name.
kannanjgithub Feb 10, 2026
3d75bf8
Fix style warnings.
kannanjgithub Feb 10, 2026
84d9528
Fix build.
kannanjgithub Feb 10, 2026
3136bca
Review comments - Build channel separately for MCS connection scaling…
kannanjgithub Feb 12, 2026
c3fc7c3
Address Review comments.
kannanjgithub Feb 12, 2026
93cb9ad
Expand the test name on the client side as well.
kannanjgithub Feb 13, 2026
3719011
Add debug print statements to diagnose why server is not starting.
kannanjgithub Feb 13, 2026
dbb3881
Revert "Add debug print statements to diagnose why server is not star…
kannanjgithub Feb 13, 2026
bd36d59
Add temp debug stmts for server start.
kannanjgithub Feb 13, 2026
0216d3b
Rename request proto field.
kannanjgithub Feb 19, 2026
3346bf8
Rename request proto field.
kannanjgithub Feb 19, 2026
d27128b
Rename response proto field.
kannanjgithub Feb 19, 2026
a8d66e1
Address review comments.
kannanjgithub Feb 20, 2026
9b66653
Specify MCS limit directly via command line arg.
kannanjgithub Mar 31, 2026
809ae0f
Merge remote-tracking branch 'origin/mcs-interop-tests' into mcs-inte…
kannanjgithub Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public enum TestCases {
RPC_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on the same channel"),
CHANNEL_SOAK("sends 'soak_iterations' large_unary rpcs in a loop, each on a new channel"),
ORCA_PER_RPC("report backend metrics per query"),
ORCA_OOB("report backend metrics out-of-band");
ORCA_OOB("report backend metrics out-of-band"),
MAX_CONCURRENT_STREAMS_CONNECTION_SCALING("max concurrent streaming connection scaling");

private final String description;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.grpc.testing.integration.Messages.TestOrcaReport;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
Expand Down Expand Up @@ -563,7 +564,46 @@ private void runTest(TestCases testCase) throws Exception {
tester.testOrcaOob();
break;
}


case MAX_CONCURRENT_STREAMS_CONNECTION_SCALING: {
ChannelCredentials channelCredentials;
if (useTls) {
if (!useTestCa) {
channelCredentials = TlsChannelCredentials.create();
} else {
try {
channelCredentials = TlsChannelCredentials.newBuilder()
.trustManager(TlsTesting.loadCert("ca.pem"))
.build();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
} else {
channelCredentials = InsecureChannelCredentials.create();
}
ManagedChannelBuilder<?> channelBuilder;
if (serverPort == 0) {
channelBuilder = Grpc.newChannelBuilder(serverHost, channelCredentials);
} else {
channelBuilder =
Grpc.newChannelBuilderForAddress(serverHost, serverPort, channelCredentials);
}
if (serverHostOverride != null) {
channelBuilder.overrideAuthority(serverHostOverride);
}
channelBuilder.disableServiceConfigLookUp();
try {
@SuppressWarnings("unchecked")
Map<String, ?> serviceConfigMap = (Map<String, ?>) JsonParser.parse(
"{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}");
channelBuilder.defaultServiceConfig(serviceConfigMap);
} catch (IOException e) {
throw new RuntimeException(e);
}
tester.testMcs(TestServiceGrpc.newStub(channelBuilder.build()));
break;
}
default:
throw new IllegalArgumentException("Unknown test case: " + testCase);
}
Expand Down Expand Up @@ -596,6 +636,7 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor(
}

private class Tester extends AbstractInteropTest {

@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
boolean useGeneric = false;
Expand Down Expand Up @@ -979,31 +1020,17 @@ public void testOrcaOob() throws Exception {
.build();

final int retryLimit = 5;
BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
final Object lastItem = new Object();
StreamingOutputCallResponseObserver streamingOutputCallResponseObserver =
new StreamingOutputCallResponseObserver(lastItem);
StreamObserver<StreamingOutputCallRequest> streamObserver =
asyncStub.fullDuplexCall(new StreamObserver<StreamingOutputCallResponse>() {

@Override
public void onNext(StreamingOutputCallResponse value) {
queue.add(value);
}

@Override
public void onError(Throwable t) {
queue.add(t);
}

@Override
public void onCompleted() {
queue.add(lastItem);
}
});
asyncStub.fullDuplexCall(streamingOutputCallResponseObserver);

streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
assertThat(streamingOutputCallResponseObserver.take())
.isInstanceOf(StreamingOutputCallResponse.class);
int i = 0;
for (; i < retryLimit; i++) {
Thread.sleep(1000);
Expand All @@ -1016,7 +1043,8 @@ public void onCompleted() {
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
.setOrcaOobReport(answer2)
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
assertThat(queue.take()).isInstanceOf(StreamingOutputCallResponse.class);
assertThat(streamingOutputCallResponseObserver.take())
.isInstanceOf(StreamingOutputCallResponse.class);

for (i = 0; i < retryLimit; i++) {
Thread.sleep(1000);
Expand All @@ -1027,7 +1055,7 @@ public void onCompleted() {
}
assertThat(i).isLessThan(retryLimit);
streamObserver.onCompleted();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need this? Otherwise we are orphaning the RPC.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

assertThat(queue.take()).isSameInstanceAs(lastItem);
assertThat(streamingOutputCallResponseObserver.take()).isSameInstanceAs(lastItem);
}

@Override
Expand All @@ -1054,6 +1082,85 @@ protected ServerBuilder<?> getHandshakerServerBuilder() {
protected int operationTimeoutMillis() {
return 15000;
}

class StreamingOutputCallResponseObserver implements
StreamObserver<StreamingOutputCallResponse> {
private final Object lastItem;
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

public StreamingOutputCallResponseObserver(Object lastItem) {
this.lastItem = lastItem;
}

@Override
public void onNext(StreamingOutputCallResponse value) {
queue.add(value);
}

@Override
public void onError(Throwable t) {
queue.add(t);
}

@Override
public void onCompleted() {
queue.add(lastItem);
}

Object take() throws InterruptedException {
return queue.take();
}
}

public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception {
final Object lastItem = new Object();
StreamingOutputCallResponseObserver responseObserver1 =
new StreamingOutputCallResponseObserver(lastItem);
StreamObserver<StreamingOutputCallRequest> streamObserver1 =
asyncStub.fullDuplexCall(responseObserver1);
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder()
.setFillPeerSocketAddress(
Messages.BoolValue.newBuilder().setValue(true).build())
.build())
.build();
streamObserver1.onNext(request);
Object responseObj = responseObserver1.take();
StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj;
String clientSocketAddressInCall1 = callResponse.getPeerSocketAddress();
assertThat(clientSocketAddressInCall1).isNotEmpty();

StreamingOutputCallResponseObserver responseObserver2 =
new StreamingOutputCallResponseObserver(lastItem);
StreamObserver<StreamingOutputCallRequest> streamObserver2 =
asyncStub.fullDuplexCall(responseObserver2);
streamObserver2.onNext(request);
callResponse = (StreamingOutputCallResponse) responseObserver2.take();
String clientSocketAddressInCall2 = callResponse.getPeerSocketAddress();

assertThat(clientSocketAddressInCall1).isEqualTo(clientSocketAddressInCall2);

// The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new
// connection to be created in the same subchannel and not get queued.
StreamingOutputCallResponseObserver responseObserver3 =
new StreamingOutputCallResponseObserver(lastItem);
StreamObserver<StreamingOutputCallRequest> streamObserver3 =
asyncStub.fullDuplexCall(responseObserver3);
streamObserver3.onNext(request);
callResponse = (StreamingOutputCallResponse) responseObserver3.take();
String clientSocketAddressInCall3 = callResponse.getPeerSocketAddress();

// This assertion is currently failing because connection scaling when MCS limit has been
// reached is not yet implemented in gRPC Java.
assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1);

streamObserver1.onCompleted();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: complete all three RPCs, then verify all three. That way the RPCs complete in parallel.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

streamObserver2.onCompleted();
streamObserver3.onCompleted();
assertThat(responseObserver1.take()).isSameInstanceAs(lastItem);
assertThat(responseObserver2.take()).isSameInstanceAs(lastItem);
assertThat(responseObserver3.take()).isSameInstanceAs(lastItem);
}
}

private static String validTestCasesHelpText() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@

package io.grpc.testing.integration;

import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR;

import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
Expand All @@ -42,10 +47,12 @@
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.grpc.testing.integration.Messages.TestOrcaReport;
import io.grpc.testing.integration.TestServiceGrpc.AsyncService;
import java.net.SocketAddress;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand All @@ -61,8 +68,8 @@
* sent in response streams.
*/
public class TestServiceImpl implements io.grpc.BindableService, AsyncService {
static Context.Key<SocketAddress> PEER_ADDRESS_CONTEXT_KEY = Context.key("peer-address");
private final Random random = new Random();

private final ScheduledExecutorService executor;
private final ByteString compressableBuffer;
private final MetricRecorder metricRecorder;
Expand Down Expand Up @@ -235,9 +242,27 @@ public void onNext(StreamingOutputCallRequest request) {
.asRuntimeException());
return;
}
if (whetherSendClientSocketAddressInResponse(request)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we just ignore most of the request? It looks like this should go through toChunkQueue() to actually send the response.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other parts of the request are about chunking behavior which doesn't apply for this test. I don't need to use toChunkQueue which is used to keep track of what response sizes to send in successive chunks, nor dispatcher.enqueue which handles scheduled sending of the chunks.

responseObserver.onNext(
StreamingOutputCallResponse.newBuilder()
.setPeerSocketAddress(PEER_ADDRESS_CONTEXT_KEY.get().toString())
.build());
return;
}
dispatcher.enqueue(toChunkQueue(request));
}

private boolean whetherSendClientSocketAddressInResponse(StreamingOutputCallRequest request) {
Iterator<ResponseParameters> responseParametersIterator =
request.getResponseParametersList().iterator();
while (responseParametersIterator.hasNext()) {
if (responseParametersIterator.next().getFillPeerSocketAddress().getValue()) {
return true;
}
}
return false;
}

@Override
public void onCompleted() {
if (oobTestLocked) {
Expand Down Expand Up @@ -507,7 +532,8 @@ public static List<ServerInterceptor> interceptors() {
return Arrays.asList(
echoRequestHeadersInterceptor(Util.METADATA_KEY),
echoRequestMetadataInHeaders(Util.ECHO_INITIAL_METADATA_KEY),
echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY));
echoRequestMetadataInTrailers(Util.ECHO_TRAILING_METADATA_KEY),
new McsScalingTestcaseInterceptor());
}

/**
Expand Down Expand Up @@ -539,6 +565,22 @@ public void close(Status status, Metadata trailers) {
};
}

static class McsScalingTestcaseInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
SocketAddress peerAddress = call.getAttributes().get(TRANSPORT_ATTR_REMOTE_ADDR);

// Create a new context with the peer address value
Context newContext = Context.current().withValue(PEER_ADDRESS_CONTEXT_KEY, peerAddress);
try {
return Contexts.interceptCall(newContext, call, headers, next);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}

/**
* Echoes request headers with the specified key(s) from a client into response headers only.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void run() {
private int port = 8080;
private boolean useTls = true;
private boolean useAlts = false;
private int mcsLimit = -1;

private ScheduledExecutorService executor;
private Server server;
Expand Down Expand Up @@ -118,6 +119,10 @@ void parseArgs(String[] args) {
usage = true;
break;
}
} else if ("max_concurrent_streams_limit".equals(key)) {
mcsLimit = Integer.parseInt(value);
// TODO: Make Netty server builder usable for IPV6 as well (not limited to MCS handling)
addressType = Util.AddressType.IPV4; // To use NettyServerBuilder
} else {
System.err.println("Unknown argument: " + key);
usage = true;
Expand All @@ -141,6 +146,8 @@ void parseArgs(String[] args) {
+ "\n for testing. Only effective when --use_alts=true."
+ "\n --address_type=IPV4|IPV6|IPV4_IPV6"
+ "\n What type of addresses to listen on. Default IPV4_IPV6"
+ "\n --max_concurrent_streams_limit=LIMIT"
+ "\n Set the maximum concurrent streams limit"
);
System.exit(1);
}
Expand All @@ -149,6 +156,8 @@ void parseArgs(String[] args) {
@SuppressWarnings("AddressSelection")
@VisibleForTesting
void start() throws Exception {
System.out.println("TestServiceServer.start called with addressType " + addressType);
System.out.flush();
executor = Executors.newSingleThreadScheduledExecutor();
ServerCredentials serverCreds;
if (useAlts) {
Expand Down Expand Up @@ -186,6 +195,9 @@ void start() throws Exception {
if (v4Address != null && !v4Address.equals(localV4Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
if (mcsLimit != -1) {
((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(mcsLimit);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
Expand All @@ -201,6 +213,8 @@ void start() throws Exception {
default:
throw new AssertionError("Unknown address type: " + addressType);
}
System.out.println("TestServiceServer.start calling serverBuilder.start.");
System.out.flush();
server = serverBuilder
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.addService(
Expand All @@ -210,6 +224,8 @@ void start() throws Exception {
.intercept(OrcaMetricReportingServerInterceptor.create(metricRecorder))
.build()
.start();
System.out.println("TestServiceServer.start After calling serverBuilder.start.");
System.out.flush();
}

@VisibleForTesting
Expand Down
Loading
Loading