
Rebalance shards when ingester status changes #6185

Open
ncoiffier-celonis wants to merge 11 commits into quickwit-oss:main from ncoiffier-celonis:ingester-status-rebased

Conversation

@ncoiffier-celonis
Collaborator

Description

Attempt to fix #6158

Following @guilload's suggestion here, this PR:

  • gossips the ingester status over chitchat
  • updates the ingester pool when the ingester status changes
  • updates the indexer pool too when the ingester status changes (to fix "no open shard found" errors on the ingester)
  • has the control plane rebalance the shards when the ingester status changes
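
The steps above can be sketched as a single handler reacting to a gossiped status change. This is a minimal illustration with hypothetical simplified types (the real Quickwit `IngesterPool`, `IndexerPool`, and `IngesterStatus` types differ):

```rust
use std::collections::HashMap;

// Hypothetical simplified status enum; the real Quickwit enum differs.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum IngesterStatus {
    Initializing,
    Ready,
    Decommissioning,
    Decommissioned,
}

// Both pools keyed by node id; here they only track the last gossiped status.
struct Pools {
    ingester_pool: HashMap<String, IngesterStatus>,
    indexer_pool: HashMap<String, IngesterStatus>,
}

// On a gossiped status change, refresh both pools (so routers stop picking
// shards on a draining ingester) and report whether the control plane
// should consider rebalancing shards.
fn on_ingester_status_change(
    pools: &mut Pools,
    node_id: &str,
    status: IngesterStatus,
) -> bool {
    pools.ingester_pool.insert(node_id.to_string(), status);
    pools.indexer_pool.insert(node_id.to_string(), status);
    !matches!(status, IngesterStatus::Initializing)
}

fn main() {
    let mut pools = Pools {
        ingester_pool: HashMap::new(),
        indexer_pool: HashMap::new(),
    };
    assert!(on_ingester_status_change(&mut pools, "node-1", IngesterStatus::Decommissioning));
    assert!(!on_ingester_status_change(&mut pools, "node-2", IngesterStatus::Initializing));
    assert_eq!(pools.indexer_pool["node-1"], IngesterStatus::Decommissioning);
}
```

The key point is that both pools are refreshed from the same event, so the router and the control plane never disagree about a node's status for longer than one gossip round.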

With this approach, even with a ~10s propagation delay before decommissioning, it is still possible to fail to ingest some documents if chitchat takes longer than expected to gossip the ingester status to the control plane.

Any feedback is welcome!!

How was this PR tested?

In addition to the unit and integration tests, I ran it against a local cluster with 2 indexers and observed that the number of errors reported in #6158 dropped from a few hundred to zero.

Other approaches

This PR is essentially identical to the branch guilload/ingester-status, rebased on main and with some additional bugfixes:

  • fix a bug where timeout_after was always 0, causing the wait to be skipped
  • update the ingester pool when the IngesterStatus changes (not only the indexer pool)
  • more unit and integration tests

@guilload
Member

guilload commented Mar 4, 2026

it is still possible to fail to ingest some documents if the chitchat takes longer than expected to gossip the ingester status to the control-plane.

Technically, the ingest router should just retry when that happens, and there should be a path for the router to open a new shard if the ingester being decommissioned was the only one with shard(s) for this index. Is that not what you observed?

@guilload
Member

guilload commented Mar 4, 2026

@ncoiffier-celonis I'm giving you write access to the repo so next time you can push directly to this repo rather than your fork. It makes it easier for me to check out your changes locally. Though I learned how to use gh pr checkout <PR_NUMBER> in the meantime.

Member

@guilload guilload left a comment


I think we're close but we need to fix a few issues.

}

#[cfg(any(test, feature = "testsuite"))]
pub async fn for_test_with_ingester_status(
Member

nit: this seems a bit overkill to me

Collaborator Author

@ncoiffier-celonis ncoiffier-celonis Mar 5, 2026


Addressed with 0c1c82b: I unified ClusterNode::for_test_with_ingester_status into ClusterNode::for_test

pub struct IngestController {
ingester_pool: IngesterPool,
pub(crate) ingester_pool: IngesterPool,
pub(crate) stats: IngestControllerStats,
Member

👍

let Some(mailbox) = weak_mailbox.upgrade() else {
return;
};
let mut trigger_rebalance = false;
Member

@ncoiffier-celonis please review this tricky logic thoroughly. I'm the initial author of this change and now I'm also reviewing it so I'm more likely to miss something. I could use a second pair of eyes.

Member

Yeah, this logic definitely needs a comment. Here, we're considering both indexers and ingesters. Indexers run indexing pipelines: when they're ready, they are ready to index, so we want to rebuild the indexing plan. Same thing when they leave.

In addition, we're considering ingesters (technically, all indexers are ingesters and vice versa, because we didn't want to expose users to a new service; "service" as in metastore, janitor, control-plane, etc., not microservice as in router, ingester, debug info, etc.).

Ingesters have two levels of readiness. The first is the same as for indexers: "I'm up and running, I can connect to the metastore." The second: "I have loaded my WAL."

So we want to rebalance when the ingester is ready ready, which, from the perspective of the stream of events, can happen as:

  1. Add(ready, ready)

OR

  1. Add(ready, not ready)
  2. Update(ready, ready)

The logic below tries to implement that.
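
The transition rule described above can be sketched as follows. These are hypothetical simplified types, not the actual Quickwit `ClusterChange` or readiness structs:

```rust
// Hypothetical simplified model of the event stream described above.
#[derive(Clone, Copy, PartialEq, Eq)]
struct IngesterReadiness {
    node_ready: bool, // "I'm up and running, I can connect to the metastore"
    wal_ready: bool,  // "I have loaded my WAL"
}

enum ClusterChange {
    Add(IngesterReadiness),
    Update { previous: IngesterReadiness, updated: IngesterReadiness },
}

fn fully_ready(r: IngesterReadiness) -> bool {
    r.node_ready && r.wal_ready
}

// Rebalance only when an ingester *becomes* "ready ready": either it joins
// already fully ready, or an update flips it to fully ready.
fn should_trigger_rebalance(change: &ClusterChange) -> bool {
    match change {
        ClusterChange::Add(state) => fully_ready(*state),
        ClusterChange::Update { previous, updated } => {
            !fully_ready(*previous) && fully_ready(*updated)
        }
    }
}

fn main() {
    let ready = IngesterReadiness { node_ready: true, wal_ready: true };
    let half = IngesterReadiness { node_ready: true, wal_ready: false };
    assert!(should_trigger_rebalance(&ClusterChange::Add(ready)));
    assert!(!should_trigger_rebalance(&ClusterChange::Add(half)));
    assert!(should_trigger_rebalance(&ClusterChange::Update { previous: half, updated: ready }));
    assert!(!should_trigger_rebalance(&ClusterChange::Update { previous: ready, updated: ready }));
}
```

Framing it this way makes the two accepted event sequences explicit and keeps repeated `Update(ready, ready)` events from triggering redundant rebalances.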

}

#[tokio::test]
async fn test_wait_for_ingester_decommission_elapsed_timeout_not_zero() {
Member

👍

// Ingest docs with auto-commit. With a 5s commit timeout, these documents
// sit uncommitted in the ingesters' WAL - exactly the in-flight state we
// want to exercise during draining.
ingest(
Member

How do we know the shard for this index is always going to be created on the indexer that we're about to shut down?

/// Tests that the graceful shutdown sequence works correctly in a multi-indexer
/// cluster: shutting down one indexer does NOT cause 500 errors or data loss,
/// and the cluster eventually rebalances. see #6158
#[tokio::test]
Member

Very very nice! Let's make sure this is not flaky, though. Run it 1,000 times! This is how I do it (fish):

while true
  c t --manifest-path quickwit/Cargo.toml -p quickwit-integration-tests --nocapture  -- test_graceful_shutdown_no_data_loss
end

Ok((ingest_router, ingest_router_service, ingester_opt))
}

fn setup_ingester_pool(
Member

Same here, we need to be extremely careful about this convoluted logic.

Member

Now that I've thought more about this, I think we have an issue with this logic. It creates a pool of write-only ingesters, which is great for the logic in quickwit-ingest, but in quickwit-indexing, the source also holds an ingester pool, and we still want to be able to read from and truncate ingesters while they are in the retiring and decommissioning statuses. I don't think we actually want to create and manage distinct pools, so we should maybe restrict this pool to non-initializing ingesters and push the additional filtering logic wherever needed (router, control plane).
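
The suggested split between pool membership and caller-side filtering could look like this. The status variants and helper names below are illustrative, not the actual Quickwit API:

```rust
// Hypothetical sketch of the single-pool approach suggested above.
#[derive(Clone, Copy, PartialEq, Eq)]
enum IngesterStatus {
    Initializing,
    Ready,
    Retiring,
    Decommissioning,
    Decommissioned,
    Failed,
}

// The shared pool keeps every ingester past initialization, so indexing
// sources can still read from and truncate retiring/decommissioning nodes.
fn keep_in_pool(status: IngesterStatus) -> bool {
    status != IngesterStatus::Initializing
}

// Write paths (router, control plane) apply the extra filter themselves,
// instead of maintaining a separate write-only pool.
fn accepts_writes(status: IngesterStatus) -> bool {
    status == IngesterStatus::Ready
}

fn main() {
    // A retiring ingester stays in the pool (readable/truncatable)...
    assert!(keep_in_pool(IngesterStatus::Retiring));
    // ...but is no longer a valid target for new writes.
    assert!(!accepts_writes(IngesterStatus::Retiring));
    assert!(!keep_in_pool(IngesterStatus::Initializing));
}
```

This keeps one pool to manage while letting each consumer express its own notion of "usable ingester".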

Comment on lines +1380 to +1390
if let Some(ingester) = &ingester_opt {
if let Ok(status) = try_get_ingester_status(ingester).await {
status != IngesterStatus::Failed
} else {
// If we couldn't get the ingester status, it's not looking good, so we set
// the node to not ready.
false
}
} else {
true
}
Member

Suggested change: remove this block.

Feels like this logic does not need to be duplicated. Brainfart on my end? WDYT?

Collaborator Author

Yes, I wasn't super sure about that part either. I assumed that you had changed the metastore "readiness" definition to include a downstream check on the ingester on purpose. I've removed it with 7716f56.

);
Some(change)
}
ClusterChange::Add(node) | ClusterChange::Update { updated: node, .. }
Member

This is going to update the pool each time a chitchat key-value pair changes. That doesn't buy us anything and will generate noisy logs.
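
One way to avoid that churn is to compare the fields the pool actually stores and skip no-op updates. This is a sketch with hypothetical types, not the actual pool implementation:

```rust
use std::collections::HashMap;

// Hypothetical pool entry holding only the fields the pool cares about.
#[derive(Clone, PartialEq, Eq)]
struct PoolEntry {
    grpc_advertise_addr: String,
    status: u8,
}

// Apply an update only when a stored field changed; unrelated chitchat
// key-value churn then neither touches the pool nor emits a log line.
fn apply_update(pool: &mut HashMap<String, PoolEntry>, node_id: &str, entry: PoolEntry) -> bool {
    if pool.get(node_id) == Some(&entry) {
        return false; // no-op: nothing the pool stores has changed
    }
    pool.insert(node_id.to_string(), entry);
    true
}

fn main() {
    let mut pool = HashMap::new();
    let entry = PoolEntry { grpc_advertise_addr: "127.0.0.1:7281".to_string(), status: 1 };
    // First update actually changes the pool; replaying it is a no-op.
    assert!(apply_update(&mut pool, "node-1", entry.clone()));
    assert!(!apply_update(&mut pool, "node-1", entry));
}
```

Logging only when `apply_update` returns true would keep the logs proportional to real membership changes rather than gossip traffic.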

@guilload guilload requested a review from nadav-govari March 4, 2026 21:46
@guilload
Member

guilload commented Mar 4, 2026

@nadav-govari, I need your eyes on this because:

  • this is a tricky PR so the more reviewers the ...
  • you will most likely have to troubleshoot and fix the bugs it will introduce
  • this will conflict with your current work
  • this will give you a cleaner way to propagate an ingester's az

@ncoiffier-celonis ncoiffier-celonis force-pushed the ingester-status-rebased branch from 81e493d to 6bebf0d Compare March 5, 2026 10:08
@ncoiffier-celonis
Collaborator Author

(I took the liberty of force-pushing after signing all the individual commits; no code change.)


Development

Successfully merging this pull request may close these issues.

Indexer graceful shutdown causes ingestion gap and 500 errors "no shards available"
