Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [ENHANCEMENT] Compactor: Add concurrency for partition cleanup and mark block for deletion #7246
* [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253
* [ENHANCEMENT] Make the query-ingesters-within duration a per-tenant configuration. #7160
* [ENHANCEMENT] Make cortex_ingester_tsdb_sample_ooo_delta metric per-tenant #7278
* [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics.
* [ENHANCEMENT] Distributor: Add `-distributor.accept-unknown-remote-write-content-type` flag. When enabled, requests with unknown or invalid Content-Type header are treated as remote write v1 instead of returning 415 Unsupported Media Type. Default is false. #7293
Expand Down
3 changes: 3 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.LimitsConfig.Validate(c.NameValidationScheme, c.Distributor.ShardByAllLabels, c.Ingester.ActiveSeriesMetricsEnabled); err != nil {
return errors.Wrap(err, "invalid limits config")
}
if err := c.LimitsConfig.ValidateQueryLimits("default", c.BlocksStorage.TSDB.CloseIdleTSDBTimeout); err != nil {
return errors.Wrap(err, "invalid query routing config")
}
if err := c.ResourceMonitor.Validate(); err != nil {
return errors.Wrap(err, "invalid resource-monitor config")
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ func (t *Cortex) initOverridesExporter() (services.Service, error) {

func (t *Cortex) initDistributorService() (serv services.Service, err error) {
t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod
t.Cfg.Distributor.NameValidationScheme = t.Cfg.NameValidationScheme
t.Cfg.IngesterClient.GRPCClientConfig.SignWriteRequestsEnabled = t.Cfg.Distributor.SignWriteRequestsEnabled

Expand Down Expand Up @@ -495,7 +494,6 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin
t.tsdbIngesterConfig()

t.Ingester, err = ingester.New(t.Cfg.Ingester, t.OverridesConfig, prometheus.DefaultRegisterer, util_log.Logger, t.ResourceMonitor)
Expand Down
5 changes: 4 additions & 1 deletion pkg/cortex/runtime_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,13 @@ func (l runtimeConfigLoader) load(r io.Reader) (any, error) {
// only check if target is `all`, `distributor`, "querier", and "ruler"
// refer to https://github.com/cortexproject/cortex/issues/6741#issuecomment-3067244929
if overrides != nil {
for _, ul := range overrides.TenantLimits {
for userID, ul := range overrides.TenantLimits {
if err := ul.Validate(l.cfg.NameValidationScheme, l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil {
return nil, err
}
if err := ul.ValidateQueryLimits(userID, l.cfg.BlocksStorage.TSDB.CloseIdleTSDBTimeout); err != nil {
return nil, err
}
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@ type Config struct {
// this (and should never use it) but this feature is used by other projects built on top of it
SkipLabelNameValidation bool `yaml:"-"`

// This config is dynamically injected because defined in the querier config.
ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`

// ZoneResultsQuorumMetadata enables zone results quorum when querying ingester replication set
// with metadata APIs (labels names and values for now). When zone awareness is enabled, only results
// from quorum number of zones will be included to reduce data merged and improve performance.
Expand Down
146 changes: 145 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3295,7 +3295,7 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []

if cfg.shuffleShardEnabled {
distributorCfg.ShardingStrategy = util.ShardingStrategyShuffle
distributorCfg.ShuffleShardingLookbackPeriod = time.Hour
cfg.limits.ShuffleShardingIngestersLookbackPeriod = time.Hour

cfg.limits.IngestionTenantShardSize = cfg.shuffleShardSize
}
Expand Down Expand Up @@ -4697,3 +4697,147 @@ func TestDistributor_BatchTimeoutMetric(t *testing.T) {
cortex_distributor_ingester_push_timeouts_total 5
`), "cortex_distributor_ingester_push_timeouts_total"))
}
// TestDistributor_ShuffleShardingIngestersLookbackPeriod exercises GetIngestersForQuery
// under the per-tenant shuffle-sharding lookback limit: with the lookback disabled the
// query should stay within the tenant's shard, with it enabled the ring may return extra
// ingesters from the lookback window, and with shard size 0 shuffle sharding is off and
// every ingester is queried.
func TestDistributor_ShuffleShardingIngestersLookbackPeriod(t *testing.T) {
	t.Parallel()

	const numIngesters = 10

	cases := map[string]struct {
		lookback  time.Duration
		shardSize int
		behavior  string
	}{
		"lookback disabled (0) should not use shuffle sharding with lookback": {
			lookback:  0,
			shardSize: 3,
			behavior:  "no_lookback",
		},
		"lookback 1h should include ingesters from past hour": {
			lookback:  1 * time.Hour,
			shardSize: 3,
			behavior:  "with_lookback",
		},
		"lookback 2h should include ingesters from past 2 hours": {
			lookback:  2 * time.Hour,
			shardSize: 3,
			behavior:  "with_lookback",
		},
		"shard size 0 should not use shuffle sharding": {
			lookback:  1 * time.Hour,
			shardSize: 0,
			behavior:  "no_shuffle_sharding",
		},
	}

	for name, c := range cases {
		t.Run(name, func(t *testing.T) {
			t.Parallel()

			// Build per-tenant limits with the shard size and lookback under test.
			// NOTE(review): prepare() appears to force ShuffleShardingIngestersLookbackPeriod
			// to 1h whenever shuffleShardEnabled is set — confirm the value configured here is
			// the one actually in effect, otherwise the "lookback disabled" case may not be
			// exercised as intended.
			tenantLimits := &validation.Limits{}
			flagext.DefaultValues(tenantLimits)
			tenantLimits.IngestionTenantShardSize = c.shardSize
			tenantLimits.ShuffleShardingIngestersLookbackPeriod = model.Duration(c.lookback)

			ds, _, _, _ := prepare(t, prepConfig{
				numIngesters:        numIngesters,
				happyIngesters:      numIngesters,
				numDistributors:     1,
				shardByAllLabels:    true,
				shuffleShardSize:    c.shardSize,
				shuffleShardEnabled: true,
				limits:              tenantLimits,
			})

			// Resolve the replication set the distributor would query for this tenant.
			replicationSet, err := ds[0].GetIngestersForQuery(user.InjectOrgID(context.Background(), "test-user"))
			require.NoError(t, err)

			got := len(replicationSet.Instances)
			if c.behavior == "no_lookback" {
				// Shuffle sharding without lookback: never more instances than the shard size.
				if c.shardSize > 0 {
					assert.LessOrEqual(t, got, c.shardSize,
						"should not exceed shard size when lookback is disabled")
				}
			} else if c.behavior == "with_lookback" {
				// With lookback enabled the subring may grow beyond the current shard.
				assert.GreaterOrEqual(t, got, c.shardSize,
					"should include at least shard size ingesters with lookback")
			} else if c.behavior == "no_shuffle_sharding" {
				// Shard size 0 disables shuffle sharding entirely: all ingesters are queried.
				assert.Equal(t, numIngesters, got,
					"should query all ingesters when shuffle sharding is disabled")
			}
		})
	}
}

// TestDistributor_ShuffleShardingIngestersLookbackPeriod_Validation verifies that
// ValidateQueryLimits enforces the invariant that, when both are set, the
// shuffle-sharding ingesters lookback period is >= query_store_after; a shorter
// lookback could cause queries to miss samples that only ingesters still hold.
// Either value may be disabled (0) independently.
func TestDistributor_ShuffleShardingIngestersLookbackPeriod_Validation(t *testing.T) {
	t.Parallel()

	tests := map[string]struct {
		queryStoreAfter                        time.Duration
		shuffleShardingIngestersLookbackPeriod time.Duration
		shouldBeValid                          bool
		description                            string
	}{
		"valid: lookback >= queryStoreAfter": {
			queryStoreAfter:                        1 * time.Hour,
			shuffleShardingIngestersLookbackPeriod: 2 * time.Hour,
			shouldBeValid:                          true,
			description:                            "lookback period should be >= queryStoreAfter",
		},
		"valid: lookback == queryStoreAfter": {
			queryStoreAfter:                        1 * time.Hour,
			shuffleShardingIngestersLookbackPeriod: 1 * time.Hour,
			shouldBeValid:                          true,
			description:                            "lookback period can equal queryStoreAfter",
		},
		"invalid: lookback < queryStoreAfter": {
			queryStoreAfter:                        2 * time.Hour,
			shuffleShardingIngestersLookbackPeriod: 1 * time.Hour,
			shouldBeValid:                          false,
			description:                            "lookback period must be >= queryStoreAfter",
		},
		"valid: both disabled": {
			queryStoreAfter:                        0,
			shuffleShardingIngestersLookbackPeriod: 0,
			shouldBeValid:                          true,
			description:                            "both can be disabled",
		},
		"valid: queryStoreAfter disabled": {
			queryStoreAfter:                        0,
			shuffleShardingIngestersLookbackPeriod: 1 * time.Hour,
			shouldBeValid:                          true,
			description:                            "queryStoreAfter can be disabled while lookback is enabled",
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			t.Parallel()

			limits := &validation.Limits{}
			flagext.DefaultValues(limits)
			limits.QueryStoreAfter = model.Duration(testData.queryStoreAfter)
			limits.ShuffleShardingIngestersLookbackPeriod = model.Duration(testData.shuffleShardingIngestersLookbackPeriod)

			// ValidateQueryLimits requires userID and closeIdleTSDBTimeout.
			err := limits.ValidateQueryLimits("test-user", 13*time.Hour)

			if testData.shouldBeValid {
				assert.NoError(t, err, testData.description)
			} else {
				// require (not assert) here: assert.Error would let the subtest continue
				// on a nil error and the err.Error() call below would then panic with a
				// nil-pointer dereference instead of reporting a clean test failure.
				require.Error(t, err, testData.description)
				assert.Contains(t, err.Error(), "shuffle_sharding_ingesters_lookback_period", testData.description)
			}
		})
	}
}
4 changes: 2 additions & 2 deletions pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*lab
// part of the tenant's subring.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
shardSize := d.limits.IngestionTenantShardSize(userID)
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod
lookbackPeriod := d.limits.ShuffleShardingIngestersLookbackPeriod

if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
Expand Down Expand Up @@ -123,7 +123,7 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica
// part of the tenant's subring.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
shardSize := d.limits.IngestionTenantShardSize(userID)
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod
lookbackPeriod := d.limits.ShuffleShardingIngestersLookbackPeriod

if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
Expand Down
9 changes: 3 additions & 6 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,6 @@ type Config struct {
DistributorShardingStrategy string `yaml:"-"`
DistributorShardByAllLabels bool `yaml:"-"`

// Injected at runtime and read from querier config.
QueryIngestersWithin time.Duration `yaml:"-"`

DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

Expand Down Expand Up @@ -1919,7 +1916,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
}
defer db.releaseReadLock()

mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin)
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.limits.QueryIngestersWithin(userID))
if err != nil {
return nil, cleanup, err
}
Expand Down Expand Up @@ -2035,7 +2032,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
}
defer db.releaseReadLock()

mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryIngestersWithin)
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.limits.QueryIngestersWithin(userID))
if err != nil {
return nil, cleanup, err
}
Expand Down Expand Up @@ -2167,7 +2164,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
return cleanup, err
}

mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryIngestersWithin)
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.limits.QueryIngestersWithin(userID))
if err != nil {
return cleanup, err
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3712,13 +3712,17 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
for testName, testData := range tests {

t.Run(testName, func(t *testing.T) {
limits := defaultLimitsTestConfig()
limits.QueryIngestersWithin = model.Duration(testData.queryIngestersWithin)
tenantLimits := newMockTenantLimits(map[string]*validation.Limits{"test": &limits})
i.limits = validation.NewOverrides(limits, tenantLimits)

req := &client.MetricsForLabelMatchersRequest{
StartTimestampMs: testData.from,
EndTimestampMs: testData.to,
MatchersSet: testData.matchers,
Limit: testData.limit,
}
i.cfg.QueryIngestersWithin = testData.queryIngestersWithin
res, err := i.MetricsForLabelMatchers(ctx, req)
require.NoError(t, err)
assert.ElementsMatch(t, testData.expected, res.Metric)
Expand Down Expand Up @@ -6327,12 +6331,15 @@ func TestExpendedPostingsCacheMatchers(t *testing.T) {
cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled = true
cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true
cfg.QueryIngestersWithin = 24 * time.Hour

limits := defaultLimitsTestConfig()
limits.QueryIngestersWithin = model.Duration(24 * time.Hour)
tenantLimits := newMockTenantLimits(map[string]*validation.Limits{userID: &limits})

ctx := user.InjectOrgID(context.Background(), userID)

r := prometheus.NewRegistry()
ing, err := prepareIngesterWithBlocksStorage(t, cfg, r)
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, tenantLimits, "", r)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
Expand Down
25 changes: 10 additions & 15 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type BlocksStoreLimits interface {

MaxChunksPerQueryFromStore(userID string) int
StoreGatewayTenantShardSize(userID string) float64
QueryStoreAfter(userID string) time.Duration
}

type blocksStoreQueryableMetrics struct {
Expand Down Expand Up @@ -133,13 +134,12 @@ func newBlocksStoreQueryableMetrics(reg prometheus.Registerer) *blocksStoreQuery
type BlocksStoreQueryable struct {
services.Service

stores BlocksStoreSet
finder BlocksFinder
consistency *BlocksConsistencyChecker
logger log.Logger
queryStoreAfter time.Duration
metrics *blocksStoreQueryableMetrics
limits BlocksStoreLimits
stores BlocksStoreSet
finder BlocksFinder
consistency *BlocksConsistencyChecker
logger log.Logger
metrics *blocksStoreQueryableMetrics
limits BlocksStoreLimits

storeGatewayQueryStatsEnabled bool
storeGatewayConsistencyCheckMaxAttempts int
Expand Down Expand Up @@ -168,7 +168,6 @@ func NewBlocksStoreQueryable(
stores: stores,
finder: finder,
consistency: consistency,
queryStoreAfter: config.QueryStoreAfter,
logger: logger,
subservices: manager,
subservicesWatcher: services.NewFailureWatcher(),
Expand Down Expand Up @@ -305,7 +304,6 @@ func (q *BlocksStoreQueryable) Querier(mint, maxt int64) (storage.Querier, error
limits: q.limits,
consistency: q.consistency,
logger: q.logger,
queryStoreAfter: q.queryStoreAfter,
storeGatewayQueryStatsEnabled: q.storeGatewayQueryStatsEnabled,
storeGatewayConsistencyCheckMaxAttempts: q.storeGatewayConsistencyCheckMaxAttempts,
storeGatewaySeriesBatchSize: q.storeGatewaySeriesBatchSize,
Expand All @@ -321,10 +319,6 @@ type blocksStoreQuerier struct {
limits BlocksStoreLimits
logger log.Logger

// If set, the querier manipulates the max time to not be greater than
// "now - queryStoreAfter" so that most recent blocks are not queried.
queryStoreAfter time.Duration

// If enabled, query stats of store gateway requests will be logged
// using `info` level.
storeGatewayQueryStatsEnabled bool
Expand Down Expand Up @@ -492,14 +486,15 @@ func (q *blocksStoreQuerier) selectSorted(ctx context.Context, sp *storage.Selec

func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64, matchers []*labels.Matcher,
userID string, queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error)) error {
queryStoreAfter := q.limits.QueryStoreAfter(userID)
// If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until
// now - queryStoreAfter, because the most recent time range is covered by ingesters. This
// optimization is particularly important for the blocks storage because can be used to skip
// querying most recent not-compacted-yet blocks from the storage.
if q.queryStoreAfter > 0 {
if queryStoreAfter > 0 {
now := time.Now()
origMaxT := maxT
maxT = min(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter)))
maxT = min(maxT, util.TimeToMillis(now.Add(-queryStoreAfter)))

if origMaxT != maxT {
level.Debug(logger).Log("msg", "the max time of the query to blocks storage has been manipulated", "original", origMaxT, "updated", maxT)
Expand Down
Loading
Loading