From 46125f47b1d173803b903da29feac80afee46c54 Mon Sep 17 00:00:00 2001
From: Abhijeet Mohanty
Date: Sat, 14 Mar 2026 19:27:46 -0400
Subject: [PATCH 1/6] feat(cosmos): Write availability strategy (hedging) for PPAF single-writer accounts
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Enable proactive write hedging for Per-Partition Automatic Failover (PPAF) on single-writer Cosmos DB accounts. When a write to the primary region is slow or failing, the SDK now hedges the write to a read region, reducing time-to-recovery from 60-120s (retry-based) to the hedging threshold (~1s with default config).

## Problem

In PPAF-enabled single-writer accounts, when a partition fails over, the SDK waits for error signals (503, 408, 410), which can take 60-120s, before marking a region as failed for that partition via the retry-based path in GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.

## Solution

Plug the existing availability strategy (hedging) machinery into the write path for PPAF:

1. **Speculation gating** (RxDocumentClientImpl.getApplicableRegionsForSpeculation):
   - Relax the canUseMultipleWriteLocations() gate for PPAF single-writer accounts
   - Relax the isIdempotentWriteRetriesEnabled gate (PPAF provides partition-level consistency)
   - Use ALL account-level read regions (getAvailableReadRoutingContexts) as hedge candidates, not just preferred regions, because PPAF failover can target any read region

2. **Routing** (tryAddPartitionLevelLocationOverride + CrossRegionAvailabilityContext):
   - Add ppafWriteHedgeTargetRegion field to CrossRegionAvailabilityContextForRxDocumentServiceRequest
   - In tryAddPartitionLevelLocationOverride: when ppafWriteHedgeTargetRegion is set, create the ConcurrentHashMap entry via computeIfAbsent and route via hedgeFailoverInfo.getCurrent()
   - This is synchronous and deterministic: the ConcurrentHashMap is updated in the same request pipeline
   - Thread safety: uses getCurrent() from the computeIfAbsent result (not raw hedgeTarget) to avoid routing to a region the concurrent retry path may have marked as failed

3. **Default E2E policy** (evaluatePpafEnforcedE2eLatencyPolicyCfgForWrites):
   - Mirrors the read defaults exactly, giving symmetric hedging behavior for reads and writes
   - Only applied to point write operations (batch excluded via isPointOperation gate)
   - DIRECT: timeout=networkRequestTimeout+1s, threshold=min(timeout/2, 1s), step=500ms
   - GATEWAY: timeout=min(6s, httpTimeout), threshold=min(timeout/2, 1s), step=500ms

4. **Safety lever** (Configs.isWriteAvailabilityStrategyEnabledWithPpaf):
   - System property COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF (default: true)
   - Allows opt-out without code changes if regression is observed

## Files changed (6)

- Configs.java: Write availability strategy PPAF config flag
- RxDocumentClientImpl.java: Speculation gating, region resolution, write E2E policy
- CrossRegionAvailabilityContextForRxDocumentServiceRequest.java: ppafWriteHedgeTargetRegion field
- ClientRetryPolicy.java: Honor ppafWriteHedgeTargetRegion in tryAddPartitionLevelLocationOverride
- GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java: Hedge target handling in tryAddPartitionLevelLocationOverride with computeIfAbsent + getCurrent()
- PerPartitionAutomaticFailoverE2ETests.java: 26 new test cases

## Test coverage

| Op      | DIRECT (mocked transport) | GATEWAY (mocked HttpClient) |
|---------|---------------------------|-----------------------------|
| Create  | 410/21005 + 503/21008     | delayed write region        |
| Replace | 410/21005                 | delayed write region        |
| Upsert  | 410/21005                 | delayed write region        |
| Delete  | 410/21005                 | delayed write region        |
| Patch   | 410/21005                 | delayed write region        |

Additional tests:
- Opt-out via COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF=false
- Batch bypass verification (batch uses retry-based PPAF, not hedging)
- Explicit ConcurrentHashMap verification: after hedge success, asserts the PPAF manager's partitionKeyRangeToFailoverInfo entry points to a region != the failed write region

All assertions are exact match: 2 regions before failover, 1 region after failover.
165 tests total (existing + new), 0 regressions, 0 modifications to existing test logic.

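For reviewers, the default write E2E policy arithmetic listed under "Default E2E policy" can be sketched roughly as below. This is only an illustration of the stated formulas, not the code in evaluatePpafEnforcedE2eLatencyPolicyCfgForWrites: the class `PpafWriteHedgingDefaults`, the `Policy` holder, and the `forDirect`/`forGateway` helpers are hypothetical names, and the input timeouts in `main` are made-up sample values.

```java
import java.time.Duration;

// Hypothetical sketch of the default formulas from the commit message; not an SDK class.
public class PpafWriteHedgingDefaults {

    // Hypothetical holder for the three derived values.
    static final class Policy {
        final Duration timeout;
        final Duration threshold;
        final Duration step;

        Policy(Duration timeout, Duration threshold, Duration step) {
            this.timeout = timeout;
            this.threshold = threshold;
            this.step = step;
        }

        @Override
        public String toString() {
            return "timeout=" + timeout + ", threshold=" + threshold + ", step=" + step;
        }
    }

    // Returns the shorter of two durations.
    static Duration min(Duration a, Duration b) {
        return a.compareTo(b) <= 0 ? a : b;
    }

    // DIRECT: timeout = networkRequestTimeout + 1s.
    static Policy forDirect(Duration networkRequestTimeout) {
        return derive(networkRequestTimeout.plusSeconds(1));
    }

    // GATEWAY: timeout = min(6s, httpTimeout).
    static Policy forGateway(Duration httpTimeout) {
        return derive(min(Duration.ofSeconds(6), httpTimeout));
    }

    // Shared by both modes: threshold = min(timeout / 2, 1s), step = 500ms.
    private static Policy derive(Duration timeout) {
        Duration threshold = min(timeout.dividedBy(2), Duration.ofSeconds(1));
        return new Policy(timeout, threshold, Duration.ofMillis(500));
    }

    public static void main(String[] args) {
        // Sample inputs only; real values come from the client's connection configuration.
        System.out.println("DIRECT:  " + forDirect(Duration.ofSeconds(5)));
        System.out.println("GATEWAY: " + forGateway(Duration.ofSeconds(60)));
    }
}
```

With these formulas the threshold only drops below 1s when the computed timeout is shorter than 2s, which is why the summary above quotes roughly 1s as the default time before a hedged write is attempted.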
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- ...PerPartitionAutomaticFailoverE2ETests.java | 661 +++++++++++++++++- .../implementation/ClientRetryPolicy.java | 5 +- .../azure/cosmos/implementation/Configs.java | 14 + ...ityContextForRxDocumentServiceRequest.java | 18 + .../implementation/RxDocumentClientImpl.java | 151 +++- ...nagerForPerPartitionAutomaticFailover.java | 33 + 6 files changed, 873 insertions(+), 9 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java index 92d21daf488b..803f0046da70 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java @@ -36,6 +36,9 @@ import com.azure.cosmos.implementation.http.HttpHeaders; import com.azure.cosmos.implementation.http.HttpRequest; import com.azure.cosmos.implementation.http.HttpResponse; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover; +import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PartitionLevelAutomaticFailoverInfo; +import com.azure.cosmos.implementation.PartitionKeyRangeWrapper; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import com.azure.cosmos.models.CosmosBatch; @@ -229,7 +232,7 @@ public class PerPartitionAutomaticFailoverE2ETests extends TestSuiteBase { private static final CosmosEndToEndOperationLatencyPolicyConfig THREE_SEC_E2E_TIMEOUT_POLICY = new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(3)).build(); - BiConsumer, ExpectedResponseCharacteristics> validateExpectedResponseCharacteristics = 
(responseWrapper, expectedResponseCharacteristics) -> { + BiConsumer, ExpectedResponseCharacteristics> validateExpectedResponseCharacteristics= (responseWrapper, expectedResponseCharacteristics) -> { assertThat(responseWrapper).isNotNull(); Utils.ValueHolder cosmosDiagnosticsValueHolder = new Utils.ValueHolder<>(); @@ -2068,9 +2071,663 @@ public void testFailoverBehaviorForNonWriteOperationsWithPpafDynamicEnablement( } } + // region: Write Availability Strategy for PPAF tests + + /** + * Data provider for write availability strategy with PPAF scenarios. + * + *

Covers: Create, Replace, Upsert, Delete, Patch — all point write operations that go + * through {@code wrapPointOperationWithAvailabilityStrategy}. Batch is intentionally excluded + * because it bypasses the availability strategy path. + * + *

Fault: GONE/SERVER_GENERATED_410 (sub-status 21005) injected in the write region. + * This simulates a partition-level failure in DIRECT mode. The hedged write should succeed + * at a read region via the availability strategy. + */ + @DataProvider(name = "ppafWriteAvailabilityStrategyConfigs") + public Object[][] ppafWriteAvailabilityStrategyConfigs() { + + // Before failover: write goes to write region (fails) + hedge to read region (succeeds). + // Exactly 2 regions contacted, success. + ExpectedResponseCharacteristics expectedResponseBeforeFailover = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(0) + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(2); + + // After failover: PPAF conchashmap routes directly to failover region. + // Write goes directly there — 1 region contacted, 0 retries, success. + ExpectedResponseCharacteristics expectedResponseAfterFailover = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(0) + .setExpectedMaxRetryCount(0) + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(1); + + return new Object[][]{ + { + "Test write availability strategy hedging for CREATE with GONE / SERVER_GENERATED_410 in write region under PPAF.", + OperationType.Create, + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, + HttpConstants.StatusCodes.CREATED, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for REPLACE with GONE / SERVER_GENERATED_410 in write region under PPAF.", + OperationType.Replace, + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for UPSERT with GONE / SERVER_GENERATED_410 in write region under PPAF.", + OperationType.Upsert, + HttpConstants.StatusCodes.GONE, + 
HttpConstants.SubStatusCodes.SERVER_GENERATED_410, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for DELETE with GONE / SERVER_GENERATED_410 in write region under PPAF.", + OperationType.Delete, + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for PATCH with GONE / SERVER_GENERATED_410 in write region under PPAF.", + OperationType.Patch, + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for CREATE with SERVICE_UNAVAILABLE / SERVER_GENERATED_503 in write region under PPAF.", + OperationType.Create, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.CREATED, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + }; + } + + /** + * Validates that the write availability strategy (hedging) works correctly for PPAF-enabled + * single-writer accounts in DIRECT mode. + * + *

Scenario: + *
  1. A fault (GONE/410 with sub-status 21005 or SERVICE_UNAVAILABLE/503) is injected into the + * write region for a specific partition key range. + *
  2. The availability strategy should hedge the write to a read region, which returns success. + *
  3. After the hedged write succeeds, the PPAF manager should record the successful read region + * as the new write target for that partition. + *
  4. Subsequent writes should route directly to the new region (1 region contacted, 0 retries).
+ * + *

Design rationale: This test validates the fast-path failover via availability + * strategy, which complements the slower retry-based failover (60-120s). By hedging writes to read + * regions, we reduce the time-to-recovery for partition-level failures from minutes to the hedging + * threshold (typically a few seconds). + * + *

Why DIRECT mode only: In DIRECT mode, we can precisely mock the transport + * client to return errors for a specific (region, partition) combination while returning success + * for all other combinations. This level of control is required to validate the hedging behavior. + */ + @Test(groups = {"multi-region"}, dataProvider = "ppafWriteAvailabilityStrategyConfigs") + public void testPpafWriteAvailabilityStrategyHedgingInDirectMode( + String testType, + OperationType operationType, + int errorStatusCodeToMock, + int errorSubStatusCodeToMock, + int successStatusCode, + ExpectedResponseCharacteristics expectedBeforeFailover, + ExpectedResponseCharacteristics expectedAfterFailover) { + + ConnectionPolicy connectionPolicy = COSMOS_CLIENT_BUILDER_ACCESSOR.getConnectionPolicy(getClientBuilder()); + ConnectionMode connectionMode = connectionPolicy.getConnectionMode(); + + if (connectionMode != ConnectionMode.DIRECT) { + throw new SkipException(String.format("Test with type: %s not eligible for connection mode %s.", testType, connectionMode)); + } + + TransportClient transportClientMock = Mockito.mock(TransportClient.class); + List preferredRegions = this.accountLevelLocationReadableLocationContext.serviceOrderedReadableRegions; + Map readableRegionNameToEndpoint = this.accountLevelLocationReadableLocationContext.regionNameToEndpoint; + Utils.ValueHolder cosmosAsyncClientValueHolder = new Utils.ValueHolder<>(); + + try { + + // Reset any client-level E2E policy that a prior test may have set on the shared builder + CosmosAsyncClient asyncClient = getClientBuilder() + .endToEndOperationLatencyPolicyConfig(null) + .buildAsyncClient(); + cosmosAsyncClientValueHolder.v = asyncClient; + + CosmosAsyncContainer asyncContainer = asyncClient + .getDatabase(this.sharedDatabase.getId()) + .getContainer(this.sharedSinglePartitionContainer.getId()); + + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(asyncClient); + + StoreClient 
storeClient = ReflectionUtils.getStoreClient(rxDocumentClient); + ReplicatedResourceClient replicatedResourceClient = ReflectionUtils.getReplicatedResourceClient(storeClient); + ConsistencyReader consistencyReader = ReflectionUtils.getConsistencyReader(replicatedResourceClient); + StoreReader storeReader = ReflectionUtils.getStoreReader(consistencyReader); + ConsistencyWriter consistencyWriter = ReflectionUtils.getConsistencyWriter(replicatedResourceClient); + + Utils.ValueHolder> partitionKeyRangesForContainer + = getPartitionKeyRangesForContainer(asyncContainer, rxDocumentClient).block(); + + assertThat(partitionKeyRangesForContainer).isNotNull(); + assertThat(partitionKeyRangesForContainer.v).isNotNull(); + assertThat(partitionKeyRangesForContainer.v.size()).isGreaterThanOrEqualTo(1); + + PartitionKeyRange partitionKeyRangeWithIssues = partitionKeyRangesForContainer.v.get(0); + + assertThat(preferredRegions).isNotNull(); + assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(2); + + // The first preferred read region is used as the write region in single-writer accounts + String regionWithIssues = preferredRegions.get(0); + RegionalRoutingContext regionalRoutingContextWithIssues = new RegionalRoutingContext(new URI(readableRegionNameToEndpoint.get(regionWithIssues))); + + ReflectionUtils.setTransportClient(storeReader, transportClientMock); + ReflectionUtils.setTransportClient(consistencyWriter, transportClientMock); + + // Default: all requests succeed + setupTransportClientToReturnSuccessResponse(transportClientMock, constructStoreResponse(operationType, successStatusCode)); + + // Override: fault injected for the write region + specific partition + CosmosException cosmosException = createCosmosException(errorStatusCodeToMock, errorSubStatusCodeToMock); + setupTransportClientToThrowCosmosException( + transportClientMock, + partitionKeyRangeWithIssues, + regionalRoutingContextWithIssues, + cosmosException); + + // Enable PPAF via delegating 
DatabaseAccountManagerInternal + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + DatabaseAccountManagerInternal originalOwner = ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.TRUE); + DatabaseAccountManagerInternal overridingOwner = new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + DatabaseAccount latestDatabaseAccountSnapshot = globalEndpointManager.getLatestDatabaseAccount(); + globalEndpointManager.refreshLocationAsync(latestDatabaseAccountSnapshot, true).block(); + + TestObject testItem = TestObject.create(); + + Function> dataPlaneOperation = resolveDataPlaneOperation(operationType); + + OperationInvocationParamsWrapper operationInvocationParamsWrapper = new OperationInvocationParamsWrapper(); + operationInvocationParamsWrapper.asyncContainer = asyncContainer; + operationInvocationParamsWrapper.createdTestItem = testItem; + // No per-request E2E policy — the PPAF default write E2E policy applies automatically + operationInvocationParamsWrapper.itemRequestOptions = new CosmosItemRequestOptions(); + operationInvocationParamsWrapper.patchItemRequestOptions = new CosmosPatchItemRequestOptions(); + + // Phase 1: Initial write — should hedge to read region and succeed + // The availability strategy should fire the hedged request after the threshold + ResponseWrapper responseDuringHedging = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(responseDuringHedging, expectedBeforeFailover); + + // Verify PPAF conchashmap: after successful hedge, the partition should be tracked + // with the failover region (read region) as the current target. 
+ GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover ppafManager = + rxDocumentClient.getGlobalPartitionEndpointManagerForPerPartitionAutomaticFailover(); + + @SuppressWarnings("unchecked") + ConcurrentHashMap failoverInfoMap = + (ConcurrentHashMap) + ReflectionUtils.get( + ConcurrentHashMap.class, + ppafManager, + "partitionKeyRangeToFailoverInfo"); + + assertThat(failoverInfoMap).isNotNull(); + assertThat(failoverInfoMap).isNotEmpty(); + // The partition should be tracked with a failover region that is NOT the write region + PartitionLevelAutomaticFailoverInfo failoverInfo = failoverInfoMap.values().iterator().next(); + assertThat(failoverInfo).isNotNull(); + assertThat(failoverInfo.getCurrent()).isNotNull(); + assertThat(failoverInfo.getCurrent()).isNotEqualTo(regionalRoutingContextWithIssues); + + // Phase 2: Post-stabilization — subsequent writes should route directly to the failover region + ResponseWrapper responseAfterStabilization = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(responseAfterStabilization, expectedAfterFailover); + + } catch (Exception e) { + Assertions.fail("The test ran into an exception {}", e); + } finally { + System.clearProperty("COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"); + safeClose(cosmosAsyncClientValueHolder.v); + } + } + + /** + * Validates that write availability strategy hedging for PPAF can be disabled via the + * system property {@code COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF=false}. + * + *

When disabled, write operations should fall back to the slower retry-based failover path + * (contacting 2 regions with retries), matching the pre-feature behavior. + * + *

Why this matters: Provides a safety valve for customers who experience + * regressions from the write availability strategy. By disabling this config, the SDK reverts + * to the original PPAF retry-based failover for writes without requiring a code change. + */ + @Test(groups = {"multi-region"}) + public void testPpafWriteAvailabilityStrategyOptOutViaConfig() { + + ConnectionPolicy connectionPolicy = COSMOS_CLIENT_BUILDER_ACCESSOR.getConnectionPolicy(getClientBuilder()); + ConnectionMode connectionMode = connectionPolicy.getConnectionMode(); + + if (connectionMode != ConnectionMode.DIRECT) { + throw new SkipException("Test not eligible for gateway mode."); + } + + TransportClient transportClientMock = Mockito.mock(TransportClient.class); + List preferredRegions = this.accountLevelLocationReadableLocationContext.serviceOrderedReadableRegions; + Map readableRegionNameToEndpoint = this.accountLevelLocationReadableLocationContext.regionNameToEndpoint; + Utils.ValueHolder cosmosAsyncClientValueHolder = new Utils.ValueHolder<>(); + + try { + // Disable write availability strategy for PPAF + System.setProperty("COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF", "false"); + + // Reset any client-level E2E policy that a prior test may have set on the shared builder + CosmosAsyncClient asyncClient = getClientBuilder() + .endToEndOperationLatencyPolicyConfig(null) + .buildAsyncClient(); + cosmosAsyncClientValueHolder.v = asyncClient; + + CosmosAsyncContainer asyncContainer = asyncClient + .getDatabase(this.sharedDatabase.getId()) + .getContainer(this.sharedSinglePartitionContainer.getId()); + + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(asyncClient); + + StoreClient storeClient = ReflectionUtils.getStoreClient(rxDocumentClient); + ReplicatedResourceClient replicatedResourceClient = ReflectionUtils.getReplicatedResourceClient(storeClient); + ConsistencyReader consistencyReader = 
ReflectionUtils.getConsistencyReader(replicatedResourceClient); + StoreReader storeReader = ReflectionUtils.getStoreReader(consistencyReader); + ConsistencyWriter consistencyWriter = ReflectionUtils.getConsistencyWriter(replicatedResourceClient); + + Utils.ValueHolder> partitionKeyRangesForContainer + = getPartitionKeyRangesForContainer(asyncContainer, rxDocumentClient).block(); + + assertThat(partitionKeyRangesForContainer).isNotNull(); + assertThat(partitionKeyRangesForContainer.v).isNotNull(); + + PartitionKeyRange partitionKeyRangeWithIssues = partitionKeyRangesForContainer.v.get(0); + + assertThat(preferredRegions).isNotNull(); + assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(2); + + String regionWithIssues = preferredRegions.get(0); + RegionalRoutingContext regionalRoutingContextWithIssues = new RegionalRoutingContext(new URI(readableRegionNameToEndpoint.get(regionWithIssues))); + + ReflectionUtils.setTransportClient(storeReader, transportClientMock); + ReflectionUtils.setTransportClient(consistencyWriter, transportClientMock); + + setupTransportClientToReturnSuccessResponse(transportClientMock, constructStoreResponse(OperationType.Create, HttpConstants.StatusCodes.CREATED)); + + CosmosException cosmosException = createCosmosException( + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410); + + setupTransportClientToThrowCosmosException( + transportClientMock, + partitionKeyRangeWithIssues, + regionalRoutingContextWithIssues, + cosmosException); + + // Enable PPAF + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + DatabaseAccountManagerInternal originalOwner = ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.TRUE); + DatabaseAccountManagerInternal overridingOwner = new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + 
ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + DatabaseAccount latestDatabaseAccountSnapshot = globalEndpointManager.getLatestDatabaseAccount(); + globalEndpointManager.refreshLocationAsync(latestDatabaseAccountSnapshot, true).block(); + + TestObject testItem = TestObject.create(); + + Function> dataPlaneOperation = resolveDataPlaneOperation(OperationType.Create); + + OperationInvocationParamsWrapper operationInvocationParamsWrapper = new OperationInvocationParamsWrapper(); + operationInvocationParamsWrapper.asyncContainer = asyncContainer; + operationInvocationParamsWrapper.createdTestItem = testItem; + operationInvocationParamsWrapper.itemRequestOptions = new CosmosItemRequestOptions(); + operationInvocationParamsWrapper.patchItemRequestOptions = new CosmosPatchItemRequestOptions(); + + // With write availability strategy disabled, retry-based PPAF failover kicks in. + // Exactly 2 regions contacted: write region fails, retry routes to read region. + ExpectedResponseCharacteristics expectedWithOptOut = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(1) + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(2); + + ResponseWrapper response = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(response, expectedWithOptOut); + + } catch (Exception e) { + Assertions.fail("The test ran into an exception {}", e); + } finally { + System.clearProperty("COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"); + safeClose(cosmosAsyncClientValueHolder.v); + } + } + + /** + * Validates that Batch operations are NOT affected by the write availability strategy for PPAF. 
+ * Batch bypasses {@code wrapPointOperationWithAvailabilityStrategy} and uses the batch transport + * path directly. This test ensures that Batch continues to use the retry-based failover path. + */ + @Test(groups = {"multi-region"}) + public void testPpafWriteAvailabilityStrategyDoesNotAffectBatch() { + + ConnectionPolicy connectionPolicy = COSMOS_CLIENT_BUILDER_ACCESSOR.getConnectionPolicy(getClientBuilder()); + ConnectionMode connectionMode = connectionPolicy.getConnectionMode(); + + if (connectionMode != ConnectionMode.DIRECT) { + throw new SkipException("Test not eligible for gateway mode."); + } + + TransportClient transportClientMock = Mockito.mock(TransportClient.class); + List preferredRegions = this.accountLevelLocationReadableLocationContext.serviceOrderedReadableRegions; + Map readableRegionNameToEndpoint = this.accountLevelLocationReadableLocationContext.regionNameToEndpoint; + Utils.ValueHolder cosmosAsyncClientValueHolder = new Utils.ValueHolder<>(); + + try { + + // Reset any client-level E2E policy that a prior test may have set on the shared builder + CosmosAsyncClient asyncClient = getClientBuilder() + .endToEndOperationLatencyPolicyConfig(null) + .buildAsyncClient(); + cosmosAsyncClientValueHolder.v = asyncClient; + + CosmosAsyncContainer asyncContainer = asyncClient + .getDatabase(this.sharedDatabase.getId()) + .getContainer(this.sharedSinglePartitionContainer.getId()); + + RxDocumentClientImpl rxDocumentClient = (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(asyncClient); + + StoreClient storeClient = ReflectionUtils.getStoreClient(rxDocumentClient); + ReplicatedResourceClient replicatedResourceClient = ReflectionUtils.getReplicatedResourceClient(storeClient); + ConsistencyReader consistencyReader = ReflectionUtils.getConsistencyReader(replicatedResourceClient); + StoreReader storeReader = ReflectionUtils.getStoreReader(consistencyReader); + ConsistencyWriter consistencyWriter = 
ReflectionUtils.getConsistencyWriter(replicatedResourceClient); + + Utils.ValueHolder> partitionKeyRangesForContainer + = getPartitionKeyRangesForContainer(asyncContainer, rxDocumentClient).block(); + + assertThat(partitionKeyRangesForContainer).isNotNull(); + assertThat(partitionKeyRangesForContainer.v).isNotNull(); + + PartitionKeyRange partitionKeyRangeWithIssues = partitionKeyRangesForContainer.v.get(0); + + assertThat(preferredRegions).isNotNull(); + assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(2); + + String regionWithIssues = preferredRegions.get(0); + RegionalRoutingContext regionalRoutingContextWithIssues = new RegionalRoutingContext(new URI(readableRegionNameToEndpoint.get(regionWithIssues))); + + ReflectionUtils.setTransportClient(storeReader, transportClientMock); + ReflectionUtils.setTransportClient(consistencyWriter, transportClientMock); + + setupTransportClientToReturnSuccessResponse(transportClientMock, constructStoreResponse(OperationType.Batch, HttpConstants.StatusCodes.OK)); + + CosmosException cosmosException = createCosmosException( + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_410); + + setupTransportClientToThrowCosmosException( + transportClientMock, + partitionKeyRangeWithIssues, + regionalRoutingContextWithIssues, + cosmosException); + + // Enable PPAF + GlobalEndpointManager globalEndpointManager = ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + DatabaseAccountManagerInternal originalOwner = ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.TRUE); + DatabaseAccountManagerInternal overridingOwner = new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + DatabaseAccount latestDatabaseAccountSnapshot = globalEndpointManager.getLatestDatabaseAccount(); + 
globalEndpointManager.refreshLocationAsync(latestDatabaseAccountSnapshot, true).block(); + + // Batch uses retry-based PPAF failover, NOT availability strategy hedging. + // Exactly 2 regions contacted: write region fails, retry routes to read region. + ExpectedResponseCharacteristics expectedForBatch = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(1) + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(2); + + Function> dataPlaneOperation = resolveDataPlaneOperation(OperationType.Batch); + + TestObject testItem = TestObject.create(); + + OperationInvocationParamsWrapper operationInvocationParamsWrapper = new OperationInvocationParamsWrapper(); + operationInvocationParamsWrapper.asyncContainer = asyncContainer; + operationInvocationParamsWrapper.createdTestItem = testItem; + operationInvocationParamsWrapper.itemRequestOptions = new CosmosItemRequestOptions(); + operationInvocationParamsWrapper.patchItemRequestOptions = new CosmosPatchItemRequestOptions(); + + ResponseWrapper response = dataPlaneOperation.apply(operationInvocationParamsWrapper); + this.validateExpectedResponseCharacteristics.accept(response, expectedForBatch); + + } catch (Exception e) { + Assertions.fail("The test ran into an exception {}", e); + } finally { + System.clearProperty("COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"); + safeClose(cosmosAsyncClientValueHolder.v); + } + } + /** - * Helper: Executes the hedging window (multiple consecutive fault attempts) followed by a single post-window verification. + * Data provider for write availability strategy with PPAF in GATEWAY mode. + * + *

Uses a mocked {@code HttpClient} to simulate a delayed write region response + * while returning success from the read region. This approach ensures deterministic + * behavior for all 5 write operation types (Create, Replace, Upsert, Delete, Patch). */ + @DataProvider(name = "ppafWriteAvailabilityStrategyGatewayConfigs") + public Object[][] ppafWriteAvailabilityStrategyGatewayConfigs() { + + // Before failover: the write goes to the write region (delayed 10s by the mock) and the + // availability strategy hedges to the read region (mock returns success). + // Exactly 2 regions contacted. + ExpectedResponseCharacteristics expectedBeforeFailover = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(0) + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(2); + + // After failover: the PPAF ConcurrentHashMap routes directly to the failover region. + // Exactly 1 region contacted. + ExpectedResponseCharacteristics expectedAfterFailover = new ExpectedResponseCharacteristics() + .setExpectedMinRetryCount(0) + .setExpectedMaxRetryCount(0) + .setShouldFinalResponseHaveSuccess(true) + .setExpectedRegionsContactedCount(1); + + return new Object[][]{ + { + "GATEWAY: Write availability strategy hedging for CREATE with delayed write region under PPAF.", + OperationType.Create, + HttpConstants.StatusCodes.CREATED, + expectedBeforeFailover, + expectedAfterFailover, + }, + { + "GATEWAY: Write availability strategy hedging for REPLACE with delayed write region under PPAF.", + OperationType.Replace, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + }, + { + "GATEWAY: Write availability strategy hedging for UPSERT with delayed write region under PPAF.", + OperationType.Upsert, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + }, + { + "GATEWAY: Write availability strategy hedging for DELETE with delayed 
write region under PPAF.", + OperationType.Delete, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedBeforeFailover, + expectedAfterFailover, + }, + { + "GATEWAY: Write availability strategy hedging for PATCH with delayed write region under PPAF.", + OperationType.Patch, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + }, + }; + } + + /** + * Validates write availability strategy hedging in GATEWAY mode for PPAF-enabled single-writer accounts. + * + *

<p>Approach: Uses a mocked {@code HttpClient} (same pattern as existing GATEWAY
+ * write failover tests) to control responses from both write and read regions:
+ * <ul>
+ * <li>Default: all requests return mocked success.</li>
+ * <li>Override: requests to the write region endpoint are delayed by 10s before returning an error,
+ * simulating an unresponsive write region.</li>
+ * <li>The hedged request to the read region hits the default success mock and completes immediately.</li>
+ * </ul>
+ *
+ * <p>
This gives deterministic control over both regions for all 5 write operation types. + */ + @Test(groups = {"multi-region"}, dataProvider = "ppafWriteAvailabilityStrategyGatewayConfigs") + public void testPpafWriteAvailabilityStrategyHedgingInGatewayMode( + String testType, + OperationType operationType, + int successStatusCode, + ExpectedResponseCharacteristics expectedBeforeFailover, + ExpectedResponseCharacteristics expectedAfterFailover) { + + ConnectionPolicy connectionPolicy = COSMOS_CLIENT_BUILDER_ACCESSOR.getConnectionPolicy(getClientBuilder()); + ConnectionMode connectionMode = connectionPolicy.getConnectionMode(); + + if (connectionMode != ConnectionMode.GATEWAY) { + throw new SkipException(String.format("Test with type: %s not eligible for connection mode %s.", testType, connectionMode)); + } + + HttpClient mockedHttpClient = Mockito.mock(HttpClient.class); + List preferredRegions = this.accountLevelLocationReadableLocationContext.serviceOrderedReadableRegions; + Map readableRegionNameToEndpoint = this.accountLevelLocationReadableLocationContext.regionNameToEndpoint; + Utils.ValueHolder cosmosAsyncClientValueHolder = new Utils.ValueHolder<>(); + + try { + // Reset any client-level E2E policy that a prior test may have set on the shared builder + CosmosAsyncClient asyncClient = getClientBuilder() + .endToEndOperationLatencyPolicyConfig(null) + .buildAsyncClient(); + cosmosAsyncClientValueHolder.v = asyncClient; + + CosmosAsyncContainer asyncContainer = asyncClient + .getDatabase(this.sharedDatabase.getId()) + .getContainer(this.sharedSinglePartitionContainer.getId()); + + // Warm caches before mocking + asyncContainer.getFeedRanges().block(); + + RxDocumentClientImpl rxDocumentClient = + (RxDocumentClientImpl) ReflectionUtils.getAsyncDocumentClient(asyncClient); + + RxStoreModel rxStoreModel = ReflectionUtils.getGatewayProxy(rxDocumentClient); + + GlobalEndpointManager globalEndpointManager = + ReflectionUtils.getGlobalEndpointManager(rxDocumentClient); + 
DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + + // Enable PPAF dynamically + DatabaseAccountManagerInternal originalOwner = + ReflectionUtils.getGlobalEndpointManagerOwner(globalEndpointManager); + AtomicReference ppafEnabledRef = new AtomicReference<>(Boolean.TRUE); + DatabaseAccountManagerInternal overridingOwner = + new DelegatingDatabaseAccountManagerInternal(originalOwner, ppafEnabledRef); + ReflectionUtils.setGlobalEndpointManagerOwner(globalEndpointManager, overridingOwner); + + DatabaseAccount latestDatabaseAccountSnapshot = globalEndpointManager.getLatestDatabaseAccount(); + globalEndpointManager.refreshLocationAsync(latestDatabaseAccountSnapshot, true).block(); + + assertThat(preferredRegions).isNotNull(); + assertThat(preferredRegions.size()).isGreaterThanOrEqualTo(2); + + String regionWithIssues = preferredRegions.get(0); + + // Replace the gateway HttpClient with our mock + ReflectionUtils.setGatewayHttpClient(rxStoreModel, mockedHttpClient); + + // Default: all requests return success (including hedged requests to read region) + setupHttpClientToReturnSuccessResponse(mockedHttpClient, operationType, databaseAccount, successStatusCode); + + // Override: write region requests are delayed by 10s then error — simulates unresponsive write region + CosmosException cosmosException = createCosmosException( + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503); + + // shouldForceE2ETimeout=true triggers the Mono.delay(10s) pattern + setupHttpClientToThrowCosmosException( + mockedHttpClient, + new URI(readableRegionNameToEndpoint.get(regionWithIssues)), + cosmosException, + false, // shouldThrowNetworkError + false, // shouldThrowReadTimeoutExceptionWhenNetworkError + true); // shouldForceE2ETimeout — delays response by 10s + + TestObject testItem = TestObject.create(); + + Function> dataPlaneOperation = + resolveDataPlaneOperation(operationType); + + 
OperationInvocationParamsWrapper params = new OperationInvocationParamsWrapper(); + params.asyncContainer = asyncContainer; + params.createdTestItem = testItem; + params.itemRequestOptions = new CosmosItemRequestOptions(); + params.patchItemRequestOptions = new CosmosPatchItemRequestOptions(); + + // Phase 1: Write to the delayed region should hedge to the read region (mocked success) + ResponseWrapper responseDuringHedging = dataPlaneOperation.apply(params); + this.validateExpectedResponseCharacteristics.accept(responseDuringHedging, expectedBeforeFailover); + + // Phase 2: Post-stabilization — should route directly to the new region + ResponseWrapper responseAfterStabilization = dataPlaneOperation.apply(params); + this.validateExpectedResponseCharacteristics.accept(responseAfterStabilization, expectedAfterFailover); + + } catch (Exception e) { + Assertions.fail("The test ran into an exception {}", e); + } finally { + System.clearProperty("COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"); + safeClose(cosmosAsyncClientValueHolder.v); + } + } + + // endregion private void runHedgingPhasesForNonWrite( int consecutiveFaults, Function> dataPlaneOperation, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java index 6211e4725d27..a6acd5d04afa 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java @@ -546,8 +546,11 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) { request.requestContext.routeToLocation(this.regionalRoutingContext); } - // In case PPAF is enabled and a location override exists for the partition key range assigned to the request + // In case PPAF is enabled and a location override exists for the partition key range assigned to the 
request. + // This also handles PPAF write hedging — when ppafWriteHedgeTargetRegion is set, it creates + // the conchashmap entry and routes the request to the target read region. this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryAddPartitionLevelLocationOverride(request); + this.throttlingRetry.onBeforeSendRequest(request); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index d8380bd94fff..f7373a3ca45f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -330,6 +330,10 @@ public class Configs { private static final String IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "COSMOS.IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"; private static final String IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE = "COSMOS_IS_READ_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"; + private static final String DEFAULT_IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "true"; + private static final String IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF = "COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"; + private static final String IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE = "COSMOS_IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF"; + private static final int DEFAULT_WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF = 25; private static final String WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF = "COSMOS.WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF"; private static final String WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF_VARIABLE = "COSMOS_WARN_LEVEL_LOGGING_THRESHOLD_FOR_PPAF_VARIABLE"; @@ -1344,6 +1348,16 @@ public static boolean isReadAvailabilityStrategyEnabledWithPpaf() { return Boolean.parseBoolean(isReadAvailabilityStrategyEnabledWithPpaf); } + public static boolean 
isWriteAvailabilityStrategyEnabledWithPpaf() { + String isWriteAvailabilityStrategyEnabledWithPpaf = System.getProperty( + IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF, + firstNonNull( + emptyToNull(System.getenv().get(IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF_VARIABLE)), + DEFAULT_IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF)); + + return Boolean.parseBoolean(isWriteAvailabilityStrategyEnabledWithPpaf); + } + public static String getAadScopeOverride() { return System.getProperty( AAD_SCOPE_OVERRIDE, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java index b5faa8cb5ded..3226fd13db9b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java @@ -7,6 +7,7 @@ import com.azure.cosmos.implementation.perPartitionAutomaticFailover.PerPartitionAutomaticFailoverInfoHolder; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext; import com.azure.cosmos.implementation.perPartitionCircuitBreaker.PerPartitionCircuitBreakerInfoHolder; +import com.azure.cosmos.implementation.routing.RegionalRoutingContext; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -96,4 +97,21 @@ public void setPerPartitionFailoverInfo(PartitionLevelAutomaticFailoverInfo part public PerPartitionAutomaticFailoverInfoHolder getPerPartitionAutomaticFailoverInfoHolder() { return this.perPartitionAutomaticFailoverInfoHolder; } + + /** + * For PPAF write hedging on single-writer accounts, this field holds the target + * read region that the hedged write should be routed to. 
When set, {@code ClientRetryPolicy} + * uses {@code routeToLocation(RegionalRoutingContext)} to force-route the request + * to this region, bypassing the excluded-regions mechanism which cannot route writes + * to read regions on single-writer accounts. + */ + private volatile RegionalRoutingContext ppafWriteHedgeTargetRegion; + + public RegionalRoutingContext getPpafWriteHedgeTargetRegion() { + return this.ppafWriteHedgeTargetRegion; + } + + public void setPpafWriteHedgeTargetRegion(RegionalRoutingContext ppafWriteHedgeTargetRegion) { + this.ppafWriteHedgeTargetRegion = ppafWriteHedgeTargetRegion; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 191b5a969cd3..8826d50b83b2 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -293,6 +293,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization private List operationPolicies; private final AtomicReference cachedCosmosAsyncClientSnapshot; private CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForReads; + private CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForWrites; private Consumer perPartitionFailoverConfigModifier; public RxDocumentClientImpl(URI serviceEndpoint, @@ -3483,6 +3484,14 @@ private CosmosEndToEndOperationLatencyPolicyConfig getEffectiveEndToEndOperation return this.ppafEnforcedE2ELatencyPolicyConfigForReads; } + // For write point operations, apply PPAF-enforced write availability strategy — mirroring + // the read default. This enables hedging writes to read regions when the primary write + // region is unresponsive. 
Batch is excluded (not a point operation) because it bypasses + // wrapPointOperationWithAvailabilityStrategy. + if (operationType.isWriteOperation() && operationType.isPointOperation()) { + return this.ppafEnforcedE2ELatencyPolicyConfigForWrites; + } + return null; } @@ -7335,6 +7344,29 @@ private Mono> wrapPointOperationWithAvailabilityStrat idempotentWriteRetriesEnabled, nonNullRequestOptions); + // For PPAF write hedging on single-writer accounts, build a map of region name → RegionalRoutingContext + // so hedged requests can be force-routed to specific read regions via routeToLocation. + // This bypasses the excluded-regions mechanism which cannot route writes to read regions. + boolean isPpafWriteHedging = operationType.isWriteOperation() + && !this.globalEndpointManager.canUseMultipleWriteLocations() + && this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled() + && Configs.isWriteAvailabilityStrategyEnabledWithPpaf(); + + Map regionToRoutingContext = new HashMap<>(); + if (isPpafWriteHedging) { + // Use ALL account-level read regions (not just preferred regions) as hedge candidates. + // PPAF write failover can target any read region, not just the ones in the preferred list. 
+ List readRoutingContexts = + this.globalEndpointManager.getAvailableReadRoutingContexts(); + for (RegionalRoutingContext rrc : readRoutingContexts) { + String regionName = this.globalEndpointManager.getRegionName( + rrc.getGatewayRegionalEndpoint(), OperationType.Read); + if (regionName != null) { + regionToRoutingContext.put(regionName.toLowerCase(Locale.ROOT), rrc); + } + } + } + AtomicBoolean isOperationSuccessful = new AtomicBoolean(false); AtomicBoolean shouldAddHubRegionProcessingOnlyHeader = new AtomicBoolean(false); PerPartitionCircuitBreakerInfoHolder perPartitionCircuitBreakerInfoHolder = new PerPartitionCircuitBreakerInfoHolder(); @@ -7448,6 +7480,15 @@ private Mono> wrapPointOperationWithAvailabilityStrat perPartitionCircuitBreakerInfoHolder, perPartitionAutomaticFailoverInfoHolder); + // For PPAF write hedging, set the target read region so ClientRetryPolicy + // routes the hedged write there via routeToLocation instead of excluded-regions. + if (isPpafWriteHedging) { + RegionalRoutingContext targetRegion = regionToRoutingContext.get(region.toLowerCase(Locale.ROOT)); + if (targetRegion != null) { + crossRegionAvailabilityContextForHedgedRequest.setPpafWriteHedgeTargetRegion(targetRegion); + } + } + Mono regionalCrossRegionRetryMono = callback.apply(clonedOptions, endToEndPolicyConfig, diagnosticsFactory, crossRegionAvailabilityContextForHedgedRequest) .map(NonTransientPointOperationResult::new) @@ -7652,6 +7693,55 @@ private CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatenc return null; } + /** + * Evaluates whether a PPAF-enforced E2E latency policy should be applied for write operations. + * Uses the same timeout/threshold values as the read policy — the availability strategy + * hedging behavior should be symmetric for reads and writes under PPAF. 
+ */ + private CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatencyPolicyCfgForWrites( + GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover, + ConnectionPolicy connectionPolicy) { + + if (!globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()) { + return null; + } + + if (Configs.isWriteAvailabilityStrategyEnabledWithPpaf()) { + + logger.info("ATTN: As Per-Partition Automatic Failover (PPAF) is enabled a default End-to-End Operation Latency Policy will be applied for write operation types."); + + if (connectionPolicy.getConnectionMode() == ConnectionMode.DIRECT) { + Duration networkRequestTimeout = connectionPolicy.getTcpNetworkRequestTimeout(); + + checkNotNull(networkRequestTimeout, "Argument 'networkRequestTimeout' cannot be null!"); + + Duration overallE2eLatencyTimeout = networkRequestTimeout.plus(Utils.ONE_SECOND); + Duration threshold = Utils.min(networkRequestTimeout.dividedBy(2), Utils.ONE_SECOND); + Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); + + return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep)) + .build(); + } else { + + Duration httpNetworkRequestTimeout = connectionPolicy.getHttpNetworkRequestTimeout(); + + checkNotNull(httpNetworkRequestTimeout, "Argument 'httpNetworkRequestTimeout' cannot be null!"); + + Duration overallE2eLatencyTimeout = Utils.min(Utils.SIX_SECONDS, httpNetworkRequestTimeout); + + Duration threshold = Utils.min(overallE2eLatencyTimeout.dividedBy(2), Utils.ONE_SECOND); + Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); + + return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout) + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep)) + 
.build(); + } + } + + return null; + } + private DiagnosticsClientContext getEffectiveClientContext(DiagnosticsClientContext clientContextOverride) { if (clientContextOverride != null) { return clientContextOverride; @@ -7723,10 +7813,36 @@ private List getApplicableRegionsForSpeculation( } if (operationType.isWriteOperation() && !isIdempotentWriteRetriesEnabled) { - return EMPTY_REGION_LIST; - } - - if (operationType.isWriteOperation() && !this.globalEndpointManager.canUseMultipleWriteLocations()) { + // For PPAF-enabled single-writer accounts, write hedging is allowed even without + // explicit idempotent write retries because PPAF provides partition-level write + // consistency guarantees — the service ensures exactly-once semantics for writes + // to failed-over partitions. + boolean isPpafEnabled = + this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled() + && Configs.isWriteAvailabilityStrategyEnabledWithPpaf(); + + if (!isPpafEnabled) { + return EMPTY_REGION_LIST; + } + } + + // For PPAF-enabled single-writer accounts, allow write hedging using read regions. + // In PPAF, a partition can fail over to a read region for writes, so read regions + // are valid hedge targets even when the account has only one write region. + // + // Design tradeoff: We relax the multi-write-location gate here because PPAF + // fundamentally changes the write routing model — the service accepts writes + // at read regions for failed-over partitions. Without this, write hedging would + // never activate for the most common PPAF scenario (single-writer accounts), + // leaving the customer waiting up to 60-120s for the retry-based failover path. 
+ boolean isPpafWriteHedgingApplicable = operationType.isWriteOperation() + && !this.globalEndpointManager.canUseMultipleWriteLocations() + && this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled() + && Configs.isWriteAvailabilityStrategyEnabledWithPpaf(); + + if (operationType.isWriteOperation() + && !this.globalEndpointManager.canUseMultipleWriteLocations() + && !isPpafWriteHedgingApplicable) { return EMPTY_REGION_LIST; } @@ -7734,7 +7850,12 @@ private List getApplicableRegionsForSpeculation( return EMPTY_REGION_LIST; } - List regionalRoutingContextList = getApplicableEndPoints(operationType, excludedRegions); + // For PPAF write hedging on single-writer accounts, use ALL account-level read regions + // as hedge candidates (not just preferred regions). PPAF failover can target any read region. + List regionalRoutingContextList = + isPpafWriteHedgingApplicable + ? withoutNulls(new ArrayList<>(this.globalEndpointManager.getAvailableReadRoutingContexts())) + : getApplicableEndPoints(operationType, excludedRegions); HashSet normalizedExcludedRegions = new HashSet<>(); if (excludedRegions != null) { @@ -7743,7 +7864,11 @@ private List getApplicableRegionsForSpeculation( List orderedRegionsForSpeculation = new ArrayList<>(); regionalRoutingContextList.forEach(consolidatedLocationEndpoints -> { - String regionName = this.globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayRegionalEndpoint(), operationType); + // For PPAF write hedging, resolve region names against read endpoints since + // the hedged write targets are read regions (not write regions). + String regionName = isPpafWriteHedgingApplicable + ? 
this.globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayRegionalEndpoint(), OperationType.Read) + : this.globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayRegionalEndpoint(), operationType); if (!normalizedExcludedRegions.contains(regionName.toLowerCase(Locale.ROOT))) { orderedRegionsForSpeculation.add(regionName); } @@ -8067,6 +8192,7 @@ private synchronized void initializePerPartitionFailover(DatabaseAccount databas initializePerPartitionAutomaticFailover(databaseAccountSnapshot); initializePerPartitionCircuitBreaker(); enableAvailabilityStrategyForReads(); + enableAvailabilityStrategyForWrites(); checkNotNull(this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover, "Argument 'globalPartitionEndpointManagerForPerPartitionAutomaticFailover' cannot be null."); checkNotNull(this.globalPartitionEndpointManagerForPerPartitionCircuitBreaker, "Argument 'globalPartitionEndpointManagerForPerPartitionCircuitBreaker' cannot be null."); @@ -8115,6 +8241,19 @@ private void enableAvailabilityStrategyForReads() { } } + private void enableAvailabilityStrategyForWrites() { + this.ppafEnforcedE2ELatencyPolicyConfigForWrites = this.evaluatePpafEnforcedE2eLatencyPolicyCfgForWrites( + this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover, + this.connectionPolicy + ); + + if (this.ppafEnforcedE2ELatencyPolicyConfigForWrites != null) { + logger.info("ATTN: Per-Partition Automatic Failover (PPAF) enforced E2E Latency Policy for writes is enabled."); + } else { + logger.info("ATTN: Per-Partition Automatic Failover (PPAF) enforced E2E Latency Policy for writes is disabled."); + } + } + public boolean useThinClient() { return useThinClient; } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java 
b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java index 5a92ac4d8ad4..36bb3a4fc13a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java @@ -172,6 +172,39 @@ public boolean tryAddPartitionLevelLocationOverride(RxDocumentServiceRequest req return true; } + // For PPAF write hedging: when the availability strategy has set a target read region + // for a hedged write and no existing failover entry exists, create the entry and route there. + // This is the synchronous, deterministic path — the conchashmap is updated in the same + // request pipeline so subsequent requests for this partition route directly to the target. + CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionCtx = + request.requestContext.getCrossRegionAvailabilityContext(); + + if (crossRegionCtx != null && crossRegionCtx.getPpafWriteHedgeTargetRegion() != null) { + RegionalRoutingContext hedgeTarget = crossRegionCtx.getPpafWriteHedgeTargetRegion(); + + // computeIfAbsent is atomic on ConcurrentHashMap — if the retry-based path already + // created an entry for this partition (with a potentially different region), we get + // that entry back. We route to the entry's current region (not blindly to hedgeTarget) + // to avoid routing to a region the retry path may have already marked as failed. 
+ PartitionLevelAutomaticFailoverInfo hedgeFailoverInfo = + this.partitionKeyRangeToFailoverInfo.computeIfAbsent( + partitionKeyRangeWrapper, + k -> new PartitionLevelAutomaticFailoverInfo(hedgeTarget, this.globalEndpointManager)); + + request.requestContext.routeToLocation(hedgeFailoverInfo.getCurrent()); + request.requestContext.setPerPartitionAutomaticFailoverInfoHolder(hedgeFailoverInfo); + + if (logger.isInfoEnabled()) { + logger.info( + "PPAF write hedge: routing write for partition key range {} and collection rid {} to target region {}", + partitionKeyRangeWrapper.getPartitionKeyRange(), + partitionKeyRangeWrapper.getCollectionResourceId(), + hedgeTarget.getGatewayRegionalEndpoint()); + } + + return true; + } + return false; } From 8ebd68152fdb178b318acc4a8b8809e087d29ac6 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Mon, 16 Mar 2026 14:21:10 -0400 Subject: [PATCH 2/6] test(cosmos): add write availability strategy tests for all PPAF-eligible error codes Add 34 new test configurations to write availability strategy hedging tests covering all error codes from the base PPAF E2E test suite: DIRECT mode: - 503/21008 (SERVICE_UNAVAILABLE) for Replace, Upsert, Delete, Patch - 403/3 (FORBIDDEN_WRITEFORBIDDEN) for all 5 write ops - 408/UNKNOWN (REQUEST_TIMEOUT) for all 5 write ops GATEWAY mode: - 403/3 (FORBIDDEN_WRITEFORBIDDEN) for all 5 write ops - 408/UNKNOWN (REQUEST_TIMEOUT) for all 5 write ops - 408/GATEWAY_ENDPOINT_READ_TIMEOUT (network error) for all 5 write ops - 503/GATEWAY_ENDPOINT_UNAVAILABLE (network error) for all 5 write ops Parameterize gateway test method to accept error codes instead of hardcoding 503. Extend setupHttpClientToThrowCosmosException to support combined delay + network error mode for gateway-specific fault types. 
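The coverage matrix listed above can be cross-checked with a small sketch. This is an illustration only: `WriteOp`, `FaultFamily` and `PpafFaultMatrixSketch` are hypothetical names, not classes from the SDK or its test suite, and the fault labels are copied verbatim from the lists above. Expanding each fault family over the operations it is listed for should give 14 DIRECT and 20 GATEWAY configurations, 34 in total.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: every type and name here is hypothetical.
final class PpafFaultMatrixSketch {

    enum WriteOp { CREATE, REPLACE, UPSERT, DELETE, PATCH }

    /** One fault family from the commit message and the operations it is listed for. */
    static final class FaultFamily {
        final String mode;      // "DIRECT" or "GATEWAY"
        final String fault;     // label copied from the commit message
        final Set<WriteOp> ops; // operations covered by this family

        FaultFamily(String mode, String fault, Set<WriteOp> ops) {
            this.mode = mode;
            this.fault = fault;
            this.ops = ops;
        }
    }

    static List<FaultFamily> families() {
        Set<WriteOp> all = EnumSet.allOf(WriteOp.class);
        // The commit message lists DIRECT 503/21008 for every operation except Create.
        Set<WriteOp> allButCreate = EnumSet.complementOf(EnumSet.of(WriteOp.CREATE));

        List<FaultFamily> families = new ArrayList<>();
        families.add(new FaultFamily("DIRECT", "503/21008 (SERVICE_UNAVAILABLE)", allButCreate));
        families.add(new FaultFamily("DIRECT", "403/3 (FORBIDDEN_WRITEFORBIDDEN)", all));
        families.add(new FaultFamily("DIRECT", "408/UNKNOWN (REQUEST_TIMEOUT)", all));
        families.add(new FaultFamily("GATEWAY", "403/3 (FORBIDDEN_WRITEFORBIDDEN)", all));
        families.add(new FaultFamily("GATEWAY", "408/UNKNOWN (REQUEST_TIMEOUT)", all));
        families.add(new FaultFamily("GATEWAY", "408/GATEWAY_ENDPOINT_READ_TIMEOUT (network error)", all));
        families.add(new FaultFamily("GATEWAY", "503/GATEWAY_ENDPOINT_UNAVAILABLE (network error)", all));
        return families;
    }

    /** Expands each family into one description per operation, in enum declaration order. */
    static List<String> expand(List<FaultFamily> families) {
        List<String> rows = new ArrayList<>();
        for (FaultFamily family : families) {
            for (WriteOp op : family.ops) {
                rows.add(family.mode + ": " + op + " with " + family.fault);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> rows = expand(families());
        rows.forEach(System.out::println);
        System.out.println("total = " + rows.size());
    }
}
```

Generating rows this way is only a counting aid. The tests in this patch spell out each data-provider entry by hand, which keeps the per-operation success status codes explicit.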
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- ...PerPartitionAutomaticFailoverE2ETests.java | 430 +++++++++++++++++- 1 file changed, 412 insertions(+), 18 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java index 803f0046da70..c57937890df4 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PerPartitionAutomaticFailoverE2ETests.java @@ -2157,6 +2157,132 @@ public Object[][] ppafWriteAvailabilityStrategyConfigs() { expectedResponseBeforeFailover, expectedResponseAfterFailover, }, + { + "Test write availability strategy hedging for CREATE with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN in write region under PPAF.", + OperationType.Create, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, + HttpConstants.StatusCodes.CREATED, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for REPLACE with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN in write region under PPAF.", + OperationType.Replace, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for UPSERT with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN in write region under PPAF.", + OperationType.Upsert, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for DELETE with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN 
in write region under PPAF.", + OperationType.Delete, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for PATCH with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN in write region under PPAF.", + OperationType.Patch, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for REPLACE with SERVICE_UNAVAILABLE / SERVER_GENERATED_503 in write region under PPAF.", + OperationType.Replace, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for UPSERT with SERVICE_UNAVAILABLE / SERVER_GENERATED_503 in write region under PPAF.", + OperationType.Upsert, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for DELETE with SERVICE_UNAVAILABLE / SERVER_GENERATED_503 in write region under PPAF.", + OperationType.Delete, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for PATCH with SERVICE_UNAVAILABLE / SERVER_GENERATED_503 in write region under PPAF.", + OperationType.Patch, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + 
HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for CREATE with REQUEST_TIMEOUT / UNKNOWN in write region under PPAF.", + OperationType.Create, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.CREATED, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for REPLACE with REQUEST_TIMEOUT / UNKNOWN in write region under PPAF.", + OperationType.Replace, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for UPSERT with REQUEST_TIMEOUT / UNKNOWN in write region under PPAF.", + OperationType.Upsert, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for DELETE with REQUEST_TIMEOUT / UNKNOWN in write region under PPAF.", + OperationType.Delete, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, + { + "Test write availability strategy hedging for PATCH with REQUEST_TIMEOUT / UNKNOWN in write region under PPAF.", + OperationType.Patch, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.OK, + expectedResponseBeforeFailover, + expectedResponseAfterFailover, + }, }; } @@ -2166,8 +2292,8 @@ public Object[][] ppafWriteAvailabilityStrategyConfigs() { * *

Scenario:
- * 1. A fault (GONE/410 with sub-status 21005 or SERVICE_UNAVAILABLE/503) is injected into the
- *    write region for a specific partition key range.
+ * 1. A fault (GONE/410 with sub-status 21005, SERVICE_UNAVAILABLE/503, or FORBIDDEN/3) is injected
+ *    into the write region for a specific partition key range.
  * 2. The availability strategy should hedge the write to a read region, which returns success.
  * 3. After the hedged write succeeds, the PPAF manager should record the successful read region
  *    as the new write target for that partition.
  8. @@ -2572,37 +2698,277 @@ public Object[][] ppafWriteAvailabilityStrategyGatewayConfigs() { { "GATEWAY: Write availability strategy hedging for CREATE with delayed write region under PPAF.", OperationType.Create, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, HttpConstants.StatusCodes.CREATED, expectedBeforeFailover, expectedAfterFailover, + false, + false, }, { "GATEWAY: Write availability strategy hedging for REPLACE with delayed write region under PPAF.", OperationType.Replace, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, HttpConstants.StatusCodes.OK, expectedBeforeFailover, expectedAfterFailover, + false, + false, }, { "GATEWAY: Write availability strategy hedging for UPSERT with delayed write region under PPAF.", OperationType.Upsert, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, HttpConstants.StatusCodes.OK, expectedBeforeFailover, expectedAfterFailover, + false, + false, }, { "GATEWAY: Write availability strategy hedging for DELETE with delayed write region under PPAF.", OperationType.Delete, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, HttpConstants.StatusCodes.NOT_MODIFIED, expectedBeforeFailover, expectedAfterFailover, + false, + false, }, { "GATEWAY: Write availability strategy hedging for PATCH with delayed write region under PPAF.", OperationType.Patch, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.SERVER_GENERATED_503, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for CREATE with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN in write region under PPAF.", + OperationType.Create, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, + 
HttpConstants.StatusCodes.CREATED, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for REPLACE with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN in write region under PPAF.", + OperationType.Replace, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for UPSERT with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN in write region under PPAF.", + OperationType.Upsert, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, HttpConstants.StatusCodes.OK, expectedBeforeFailover, expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for DELETE with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN in write region under PPAF.", + OperationType.Delete, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for PATCH with FORBIDDEN / FORBIDDEN_WRITEFORBIDDEN in write region under PPAF.", + OperationType.Patch, + HttpConstants.StatusCodes.FORBIDDEN, + HttpConstants.SubStatusCodes.FORBIDDEN_WRITEFORBIDDEN, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for CREATE with REQUEST_TIMEOUT / UNKNOWN in write region under PPAF.", + OperationType.Create, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.CREATED, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for REPLACE with REQUEST_TIMEOUT / UNKNOWN in 
write region under PPAF.", + OperationType.Replace, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for UPSERT with REQUEST_TIMEOUT / UNKNOWN in write region under PPAF.", + OperationType.Upsert, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for DELETE with REQUEST_TIMEOUT / UNKNOWN in write region under PPAF.", + OperationType.Delete, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for PATCH with REQUEST_TIMEOUT / UNKNOWN in write region under PPAF.", + OperationType.Patch, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.UNKNOWN, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + false, + false, + }, + { + "GATEWAY: Write availability strategy hedging for CREATE with REQUEST_TIMEOUT / GATEWAY_ENDPOINT_READ_TIMEOUT (network error) in write region under PPAF.", + OperationType.Create, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT, + HttpConstants.StatusCodes.CREATED, + expectedBeforeFailover, + expectedAfterFailover, + true, + true, + }, + { + "GATEWAY: Write availability strategy hedging for REPLACE with REQUEST_TIMEOUT / GATEWAY_ENDPOINT_READ_TIMEOUT (network error) in write region under PPAF.", + OperationType.Replace, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT, + HttpConstants.StatusCodes.OK, + 
expectedBeforeFailover, + expectedAfterFailover, + true, + true, + }, + { + "GATEWAY: Write availability strategy hedging for UPSERT with REQUEST_TIMEOUT / GATEWAY_ENDPOINT_READ_TIMEOUT (network error) in write region under PPAF.", + OperationType.Upsert, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + true, + true, + }, + { + "GATEWAY: Write availability strategy hedging for DELETE with REQUEST_TIMEOUT / GATEWAY_ENDPOINT_READ_TIMEOUT (network error) in write region under PPAF.", + OperationType.Delete, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedBeforeFailover, + expectedAfterFailover, + true, + true, + }, + { + "GATEWAY: Write availability strategy hedging for PATCH with REQUEST_TIMEOUT / GATEWAY_ENDPOINT_READ_TIMEOUT (network error) in write region under PPAF.", + OperationType.Patch, + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + true, + true, + }, + { + "GATEWAY: Write availability strategy hedging for CREATE with SERVICE_UNAVAILABLE / GATEWAY_ENDPOINT_UNAVAILABLE (network error) in write region under PPAF.", + OperationType.Create, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE, + HttpConstants.StatusCodes.CREATED, + expectedBeforeFailover, + expectedAfterFailover, + true, + false, + }, + { + "GATEWAY: Write availability strategy hedging for REPLACE with SERVICE_UNAVAILABLE / GATEWAY_ENDPOINT_UNAVAILABLE (network error) in write region under PPAF.", + OperationType.Replace, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE, + 
HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + true, + false, + }, + { + "GATEWAY: Write availability strategy hedging for UPSERT with SERVICE_UNAVAILABLE / GATEWAY_ENDPOINT_UNAVAILABLE (network error) in write region under PPAF.", + OperationType.Upsert, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + true, + false, + }, + { + "GATEWAY: Write availability strategy hedging for DELETE with SERVICE_UNAVAILABLE / GATEWAY_ENDPOINT_UNAVAILABLE (network error) in write region under PPAF.", + OperationType.Delete, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE, + HttpConstants.StatusCodes.NOT_MODIFIED, + expectedBeforeFailover, + expectedAfterFailover, + true, + false, + }, + { + "GATEWAY: Write availability strategy hedging for PATCH with SERVICE_UNAVAILABLE / GATEWAY_ENDPOINT_UNAVAILABLE (network error) in write region under PPAF.", + OperationType.Patch, + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE, + HttpConstants.StatusCodes.OK, + expectedBeforeFailover, + expectedAfterFailover, + true, + false, }, }; } @@ -2614,8 +2980,10 @@ public Object[][] ppafWriteAvailabilityStrategyGatewayConfigs() { * write failover tests) to control responses from both write and read regions: *
  * • Default: all requests return mocked success.
- * • Override: requests to the write region endpoint are delayed by 10s before returning an error,
- *   simulating an unresponsive write region.
+ * • Override: requests to the write region endpoint are delayed by 10s before returning a
+ *   parameterized error (SERVICE_UNAVAILABLE/503, FORBIDDEN/3, REQUEST_TIMEOUT/UNKNOWN,
+ *   GATEWAY_ENDPOINT_READ_TIMEOUT via ReadTimeoutException, or GATEWAY_ENDPOINT_UNAVAILABLE
+ *   via SocketTimeoutException), simulating an unresponsive or faulted write region.
  * • The hedged request to the read region hits the default success mock and completes immediately.
  *
    * @@ -2625,9 +2993,13 @@ public Object[][] ppafWriteAvailabilityStrategyGatewayConfigs() { public void testPpafWriteAvailabilityStrategyHedgingInGatewayMode( String testType, OperationType operationType, + int errorStatusCodeToMock, + int errorSubStatusCodeToMock, int successStatusCode, ExpectedResponseCharacteristics expectedBeforeFailover, - ExpectedResponseCharacteristics expectedAfterFailover) { + ExpectedResponseCharacteristics expectedAfterFailover, + boolean shouldThrowNetworkError, + boolean shouldThrowReadTimeoutExceptionWhenNetworkError) { ConnectionPolicy connectionPolicy = COSMOS_CLIENT_BUILDER_ACCESSOR.getConnectionPolicy(getClientBuilder()); ConnectionMode connectionMode = connectionPolicy.getConnectionMode(); @@ -2686,19 +3058,31 @@ public void testPpafWriteAvailabilityStrategyHedgingInGatewayMode( // Default: all requests return success (including hedged requests to read region) setupHttpClientToReturnSuccessResponse(mockedHttpClient, operationType, databaseAccount, successStatusCode); - // Override: write region requests are delayed by 10s then error — simulates unresponsive write region - CosmosException cosmosException = createCosmosException( - HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, - HttpConstants.SubStatusCodes.SERVER_GENERATED_503); + // Override: write region requests are delayed then error — simulates unresponsive write region + if (!shouldThrowNetworkError) { + CosmosException cosmosException = createCosmosException( + errorStatusCodeToMock, + errorSubStatusCodeToMock); - // shouldForceE2ETimeout=true triggers the Mono.delay(10s) pattern - setupHttpClientToThrowCosmosException( - mockedHttpClient, - new URI(readableRegionNameToEndpoint.get(regionWithIssues)), - cosmosException, - false, // shouldThrowNetworkError - false, // shouldThrowReadTimeoutExceptionWhenNetworkError - true); // shouldForceE2ETimeout — delays response by 10s + // shouldForceE2ETimeout=true triggers the Mono.delay(10s) pattern + 
setupHttpClientToThrowCosmosException( + mockedHttpClient, + new URI(readableRegionNameToEndpoint.get(regionWithIssues)), + cosmosException, + false, // shouldThrowNetworkError + false, // shouldThrowReadTimeoutExceptionWhenNetworkError + true); // shouldForceE2ETimeout — delays response by 10s + } else { + // For gateway-specific network errors (GATEWAY_ENDPOINT_READ_TIMEOUT, GATEWAY_ENDPOINT_UNAVAILABLE), + // delay 10s then throw the raw network exception (ReadTimeoutException / SocketTimeoutException) + setupHttpClientToThrowCosmosException( + mockedHttpClient, + new URI(readableRegionNameToEndpoint.get(regionWithIssues)), + null, // cosmosException not used for network errors + true, // shouldThrowNetworkError + shouldThrowReadTimeoutExceptionWhenNetworkError, + true); // shouldForceE2ETimeout — delays response by 10s before throwing network error + } TestObject testItem = TestObject.create(); @@ -2802,13 +3186,23 @@ private void setupHttpClientToThrowCosmosException( boolean shouldForceE2ETimeout) { if (shouldForceE2ETimeout) { + + Exception delayedError; + if (shouldThrowNetworkError) { + delayedError = shouldThrowReadTimeoutExceptionWhenNetworkError + ? 
new ReadTimeoutException() + : new SocketTimeoutException(); + } else { + delayedError = cosmosException; + } + Mockito.when( httpClientMock.send( Mockito.argThat(argument -> { URI uri = argument.uri(); return uri.toString().contains(locationEndpointToRoute.toString()); }), Mockito.any(Duration.class))) - .thenReturn(Mono.delay(Duration.ofSeconds(10)).flatMap(aLong -> Mono.error(cosmosException))); + .thenReturn(Mono.delay(Duration.ofSeconds(10)).flatMap(aLong -> Mono.error(delayedError))); return; } From 629c7b295ea2dd559fefdb23ca37df45d40bee03 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Mon, 16 Mar 2026 14:43:39 -0400 Subject: [PATCH 3/6] docs(cosmos): add PPAF write availability strategy to CHANGELOG Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index cd85fb9aaafe..29bddee324c9 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -4,7 +4,8 @@ #### Features Added * Added support for N-Region synchronous commit feature - See [PR 47757](https://github.com/Azure/azure-sdk-for-java/pull/47757) -* Added support for Query Advisor feature - See [48160](https://github.com/Azure/azure-sdk-for-java/pull/48160) +* Added support for Query Advisor feature - See [48160](https://github.com/Azure/azure-sdk-for-java/pull/48160) +* Added write availability strategy (hedging) for Per-Partition Automatic Failover (PPAF) single-writer accounts. When a write to the current write region is slow or fails (410/21005, 503/21008, 403/3, 408), the SDK hedges the write to a read region via the existing availability strategy. On success, the PPAF manager records the new region so subsequent writes route directly there. Controlled by `COSMOS.IS_WRITE_AVAILABILITY_STRATEGY_ENABLED_WITH_PPAF` system property (default: enabled). 
#### Breaking Changes From 489b36017aad7ffb1d30e49b5aae76a49a41ffdf Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Wed, 18 Mar 2026 20:00:22 -0400 Subject: [PATCH 4/6] Fix ChangeFeed processor race: block on stop before next start validateChangeFeedProcessing called stop() with subscribe() (fire-and-forget), then returned. The caller immediately starts a full fidelity CFP on the same lease container. With the CI optimization (PR #48259) making validateChangeFeedProcessing return faster, the async stop hasn't released leases yet, causing the next start() to hang and timeout. Fix: change stop() from subscribe() to block() so leases are fully released before returning. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../epkversion/IncrementalChangeFeedProcessorTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index f2bac940b448..fc0e65a84d44 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -2228,7 +2228,11 @@ void validateChangeFeedProcessing( assertThat(item.getHostName()).isEqualTo(hostName).as("Change Feed Processor ownership"); } - changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).timeout(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe(); + // Block on stop to ensure the processor fully releases leases before the caller + // starts another processor on the same lease container (e.g. full fidelity CFP). 
+ changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()) + .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)) + .block(); for (InternalObjectNode item : createdDocuments) { assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); From eaf3d608ddce069c4276a57a45f5b173b204e192 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Thu, 19 Mar 2026 11:16:40 -0400 Subject: [PATCH 5/6] Address PR review: rename isPpafWriteHedging, scope map allocation - Rename isPpafWriteHedging to applyAvailabilityStrategyForWritesForPpaf for clarity (both in wrapPointOperationWithAvailabilityStrategy and getApplicableRegionsForSpeculation) - Scope HashMap allocation: only create when PPAF write availability strategy is applicable, use Collections.emptyMap() otherwise to avoid per-request allocation for reads - Update comments to use 'availability strategy' terminology Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../implementation/RxDocumentClientImpl.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index 8826d50b83b2..1119e54e5804 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -7344,16 +7344,17 @@ private Mono> wrapPointOperationWithAvailabilityStrat idempotentWriteRetriesEnabled, nonNullRequestOptions); - // For PPAF write hedging on single-writer accounts, build a map of region name → RegionalRoutingContext + // For PPAF write availability strategy on single-writer accounts, build a map of region name → RegionalRoutingContext // so hedged requests can be force-routed to specific 
read regions via routeToLocation. // This bypasses the excluded-regions mechanism which cannot route writes to read regions. - boolean isPpafWriteHedging = operationType.isWriteOperation() + boolean applyAvailabilityStrategyForWritesForPpaf = operationType.isWriteOperation() && !this.globalEndpointManager.canUseMultipleWriteLocations() && this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled() && Configs.isWriteAvailabilityStrategyEnabledWithPpaf(); - Map regionToRoutingContext = new HashMap<>(); - if (isPpafWriteHedging) { + Map regionToRoutingContext; + if (applyAvailabilityStrategyForWritesForPpaf) { + regionToRoutingContext = new HashMap<>(); // Use ALL account-level read regions (not just preferred regions) as hedge candidates. // PPAF write failover can target any read region, not just the ones in the preferred list. List readRoutingContexts = @@ -7365,6 +7366,8 @@ private Mono> wrapPointOperationWithAvailabilityStrat regionToRoutingContext.put(regionName.toLowerCase(Locale.ROOT), rrc); } } + } else { + regionToRoutingContext = Collections.emptyMap(); } AtomicBoolean isOperationSuccessful = new AtomicBoolean(false); @@ -7480,9 +7483,9 @@ private Mono> wrapPointOperationWithAvailabilityStrat perPartitionCircuitBreakerInfoHolder, perPartitionAutomaticFailoverInfoHolder); - // For PPAF write hedging, set the target read region so ClientRetryPolicy + // For PPAF write availability strategy, set the target read region so ClientRetryPolicy // routes the hedged write there via routeToLocation instead of excluded-regions. 
- if (isPpafWriteHedging) { + if (applyAvailabilityStrategyForWritesForPpaf) { RegionalRoutingContext targetRegion = regionToRoutingContext.get(region.toLowerCase(Locale.ROOT)); if (targetRegion != null) { crossRegionAvailabilityContextForHedgedRequest.setPpafWriteHedgeTargetRegion(targetRegion); @@ -7826,7 +7829,7 @@ private List getApplicableRegionsForSpeculation( } } - // For PPAF-enabled single-writer accounts, allow write hedging using read regions. + // For PPAF-enabled single-writer accounts, allow write availability strategy using read regions. // In PPAF, a partition can fail over to a read region for writes, so read regions // are valid hedge targets even when the account has only one write region. // @@ -7835,14 +7838,14 @@ private List getApplicableRegionsForSpeculation( // at read regions for failed-over partitions. Without this, write hedging would // never activate for the most common PPAF scenario (single-writer accounts), // leaving the customer waiting up to 60-120s for the retry-based failover path. - boolean isPpafWriteHedgingApplicable = operationType.isWriteOperation() + boolean applyAvailabilityStrategyForWritesForPpaf = operationType.isWriteOperation() && !this.globalEndpointManager.canUseMultipleWriteLocations() && this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled() && Configs.isWriteAvailabilityStrategyEnabledWithPpaf(); if (operationType.isWriteOperation() && !this.globalEndpointManager.canUseMultipleWriteLocations() - && !isPpafWriteHedgingApplicable) { + && !applyAvailabilityStrategyForWritesForPpaf) { return EMPTY_REGION_LIST; } @@ -7850,10 +7853,10 @@ private List getApplicableRegionsForSpeculation( return EMPTY_REGION_LIST; } - // For PPAF write hedging on single-writer accounts, use ALL account-level read regions + // For PPAF write availability strategy on single-writer accounts, use ALL account-level read regions // as hedge candidates (not just preferred regions). 
PPAF failover can target any read region. List regionalRoutingContextList = - isPpafWriteHedgingApplicable + applyAvailabilityStrategyForWritesForPpaf ? withoutNulls(new ArrayList<>(this.globalEndpointManager.getAvailableReadRoutingContexts())) : getApplicableEndPoints(operationType, excludedRegions); @@ -7864,9 +7867,9 @@ private List getApplicableRegionsForSpeculation( List orderedRegionsForSpeculation = new ArrayList<>(); regionalRoutingContextList.forEach(consolidatedLocationEndpoints -> { - // For PPAF write hedging, resolve region names against read endpoints since + // For PPAF write availability strategy, resolve region names against read endpoints since // the hedged write targets are read regions (not write regions). - String regionName = isPpafWriteHedgingApplicable + String regionName = applyAvailabilityStrategyForWritesForPpaf ? this.globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayRegionalEndpoint(), OperationType.Read) : this.globalEndpointManager.getRegionName(consolidatedLocationEndpoints.getGatewayRegionalEndpoint(), operationType); if (!normalizedExcludedRegions.contains(regionName.toLowerCase(Locale.ROOT))) { From ff6eb7155cf94e0a3fa9a02f3a31d96bce262061 Mon Sep 17 00:00:00 2001 From: Abhijeet Mohanty Date: Thu, 19 Mar 2026 11:45:48 -0400 Subject: [PATCH 6/6] Address PR review: rename fields, fix unsafe computeIfAbsent, scope map - Rename isPpafWriteHedging to applyAvailabilityStrategyForWritesForPpaf - Rename ppafWriteHedgeTargetRegion to writeRegionRoutingContextForPpafAvailabilityStrategy - Move field to top of CrossRegionAvailabilityContext class - Replace conchashmap with ConcurrentHashMap in comments - Scope HashMap allocation: Collections.emptyMap() for reads - Fix unsafe computeIfAbsent: do not create failover override until hedge succeeds - Verify read/write e2e policies produce identical values (confirmed) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- 
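Note on the "Fix unsafe computeIfAbsent" item above: the rule is that routing a hedged write must not write to the per-partition failover map, and an override is persisted only once the hedged write has succeeded. The following is a minimal sketch of that rule under stated assumptions. `FailoverRouting`, `routeHedgedWrite`, `onHedgedWriteSuccess`, and `hasOverride` are hypothetical names, plain strings stand in for the SDK's partition key range wrapper and `RegionalRoutingContext`, and the `putIfAbsent` success hook is an assumption about how the override might be recorded, not the SDK's actual code.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the per-partition failover map; not the SDK's types.
public class FailoverRouting {

    // partition key range id -> region currently used for writes (illustrative only)
    private final ConcurrentHashMap<String, String> overrides = new ConcurrentHashMap<>();

    // Route a hedged write. An existing override (for example one created by the
    // retry-based path) wins; otherwise the hedge target is used for this request
    // only, and the map is left untouched.
    public String routeHedgedWrite(String partitionKeyRange, String hedgeTarget) {
        String existing = overrides.get(partitionKeyRange);
        return existing != null ? existing : hedgeTarget;
    }

    // Assumed success hook: persist the override only after the hedged write succeeded.
    public void onHedgedWriteSuccess(String partitionKeyRange, String region) {
        overrides.putIfAbsent(partitionKeyRange, region);
    }

    public boolean hasOverride(String partitionKeyRange) {
        return overrides.containsKey(partitionKeyRange);
    }

    public static void main(String[] args) {
        FailoverRouting routing = new FailoverRouting();

        // Before any success, the request goes to the hedge target and nothing is persisted.
        System.out.println(routing.routeHedgedWrite("pkr-0", "West US"));
        System.out.println(routing.hasOverride("pkr-0"));

        // After a successful hedged write, later requests follow the recorded override.
        routing.onHedgedWriteSuccess("pkr-0", "West US");
        System.out.println(routing.routeHedgedWrite("pkr-0", "East US"));
    }
}
```

With the earlier `computeIfAbsent` approach, the first call alone would already have left an entry for `pkr-0` in the map, so a failed hedge could still steer later writes to that region; reading with `get` keeps routing free of side effects.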
.../implementation/ClientRetryPolicy.java | 4 +-- ...ityContextForRxDocumentServiceRequest.java | 26 +++++++-------- .../implementation/RxDocumentClientImpl.java | 2 +- ...nagerForPerPartitionAutomaticFailover.java | 33 ++++++++++--------- 4 files changed, 33 insertions(+), 32 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java index 76ae0cfc2a8f..811a58e70152 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java @@ -551,8 +551,8 @@ public void onBeforeSendRequest(RxDocumentServiceRequest request) { } // In case PPAF is enabled and a location override exists for the partition key range assigned to the request. - // This also handles PPAF write hedging — when ppafWriteHedgeTargetRegion is set, it creates - // the conchashmap entry and routes the request to the target read region. + // This also handles PPAF write hedging — when writeRegionRoutingContextForPpafAvailabilityStrategy is set, it creates + // the ConcurrentHashMap entry and routes the request to the target read region. 
this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.tryAddPartitionLevelLocationOverride(request); this.throttlingRetry.onBeforeSendRequest(request); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java index 3226fd13db9b..3aec0d430976 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CrossRegionAvailabilityContextForRxDocumentServiceRequest.java @@ -30,6 +30,15 @@ public class CrossRegionAvailabilityContextForRxDocumentServiceRequest { private final PerPartitionAutomaticFailoverInfoHolder perPartitionAutomaticFailoverInfoHolder; + /** + * For PPAF write hedging on single-writer accounts, this field holds the target + * read region that the hedged write should be routed to. When set, {@code ClientRetryPolicy} + * uses {@code routeToLocation(RegionalRoutingContext)} to force-route the request + * to this region, bypassing the excluded-regions mechanism which cannot route writes + * to read regions on single-writer accounts. + */ + private volatile RegionalRoutingContext writeRegionRoutingContextForPpafAvailabilityStrategy; + public CrossRegionAvailabilityContextForRxDocumentServiceRequest( FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker, PointOperationContextForCircuitBreaker pointOperationContextForCircuitBreaker, @@ -98,20 +107,11 @@ public PerPartitionAutomaticFailoverInfoHolder getPerPartitionAutomaticFailoverI return this.perPartitionAutomaticFailoverInfoHolder; } - /** - * For PPAF write hedging on single-writer accounts, this field holds the target - * read region that the hedged write should be routed to. 
When set, {@code ClientRetryPolicy}
-     * uses {@code routeToLocation(RegionalRoutingContext)} to force-route the request
-     * to this region, bypassing the excluded-regions mechanism which cannot route writes
-     * to read regions on single-writer accounts.
-     */
-    private volatile RegionalRoutingContext ppafWriteHedgeTargetRegion;
-
-    public RegionalRoutingContext getPpafWriteHedgeTargetRegion() {
-        return this.ppafWriteHedgeTargetRegion;
+    public RegionalRoutingContext getWriteRegionRoutingContextForPpafAvailabilityStrategy() {
+        return this.writeRegionRoutingContextForPpafAvailabilityStrategy;
     }
-    public void setPpafWriteHedgeTargetRegion(RegionalRoutingContext ppafWriteHedgeTargetRegion) {
-        this.ppafWriteHedgeTargetRegion = ppafWriteHedgeTargetRegion;
+    public void setWriteRegionRoutingContextForPpafAvailabilityStrategy(RegionalRoutingContext writeRegionRoutingContextForPpafAvailabilityStrategy) {
+        this.writeRegionRoutingContextForPpafAvailabilityStrategy = writeRegionRoutingContextForPpafAvailabilityStrategy;
     }
 }
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java
index 1119e54e5804..13c8f2828a22 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java
@@ -7488,7 +7488,7 @@ private Mono> wrapPointOperationWithAvailabilityStrat
     if (applyAvailabilityStrategyForWritesForPpaf) {
         RegionalRoutingContext targetRegion = regionToRoutingContext.get(region.toLowerCase(Locale.ROOT));
         if (targetRegion != null) {
-            crossRegionAvailabilityContextForHedgedRequest.setPpafWriteHedgeTargetRegion(targetRegion);
+            crossRegionAvailabilityContextForHedgedRequest.setWriteRegionRoutingContextForPpafAvailabilityStrategy(targetRegion);
         }
     }
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java
index 36bb3a4fc13a..252fbd776fde 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/perPartitionAutomaticFailover/GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover.java
@@ -173,30 +173,31 @@ public boolean tryAddPartitionLevelLocationOverride(RxDocumentServiceRequest req
         }
         // For PPAF write hedging: when the availability strategy has set a target read region
-        // for a hedged write and no existing failover entry exists, create the entry and route there.
-        // This is the synchronous, deterministic path — the conchashmap is updated in the same
-        // request pipeline so subsequent requests for this partition route directly to the target.
+        // for a hedged write, route the request to that region WITHOUT creating a failover entry.
+        // A failover override should only be persisted when the hedged request succeeds — creating
+        // one eagerly here would route future requests to a potentially bad region if this request fails.
         CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionCtx = request.requestContext.getCrossRegionAvailabilityContext();
-        if (crossRegionCtx != null && crossRegionCtx.getPpafWriteHedgeTargetRegion() != null) {
-            RegionalRoutingContext hedgeTarget = crossRegionCtx.getPpafWriteHedgeTargetRegion();
+        if (crossRegionCtx != null && crossRegionCtx.getWriteRegionRoutingContextForPpafAvailabilityStrategy() != null) {
+            RegionalRoutingContext hedgeTarget = crossRegionCtx.getWriteRegionRoutingContextForPpafAvailabilityStrategy();
-            // computeIfAbsent is atomic on ConcurrentHashMap — if the retry-based path already
-            // created an entry for this partition (with a potentially different region), we get
-            // that entry back. We route to the entry's current region (not blindly to hedgeTarget)
-            // to avoid routing to a region the retry path may have already marked as failed.
-            PartitionLevelAutomaticFailoverInfo hedgeFailoverInfo =
-                this.partitionKeyRangeToFailoverInfo.computeIfAbsent(
-                    partitionKeyRangeWrapper,
-                    k -> new PartitionLevelAutomaticFailoverInfo(hedgeTarget, this.globalEndpointManager));
+            // Check if an existing failover entry already exists for this partition
+            PartitionLevelAutomaticFailoverInfo existingFailoverInfo = this.partitionKeyRangeToFailoverInfo.get(partitionKeyRangeWrapper);
-            request.requestContext.routeToLocation(hedgeFailoverInfo.getCurrent());
-            request.requestContext.setPerPartitionAutomaticFailoverInfoHolder(hedgeFailoverInfo);
+            if (existingFailoverInfo != null) {
+                // An existing entry exists (from retry-based failover path) — use its current region
+                request.requestContext.routeToLocation(existingFailoverInfo.getCurrent());
+                request.requestContext.setPerPartitionAutomaticFailoverInfoHolder(existingFailoverInfo);
+            } else {
+                // No existing entry — route directly to the hedge target WITHOUT creating an override.
+                // The override will only be created if this hedged request succeeds.
+                request.requestContext.routeToLocation(hedgeTarget);
+            }
             if (logger.isInfoEnabled()) {
                 logger.info(
-                    "PPAF write hedge: routing write for partition key range {} and collection rid {} to target region {}",
+                    "PPAF write availability strategy: routing write for partition key range {} and collection rid {} to target region {}",
                     partitionKeyRangeWrapper.getPartitionKeyRange(),
                     partitionKeyRangeWrapper.getCollectionResourceId(),
                     hedgeTarget.getGatewayRegionalEndpoint());