Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class RxDocumentServiceRequest implements Cloneable {
private final boolean isNameBased;
private final OperationType operationType;
private String resourceAddress;
private volatile String cachedCollectionName;
public volatile boolean forceNameCacheRefresh;
private volatile URI endpointOverride = null;
private final UUID activityId;
Expand Down Expand Up @@ -102,6 +103,22 @@ public boolean isReadOnlyRequest() {

public void setResourceAddress(String newAddress) {
this.resourceAddress = newAddress;
this.cachedCollectionName = null;
}

/**
* Gets the collection name extracted from the resource address.
* The result is cached to avoid repeated O(n) slash-scanning in Utils.getCollectionName().
*
* @return the collection name path segment
*/
public String getCollectionName() {
String result = this.cachedCollectionName;
if (result == null) {
result = Utils.getCollectionName(this.resourceAddress);
this.cachedCollectionName = result;
}
return result;
}

public boolean isReadOnlyScript() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ public void setCollectionCache(RxClientCollectionCache collectionCache) {
}

@Override
public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception {
public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, String requestUriString, int port) throws Exception {
HttpMethod method = getHttpMethod(request);
HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders());

Flux<byte[]> contentAsByteArray = request.getContentAsByteArrayFlux();
return new HttpRequest(method,
requestUri,
requestUri.getPort(),
requestUriString,
port,
httpHeaders,
contentAsByteArray);
}
Expand Down Expand Up @@ -279,8 +279,8 @@ public Mono<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest r
request.requestContext.cosmosDiagnostics = clientContext.createDiagnostics();
}

URI uri = getUri(request);
request.requestContext.resourcePhysicalAddress = uri.toString();
String uri = getUri(request);
request.requestContext.resourcePhysicalAddress = uri;

