diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index e2b18537c7ac..4873fa695be4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -48,6 +48,7 @@ public class RxDocumentServiceRequest implements Cloneable { private final boolean isNameBased; private final OperationType operationType; private String resourceAddress; + private volatile String cachedCollectionName; public volatile boolean forceNameCacheRefresh; private volatile URI endpointOverride = null; private final UUID activityId; @@ -102,6 +103,22 @@ public boolean isReadOnlyRequest() { public void setResourceAddress(String newAddress) { this.resourceAddress = newAddress; + this.cachedCollectionName = null; + } + + /** + * Gets the collection name extracted from the resource address. + * The result is cached to avoid repeated O(n) slash-scanning in Utils.getCollectionName(). + * + * @return the collection name path segment + */ + public String getCollectionName() { + String result = this.cachedCollectionName; + if (result == null) { + result = Utils.getCollectionName(this.resourceAddress); + this.cachedCollectionName = result; + } + return result; } public boolean isReadOnlyScript() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 42172026ad5b..9292af1b3b6d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -194,14 +194,14 @@ public void setCollectionCache(RxClientCollectionCache collectionCache) { } @Override - public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception { + public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, String requestUriString, int port) throws Exception { HttpMethod method = getHttpMethod(request); HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders()); Flux contentAsByteArray = request.getContentAsByteArrayFlux(); return new HttpRequest(method, - requestUri, - requestUri.getPort(), + requestUriString, + port, httpHeaders, contentAsByteArray); } @@ -279,8 +279,8 @@ public Mono performRequest(RxDocumentServiceRequest r request.requestContext.cosmosDiagnostics = clientContext.createDiagnostics(); } - URI uri = getUri(request); - request.requestContext.resourcePhysicalAddress = uri.toString(); + String uri = getUri(request); + request.requestContext.resourcePhysicalAddress = uri; if (this.throughputControlStore != null) { return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, uri))); @@ -303,7 +303,7 @@ protected boolean partitionKeyRangeResolutionNeeded(RxDocumentServiceRequest req * @param requestUri * @return Flux */ - public Mono performRequestInternal(RxDocumentServiceRequest request, URI requestUri) { + public Mono performRequestInternal(RxDocumentServiceRequest request, String requestUri) { if (!partitionKeyRangeResolutionNeeded(request)) { return this.performRequestInternalCore(request, requestUri); } @@ -316,12 +316,16 @@ public Mono performRequestInternal(RxDocumentServiceR }); } - private Mono performRequestInternalCore(RxDocumentServiceRequest request, URI requestUri) { + // Overload for callers that still pass URI + public Mono performRequestInternal(RxDocumentServiceRequest request, URI requestUri) { + return performRequestInternal(request, requestUri.toString()); + } + + private Mono performRequestInternalCore(RxDocumentServiceRequest request, String requestUri) { try { - HttpRequest httpRequest = request - .getEffectiveHttpTransportSerializer(this) - .wrapInHttpRequest(request, requestUri); + HttpTransportSerializer effectiveSerializer = request.getEffectiveHttpTransportSerializer(this); + HttpRequest httpRequest = effectiveSerializer.wrapInHttpRequest(request, requestUri, this.resolvePort(request)); // Capture the request record early so it's available on both success and error paths. // Each retry creates a new HttpRequest with a new record, so this is per-attempt. @@ -378,7 +382,7 @@ public URI getRootUri(RxDocumentServiceRequest request) { return this.globalEndpointManager.resolveServiceEndpoint(request).getGatewayRegionalEndpoint(); } - private URI getUri(RxDocumentServiceRequest request) throws URISyntaxException { + private String getUri(RxDocumentServiceRequest request) throws URISyntaxException { URI rootUri = request.getEndpointOverride(); if (rootUri == null) { if (request.getIsMedia()) { @@ -402,8 +406,20 @@ private URI getUri(RxDocumentServiceRequest request) throws URISyntaxException { rootUri.getHost(), rootUri.getPort(), ensureSlashPrefixed(path), - null, // Query string not used. - null); + null, + null).toASCIIString(); + } + + private int resolvePort(RxDocumentServiceRequest request) { + URI rootUri = request.getEndpointOverride(); + if (rootUri == null) { + if (request.getIsMedia()) { + rootUri = this.globalEndpointManager.getWriteEndpoints().get(0).getGatewayRegionalEndpoint(); + } else { + rootUri = getRootUri(request); + } + } + return rootUri.getPort(); } private String ensureSlashPrefixed(String path) { @@ -503,7 +519,7 @@ private Mono toDocumentServiceResponse(Mono toDocumentServiceResponse(Mono toDocumentServiceResponse(Mono body; @@ -31,6 +32,7 @@ public class HttpRequest { public HttpRequest(HttpMethod httpMethod, URI uri, int port, HttpHeaders httpHeaders) { this.httpMethod = httpMethod; this.uri = uri; + this.uriString = uri.toString(); this.port = port; this.headers = httpHeaders; this.reactorNettyRequestRecord = createReactorNettyRequestRecord(); @@ -44,7 +46,8 @@ public HttpRequest(HttpMethod httpMethod, URI uri, int port, HttpHeaders httpHea */ public HttpRequest(HttpMethod httpMethod, String uri, int port) throws URISyntaxException { this.httpMethod = httpMethod; - this.uri = new URI(uri); + this.uriString = uri; + this.uri = null; this.port = port; this.headers = new HttpHeaders(); this.reactorNettyRequestRecord = createReactorNettyRequestRecord(); @@ -61,6 +64,26 @@ public HttpRequest(HttpMethod httpMethod, String uri, int port) throws URISyntax public HttpRequest(HttpMethod httpMethod, URI uri, int port, HttpHeaders headers, Flux body) { this.httpMethod = httpMethod; this.uri = uri; + this.uriString = uri.toString(); + this.port = port; + this.headers = headers; + this.body = body; + this.reactorNettyRequestRecord = createReactorNettyRequestRecord(); + } + + /** + * Create a new HttpRequest instance from a URI string without parsing it. + * + * @param httpMethod the HTTP request method + * @param uriString the target address as a string (URI parsing is deferred) + * @param port the target port + * @param headers the HTTP headers to use with this request + * @param body the request content + */ + public HttpRequest(HttpMethod httpMethod, String uriString, int port, HttpHeaders headers, Flux body) { + this.httpMethod = httpMethod; + this.uriString = uriString; + this.uri = null; this.port = port; this.headers = headers; this.body = body; @@ -113,7 +136,25 @@ public HttpRequest withPort(int port) { * @return the target address */ public URI uri() { - return uri; + URI result = this.uri; + if (result == null) { + try { + result = new URI(this.uriString); + this.uri = result; + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid URI: " + this.uriString, e); + } + } + return result; + } + + /** + * Get the target address as a string without triggering URI parsing. + * + * @return the target address string + */ + public String uriString() { + return this.uriString; } /** @@ -124,6 +165,7 @@ public URI uri() { */ public HttpRequest withUri(URI uri) { this.uri = uri; + this.uriString = uri.toString(); return this; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java index 58a2dc95cf8d..f41b37fd8a9a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/HttpTransportSerializer.java @@ -4,10 +4,8 @@ import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import io.netty.buffer.ByteBuf; -import java.net.URI; - public interface HttpTransportSerializer { - HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception; + HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, String requestUri, int port) throws Exception; StoreResponse unwrapToStoreResponse( String endpoint, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java index 3e7f763caeb8..a720fe803b7e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/http/ReactorNettyClient.java @@ -176,7 +176,7 @@ public Mono send(HttpRequest request) { @Override public Mono send(final HttpRequest request, Duration responseTimeout) { Objects.requireNonNull(request.httpMethod()); - Objects.requireNonNull(request.uri()); + Objects.requireNonNull(request.uriString()); Objects.requireNonNull(this.httpClientConfig); if(request.reactorNettyRequestRecord() == null) { ReactorNettyRequestRecord reactorNettyRequestRecord = new ReactorNettyRequestRecord(); @@ -202,7 +202,7 @@ public Mono send(final HttpRequest request, Duration responseTimeo .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs) .responseTimeout(responseTimeout) .request(HttpMethod.valueOf(request.httpMethod().toString())) - .uri(request.uri().toASCIIString()) + .uri(request.uriString()) .send(bodySendDelegate(request)) .responseConnection((reactorNettyResponse, reactorNettyConnection) -> { HttpResponse httpResponse = new ReactorNettyHttpResponse(reactorNettyResponse,