if (this.throughputControlStore != null) {
return this.throughputControlStore.processRequest(request, Mono.defer(() -> this.performRequestInternal(request, uri)));
Expand All @@ -303,7 +303,7 @@ protected boolean partitionKeyRangeResolutionNeeded(RxDocumentServiceRequest req
* @param requestUri
* @return Flux<RxDocumentServiceResponse>
*/
public Mono<RxDocumentServiceResponse> performRequestInternal(RxDocumentServiceRequest request, URI requestUri) {
public Mono<RxDocumentServiceResponse> performRequestInternal(RxDocumentServiceRequest request, String requestUri) {
if (!partitionKeyRangeResolutionNeeded(request)) {
return this.performRequestInternalCore(request, requestUri);
}
Expand All @@ -316,12 +316,16 @@ public Mono<RxDocumentServiceResponse> performRequestInternal(RxDocumentServiceR
});
}

private Mono<RxDocumentServiceResponse> performRequestInternalCore(RxDocumentServiceRequest request, URI requestUri) {
// Overload for callers that still pass URI
public Mono<RxDocumentServiceResponse> performRequestInternal(RxDocumentServiceRequest request, URI requestUri) {
return performRequestInternal(request, requestUri.toString());
}

private Mono<RxDocumentServiceResponse> performRequestInternalCore(RxDocumentServiceRequest request, String requestUri) {

try {
HttpRequest httpRequest = request
.getEffectiveHttpTransportSerializer(this)
.wrapInHttpRequest(request, requestUri);
HttpTransportSerializer effectiveSerializer = request.getEffectiveHttpTransportSerializer(this);
HttpRequest httpRequest = effectiveSerializer.wrapInHttpRequest(request, requestUri, this.resolvePort(request));

// Capture the request record early so it's available on both success and error paths.
// Each retry creates a new HttpRequest with a new record, so this is per-attempt.
Expand Down Expand Up @@ -378,7 +382,7 @@ public URI getRootUri(RxDocumentServiceRequest request) {
return this.globalEndpointManager.resolveServiceEndpoint(request).getGatewayRegionalEndpoint();
}

private URI getUri(RxDocumentServiceRequest request) throws URISyntaxException {
private String getUri(RxDocumentServiceRequest request) throws URISyntaxException {
URI rootUri = request.getEndpointOverride();
if (rootUri == null) {
if (request.getIsMedia()) {
Expand All @@ -402,8 +406,20 @@ private URI getUri(RxDocumentServiceRequest request) throws URISyntaxException {
rootUri.getHost(),
rootUri.getPort(),
ensureSlashPrefixed(path),
null, // Query string not used.
null);
null,
null).toASCIIString();
}

private int resolvePort(RxDocumentServiceRequest request) {
URI rootUri = request.getEndpointOverride();
if (rootUri == null) {
if (request.getIsMedia()) {
rootUri = this.globalEndpointManager.getWriteEndpoints().get(0).getGatewayRegionalEndpoint();
} else {
rootUri = getRootUri(request);
}
}
return rootUri.getPort();
}

private String ensureSlashPrefixed(String path) {
Expand Down Expand Up @@ -503,7 +519,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
}
StoreResponse rsp = request
.getEffectiveHttpTransportSerializer(this)
.unwrapToStoreResponse(httpRequest.uri().toString(), request, httpResponseStatus, httpResponseHeaders, content);
.unwrapToStoreResponse(httpRequest.uriString(), request, httpResponseStatus, httpResponseHeaders, content);

// Only clear retainedBufRef AFTER StoreResponse successfully takes ownership.
// If unwrapToStoreResponse throws, retainedBufRef remains set so doFinally
Expand Down Expand Up @@ -619,7 +635,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
ImplementationBridgeHelpers
.CosmosExceptionHelper
.getCosmosExceptionAccessor()
.setRequestUri(dce, Uri.create(httpRequest.uri().toString()));
.setRequestUri(dce, Uri.create(httpRequest.uriString()));

if (request.requestContext.cosmosDiagnostics != null) {
if (httpRequest.reactorNettyRequestRecord() != null) {
Expand Down Expand Up @@ -671,7 +687,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
ImplementationBridgeHelpers
.CosmosExceptionHelper
.getCosmosExceptionAccessor()
.setRequestUri(oce, Uri.create(httpRequest.uri().toString()));
.setRequestUri(oce, Uri.create(httpRequest.uriString()));

if (request.requestContext.getCrossRegionAvailabilityContext() != null) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ protected boolean partitionKeyRangeResolutionNeeded(RxDocumentServiceRequest req
&& request.getPartitionKeyRangeIdentity() != null;
}
@Override
public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception {
public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, String requestUri, int port) throws Exception {
if (this.globalDatabaseAccountName == null) {
this.globalDatabaseAccountName = this.globalEndpointManager.getLatestDatabaseAccount().getId();
}
Expand Down Expand Up @@ -233,7 +233,7 @@ public HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI reque
return new HttpRequest(
HttpMethod.POST,
requestUri,
requestUri.getPort(),
port,
headers,
Flux.just(contentAsByteArray))
.withThinClientRequest(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
public class HttpRequest {
private HttpMethod httpMethod;
private URI uri;
private String uriString;
private int port;
private HttpHeaders headers;
private Flux<byte[]> body;
Expand All @@ -31,6 +32,7 @@ public class HttpRequest {
public HttpRequest(HttpMethod httpMethod, URI uri, int port, HttpHeaders httpHeaders) {
this.httpMethod = httpMethod;
this.uri = uri;
this.uriString = uri.toString();
this.port = port;
this.headers = httpHeaders;
this.reactorNettyRequestRecord = createReactorNettyRequestRecord();
Expand All @@ -44,7 +46,8 @@ public HttpRequest(HttpMethod httpMethod, URI uri, int port, HttpHeaders httpHea
*/
public HttpRequest(HttpMethod httpMethod, String uri, int port) throws URISyntaxException {
this.httpMethod = httpMethod;
this.uri = new URI(uri);
this.uriString = uri;
this.uri = null;
this.port = port;
this.headers = new HttpHeaders();
this.reactorNettyRequestRecord = createReactorNettyRequestRecord();
Expand All @@ -61,6 +64,26 @@ public HttpRequest(HttpMethod httpMethod, String uri, int port) throws URISyntax
public HttpRequest(HttpMethod httpMethod, URI uri, int port, HttpHeaders headers, Flux<byte[]> body) {
this.httpMethod = httpMethod;
this.uri = uri;
this.uriString = uri.toString();
this.port = port;
this.headers = headers;
this.body = body;
this.reactorNettyRequestRecord = createReactorNettyRequestRecord();
}

/**
* Create a new HttpRequest instance from a URI string without parsing it.
*
* @param httpMethod the HTTP request method
* @param uriString the target address as a string (URI parsing is deferred)
* @param port the target port
* @param headers the HTTP headers to use with this request
* @param body the request content
*/
public HttpRequest(HttpMethod httpMethod, String uriString, int port, HttpHeaders headers, Flux<byte[]> body) {
this.httpMethod = httpMethod;
this.uriString = uriString;
this.uri = null;
this.port = port;
this.headers = headers;
this.body = body;
Expand Down Expand Up @@ -113,7 +136,25 @@ public HttpRequest withPort(int port) {
* @return the target address
*/
public URI uri() {
return uri;
URI result = this.uri;
if (result == null) {
try {
result = new URI(this.uriString);
this.uri = result;
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid URI: " + this.uriString, e);
}
}
return result;
}

/**
* Get the target address as a string without triggering URI parsing.
*
* @return the target address string
*/
public String uriString() {
return this.uriString;
}

/**
Expand All @@ -124,6 +165,7 @@ public URI uri() {
*/
public HttpRequest withUri(URI uri) {
this.uri = uri;
this.uriString = uri.toString();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
import com.azure.cosmos.implementation.directconnectivity.StoreResponse;
import io.netty.buffer.ByteBuf;

import java.net.URI;

public interface HttpTransportSerializer {
HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, URI requestUri) throws Exception;
HttpRequest wrapInHttpRequest(RxDocumentServiceRequest request, String requestUri, int port) throws Exception;

StoreResponse unwrapToStoreResponse(
String endpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Mono<HttpResponse> send(HttpRequest request) {
@Override
public Mono<HttpResponse> send(final HttpRequest request, Duration responseTimeout) {
Objects.requireNonNull(request.httpMethod());
Objects.requireNonNull(request.uri());
Objects.requireNonNull(request.uriString());
Objects.requireNonNull(this.httpClientConfig);
if(request.reactorNettyRequestRecord() == null) {
ReactorNettyRequestRecord reactorNettyRequestRecord = new ReactorNettyRequestRecord();
Expand All @@ -202,7 +202,7 @@ public Mono<HttpResponse> send(final HttpRequest request, Duration responseTimeo
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMs)
.responseTimeout(responseTimeout)
.request(HttpMethod.valueOf(request.httpMethod().toString()))
.uri(request.uri().toASCIIString())
.uri(request.uriString())
.send(bodySendDelegate(request))
.responseConnection((reactorNettyResponse, reactorNettyConnection) -> {
HttpResponse httpResponse = new ReactorNettyHttpResponse(reactorNettyResponse,
Expand Down
Loading