From 981393bdc3128c468891affc50ae22ced7b05989 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Fri, 28 Mar 2025 10:30:32 -0700 Subject: [PATCH 1/7] refactor dynamic query splitting to reduce complexity Signed-off-by: Ahmed Hassan --- .../queryrange/split_by_interval.go | 93 ++++++------------- .../queryrange/split_by_interval_test.go | 90 +++++++----------- 2 files changed, 58 insertions(+), 125 deletions(-) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index 138338daf82..ec7a9d247cf 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -219,7 +219,7 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration, maxSplits := time.Duration(maxSplitsInt) queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond)) // Calculate the multiple (n) of base interval needed to shard query into <= maxSplits - n := ceilDiv(queryRange, baseInterval*maxSplits) + n := ceilDiv(int64(queryRange), int64(baseInterval*maxSplits)) if n <= 0 { n = 1 } @@ -227,9 +227,9 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration, // Loop to handle cases where first split is truncated and shorter than remaining splits. // Exits loop if interval (n) is sufficient after removing first split // If no suitable interval was found terminates at a maximum of interval = 2 * query range - for n <= 2*ceilDiv(queryRange, baseInterval) { - // Find new start time for query after removing first split - nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), n*baseInterval) + r.GetStep() + for n <= 2*ceilDiv(int64(queryRange), int64(baseInterval)) { + // Find the new start time for query after removing first split + nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), time.Duration(n)*baseInterval) + r.GetStep() if maxSplits == 1 { // If maxSplits == 1, the removed first split should cover the full query range. if nextSplitStart >= r.GetEnd() { @@ -238,7 +238,7 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration, } else { queryRangeWithoutFirstSplit := time.Duration((r.GetEnd() - nextSplitStart) * int64(time.Millisecond)) // Recalculate n for the remaining query range with maxSplits-1. - n_temp := ceilDiv(queryRangeWithoutFirstSplit, baseInterval*(maxSplits-1)) + n_temp := ceilDiv(int64(queryRangeWithoutFirstSplit), int64(baseInterval*(maxSplits-1))) // If a larger interval is needed after removing the first split, the initial n was insufficient. if n >= n_temp { break @@ -247,7 +247,7 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration, // Increment n to check if larger interval fits the maxSplits constraint. n++ } - return n * baseInterval + return time.Duration(n) * baseInterval } // Return max allowed number of splits by MaxShardsPerQuery config after accounting for vertical sharding @@ -264,16 +264,17 @@ func getMaxSplitsFromConfig(maxSplitsConfigValue int, queryVerticalShardSize int // Return max allowed number of splits by MaxFetchedDataDurationPerQuery config after accounting for vertical sharding func getMaxSplitsByDurationFetched(maxFetchedDataDurationPerQuery time.Duration, queryVerticalShardSize int, expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) int { - fixedDurationFetched, perSplitDurationFetched := getDurationFetchedByQuerySplitting(expr, queryStart, queryEnd, queryStep, baseInterval, lookbackDelta) - if perSplitDurationFetched == 0 { - return int(maxFetchedDataDurationPerQuery / baseInterval) // Total duration fetched does not increase with number of splits, return default max splits + durationFetchedByRange, durationFetchedBySelectors := analyzeDurationFetchedByQueryExpr(expr, queryStart, queryEnd, baseInterval, lookbackDelta) + + if durationFetchedBySelectors == 0 { + return int(maxFetchedDataDurationPerQuery / baseInterval) // The total duration fetched does not increase with number of splits, return default max splits } var maxSplitsByDurationFetched int if maxFetchedDataDurationPerQuery > 0 { - // Duration fetched by query after splitting = fixedDurationFetched + perSplitDurationFetched x numOfShards + // Duration fetched by query after splitting = durationFetchedByRange + durationFetchedBySelectors x numOfShards // Rearranging the equation to find the max horizontal splits after accounting for vertical shards - maxSplitsByDurationFetched = int(((maxFetchedDataDurationPerQuery / time.Duration(queryVerticalShardSize)) - fixedDurationFetched) / perSplitDurationFetched) + maxSplitsByDurationFetched = int(((maxFetchedDataDurationPerQuery / time.Duration(queryVerticalShardSize)) - durationFetchedByRange) / durationFetchedBySelectors) } if maxSplitsByDurationFetched <= 0 { maxSplitsByDurationFetched = 1 @@ -281,57 +282,25 @@ func getMaxSplitsByDurationFetched(maxFetchedDataDurationPerQuery time.Duration, return maxSplitsByDurationFetched } -// Return the fixed base duration fetched by the query regardless of the number of splits, and the duration that is fetched once for every split -func getDurationFetchedByQuerySplitting(expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) (fixedDurationFetched time.Duration, perSplitDurationFetched time.Duration) { - // First analyze the query using original start-end time. Duration fetched by lookbackDelta here only reflects the start time of first split - durationFetchedByRange, durationFetchedBySelectors, durationFetchedByLookbackDeltaFirstSplit := analyzeDurationFetchedByQueryExpr(expr, queryStart, queryEnd, baseInterval, lookbackDelta) - - fixedDurationFetched += durationFetchedByRange // Duration fetched by the query range is constant regardless of how many splits the query has - perSplitDurationFetched += durationFetchedBySelectors // Duration fetched by selectors is fetched once for every query split - - // Next analyze the query using the next split start time to find the duration fetched by lookbackDelta for splits other than first one - nextIntervalStart := nextIntervalBoundary(queryStart, queryStep, baseInterval) + queryStep - _, _, durationFetchedByLookbackDeltaOtherSplits := analyzeDurationFetchedByQueryExpr(expr, nextIntervalStart, queryEnd, baseInterval, lookbackDelta) - - // Handle different cases for lookbackDelta - if durationFetchedByLookbackDeltaFirstSplit > 0 && durationFetchedByLookbackDeltaOtherSplits > 0 { - // lookbackDelta is fetching additional duration for all splits - perSplitDurationFetched += durationFetchedByLookbackDeltaOtherSplits - } else if durationFetchedByLookbackDeltaOtherSplits > 0 { - // lookbackDelta is fetching additional duration for all splits except first one - perSplitDurationFetched += durationFetchedByLookbackDeltaOtherSplits - fixedDurationFetched -= durationFetchedByLookbackDeltaOtherSplits - } else if durationFetchedByLookbackDeltaFirstSplit > 0 { - // lookbackDelta is fetching additional duration for first split only - fixedDurationFetched += durationFetchedByLookbackDeltaFirstSplit - } - - return fixedDurationFetched, perSplitDurationFetched -} - // analyzeDurationFetchedByQueryExpr analyzes the query to calculate -// the duration of data that will be fetched from storage by different -// parts of the query +// the estimated duration of data that will be fetched from storage by +// different parts of the query // // Returns: // - durationFetchedByRange: The total duration fetched by the original start-end // range of the query. -// - durationFetchedBySelectors: The duration fetched by matrix selectors -// and/or subqueries. This duration will be fetched once by every query split. -// - durationFetchedByLookbackDelta: The duration fetched by lookbackDelta -// for the specified query start time. +// - durationFetchedBySelectors: The duration fetched by matrix selectors, +// subquery selectors, and lookbackDelta. // // Example: // Query up[15d:1h] with a range of 30 days, 1 day base split interval, and 5 min lookbackDelta with 00:00 UTC start time // - durationFetchedByRange = 30 day -// - durationFetchedBySelectors = 15 day -// - durationFetchedByLookbackDelta = 1 day -func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, queryEnd int64, baseInterval time.Duration, lookbackDelta time.Duration) (durationFetchedByRange time.Duration, durationFetchedBySelectors time.Duration, durationFetchedByLookbackDelta time.Duration) { - durationFetchedByRangeCount := 0 - durationFetchedByLookbackDeltaCount := 0 +// - durationFetchedBySelectors = 16 day +func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, queryEnd int64, baseInterval time.Duration, lookbackDelta time.Duration) (durationFetchedByRange time.Duration, durationFetchedBySelectors time.Duration) { baseIntervalMillis := util.DurationMilliseconds(baseInterval) + durationFetchedByRangeCount := 0 + durationFetchedBySelectorsCount := 0 - totalDurationFetchedCount := 0 var evalRange time.Duration parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { switch n := node.(type) { @@ -341,18 +310,11 @@ func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, query queryEndIntervalIndex := floorDiv(queryEnd, baseIntervalMillis) durationFetchedByRangeCount += int(queryEndIntervalIndex-queryStartIntervalIndex) + 1 - // Adjust start and end time based on matrix selectors and/or subquery selector and increment total duration fetched, this excludes lookbackDelta - start, end := util.GetTimeRangesForSelector(queryStart, queryEnd, 0, n, path, evalRange) - startIntervalIndex := floorDiv(start, baseIntervalMillis) - endIntervalIndex := floorDiv(end, baseIntervalMillis) - totalDurationFetchedCount += int(endIntervalIndex-startIntervalIndex) + 1 - - // Increment duration fetched by lookbackDelta - startLookbackDelta := start - util.DurationMilliseconds(lookbackDelta) - startLookbackDeltaIntervalIndex := floorDiv(startLookbackDelta, baseIntervalMillis) - if evalRange == 0 && startLookbackDeltaIntervalIndex < startIntervalIndex { - durationFetchedByLookbackDeltaCount += int(startIntervalIndex - startLookbackDeltaIntervalIndex) - } + // Adjust start time based on matrix selectors and/or subquery selectors and calculate additional lookback duration fetched + start, end := util.GetTimeRangesForSelector(queryStart, queryEnd, lookbackDelta, n, path, evalRange) + durationFetchedBySelectors := (end - start) - (queryEnd - queryStart) + durationFetchedBySelectorsCount += int(ceilDiv(durationFetchedBySelectors, baseIntervalMillis)) + evalRange = 0 case *parser.MatrixSelector: evalRange = n.Range @@ -360,8 +322,7 @@ func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, query return nil }) - durationFetchedBySelectorsCount := totalDurationFetchedCount - durationFetchedByRangeCount - return time.Duration(durationFetchedByRangeCount) * baseInterval, time.Duration(durationFetchedBySelectorsCount) * baseInterval, time.Duration(durationFetchedByLookbackDeltaCount) * baseInterval + return time.Duration(durationFetchedByRangeCount) * baseInterval, time.Duration(durationFetchedBySelectorsCount) * baseInterval } func floorDiv(a, b int64) int64 { @@ -371,7 +332,7 @@ func floorDiv(a, b int64) int64 { return a / b } -func ceilDiv(a, b time.Duration) time.Duration { +func ceilDiv(a, b int64) int64 { if a > 0 && a%b != 0 { return a/b + 1 } diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 3e8d59f7868..279bcc546eb 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -704,7 +704,7 @@ func Test_getMaxSplitsByDurationFetched(t *testing.T) { }, { baseSplitInterval: day, - name: "20 days range with 5 min lookback but first split starts at 30 min, expect 181 max splits", + name: "20 days range with 5 min lookback but first split starts at 30 min, expect 180 max splits", req: &tripperware.PrometheusRequest{ Query: "up", Start: (30 * 60 * seconds), @@ -714,7 +714,7 @@ func Test_getMaxSplitsByDurationFetched(t *testing.T) { verticalShardSize: 1, lookbackDelta: 5 * time.Minute, maxDurationOfDataFetched: 200 * day, - expectedMaxSplits: 181, + expectedMaxSplits: 180, }, { baseSplitInterval: day, @@ -730,22 +730,6 @@ func Test_getMaxSplitsByDurationFetched(t *testing.T) { maxDurationOfDataFetched: 200 * day, expectedMaxSplits: 90, }, - { - baseSplitInterval: day, - // Certain step sizes can cause the start time of every split except the first one to shift forward. In these cases, - // lookbackDelta might not fetch additional days for all splits (except the first one if it starts at 00:00 UTC) - name: "20 days range with 1 day subquery and 5 min lookback but splits are step aligned forward, expect 179 max splits", - req: &tripperware.PrometheusRequest{ - Query: "up[1d:1h]", - Start: 0, - End: (20 * 24 * 3600 * seconds) - 1, - Step: 22 * 60 * seconds, - }, - verticalShardSize: 1, - lookbackDelta: 5 * time.Minute, - maxDurationOfDataFetched: 200 * day, - expectedMaxSplits: 179, - }, { baseSplitInterval: day, name: "20 days range with 3 day matrix selector, expect 60 max splits", @@ -988,13 +972,12 @@ func Test_getIntervalFromMaxSplits(t *testing.T) { func Test_analyzeDurationFetchedByQuery(t *testing.T) { for _, tc := range []struct { - name string - baseSplitInterval time.Duration - lookbackDelta time.Duration - req tripperware.Request - expectedDurationFetchedByRange time.Duration - expectedDurationFetchedBySelectors time.Duration - expectedDurationFetchedByLookbackDelta time.Duration + name string + baseSplitInterval time.Duration + lookbackDelta time.Duration + req tripperware.Request + expectedDurationFetchedByRange time.Duration + expectedDurationFetchedBySelectors time.Duration }{ { name: "query range 00:00 to 23:59", @@ -1006,9 +989,8 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 60 * seconds, Query: "up", }, - expectedDurationFetchedByRange: 24 * time.Hour, - expectedDurationFetchedBySelectors: 0 * time.Hour, - expectedDurationFetchedByLookbackDelta: 0 * time.Hour, + expectedDurationFetchedByRange: 24 * time.Hour, + expectedDurationFetchedBySelectors: 0 * time.Hour, }, { name: "query range 00:00 to 00:00 next day", @@ -1020,9 +1002,8 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 60 * seconds, Query: "up", }, - expectedDurationFetchedByRange: 25 * time.Hour, - expectedDurationFetchedBySelectors: 0 * time.Hour, - expectedDurationFetchedByLookbackDelta: 0 * time.Hour, + expectedDurationFetchedByRange: 25 * time.Hour, + expectedDurationFetchedBySelectors: 0 * time.Hour, }, { name: "query range 00:00 to 23:59, with 5 min lookback", @@ -1034,9 +1015,8 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 60 * seconds, Query: "up", }, - expectedDurationFetchedByRange: 24 * time.Hour, - expectedDurationFetchedBySelectors: 0 * time.Hour, - expectedDurationFetchedByLookbackDelta: 1 * time.Hour, + expectedDurationFetchedByRange: 24 * time.Hour, + expectedDurationFetchedBySelectors: 1 * time.Hour, }, { name: "query range 00:00 to 23:59, with 5 hour subquery", @@ -1048,9 +1028,8 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 60 * seconds, Query: "up[5h:10m]", }, - expectedDurationFetchedByRange: 24 * time.Hour, - expectedDurationFetchedBySelectors: 5 * time.Hour, - expectedDurationFetchedByLookbackDelta: 1 * time.Hour, + expectedDurationFetchedByRange: 24 * time.Hour, + expectedDurationFetchedBySelectors: 6 * time.Hour, }, { name: "query range 00:00 to 23:59, with 2 hour matrix selector", @@ -1062,9 +1041,8 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 60 * seconds, Query: "rate(up[2h])", }, - expectedDurationFetchedByRange: 24 * time.Hour, - expectedDurationFetchedBySelectors: 2 * time.Hour, - expectedDurationFetchedByLookbackDelta: 0 * time.Hour, + expectedDurationFetchedByRange: 24 * time.Hour, + expectedDurationFetchedBySelectors: 2 * time.Hour, }, { name: "query range 00:00 to 23:59, with multiple matrix selectors", @@ -1076,9 +1054,8 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 60 * seconds, Query: "rate(up[2h]) + rate(up[5h]) + rate(up[7h])", }, - expectedDurationFetchedByRange: 72 * time.Hour, - expectedDurationFetchedBySelectors: 14 * time.Hour, - expectedDurationFetchedByLookbackDelta: 0 * time.Hour, + expectedDurationFetchedByRange: 72 * time.Hour, + expectedDurationFetchedBySelectors: 14 * time.Hour, }, { name: "query range 60 day with 20 day subquery", @@ -1090,9 +1067,8 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 5 * 60 * seconds, Query: "up[20d:1h]", }, - expectedDurationFetchedByRange: 61 * day, - expectedDurationFetchedBySelectors: 20 * day, - expectedDurationFetchedByLookbackDelta: 1 * day, + expectedDurationFetchedByRange: 61 * day, + expectedDurationFetchedBySelectors: 21 * day, }, { name: "query range 35 day, with 15 day subquery and 1 week offset", @@ -1104,9 +1080,8 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 5 * 60 * seconds, Query: "up[15d:1h] offset 1w", }, - expectedDurationFetchedByRange: 36 * day, - expectedDurationFetchedBySelectors: 15 * day, - expectedDurationFetchedByLookbackDelta: 1 * day, + expectedDurationFetchedByRange: 36 * day, + expectedDurationFetchedBySelectors: 16 * day, }, { name: "query range 10 days, with multiple subqueries and offsets", @@ -1118,9 +1093,8 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 5 * 60 * seconds, Query: "rate(up[2d:1h] offset 1w) + rate(up[5d:1h] offset 2w) + rate(up[7d:1h] offset 3w)", }, - expectedDurationFetchedByRange: 33 * day, - expectedDurationFetchedBySelectors: 14 * day, - expectedDurationFetchedByLookbackDelta: 3 * day, + expectedDurationFetchedByRange: 33 * day, + expectedDurationFetchedBySelectors: 17 * day, }, { name: "query range spans 40 days with 4 day matrix selector", @@ -1132,18 +1106,16 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) { Step: 5 * 60 * seconds, Query: "up[4d]", }, - expectedDurationFetchedByRange: 40 * day, - expectedDurationFetchedBySelectors: 4 * day, - expectedDurationFetchedByLookbackDelta: 0 * day, + expectedDurationFetchedByRange: 40 * day, + expectedDurationFetchedBySelectors: 4 * day, }, } { t.Run(tc.name, func(t *testing.T) { expr, err := parser.ParseExpr(tc.req.GetQuery()) require.Nil(t, err) - queryRangeIntervals, extraIntervalsPerSplit, lookbackDeltaIntervals := analyzeDurationFetchedByQueryExpr(expr, tc.req.GetStart(), tc.req.GetEnd(), tc.baseSplitInterval, tc.lookbackDelta) - require.Equal(t, tc.expectedDurationFetchedByRange, queryRangeIntervals) - require.Equal(t, tc.expectedDurationFetchedBySelectors, extraIntervalsPerSplit) - require.Equal(t, tc.expectedDurationFetchedByLookbackDelta, lookbackDeltaIntervals) + durationFetchedByRange, durationFetchedBySelectors := analyzeDurationFetchedByQueryExpr(expr, tc.req.GetStart(), tc.req.GetEnd(), tc.baseSplitInterval, tc.lookbackDelta) + require.Equal(t, tc.expectedDurationFetchedByRange, durationFetchedByRange) + require.Equal(t, tc.expectedDurationFetchedBySelectors, durationFetchedBySelectors) }) } } From a98c452317ee6af632064eeea17f8b304b746767 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Thu, 3 Apr 2025 16:20:37 -0700 Subject: [PATCH 2/7] add query dynamic vertical sharding Signed-off-by: Ahmed Hassan --- pkg/querier/stats/stats.go | 11 ++++ .../queryrange/split_by_interval.go | 61 +++++++++++++------ pkg/querier/tripperware/shard_by.go | 5 ++ 3 files changed, 59 insertions(+), 18 deletions(-) diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index 127c422878a..1b0be55109f 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -140,6 +140,17 @@ func (s *QueryStats) LoadExtraFields() []interface{} { return r } +func (s *QueryStats) GetExtraField(fieldName string) string { + fields := s.LoadExtraFields() + for i := 0; i < len(fields); i += 2 { + key, ok := fields[i].(string) + if ok && key == fieldName { + return fmt.Sprintf("%v", fields[i+1]) + } + } + return "" +} + func (s *QueryStats) LoadFetchedSeries() uint64 { if s == nil { return 0 diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index ec7a9d247cf..c4cb2ace331 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -167,7 +167,7 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer return baseInterval, nil } - queryVerticalShardSize, err := getMaxVerticalShardSize(ctx, r, limits, queryAnalyzer) + maxVerticalShardSize, isShardable, err := getMaxVerticalShardSize(ctx, r, limits, queryAnalyzer) if err != nil { return baseInterval, err } @@ -177,41 +177,60 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer return baseInterval, err } - maxSplitsPerQuery := getMaxSplitsFromConfig(dynamicSplitCfg.MaxShardsPerQuery, queryVerticalShardSize) - maxSplitsFromDurationFetched := getMaxSplitsByDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, queryVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta) - - // Use the more restrictive max splits limit - var maxSplits int - switch { - case dynamicSplitCfg.MaxShardsPerQuery > 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0: - maxSplits = min(maxSplitsPerQuery, maxSplitsFromDurationFetched) - case dynamicSplitCfg.MaxShardsPerQuery > 0: - maxSplits = maxSplitsPerQuery - case dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0: - maxSplits = maxSplitsFromDurationFetched + interval := baseInterval + verticalShardSize := 1 + totalShards := 0 + // Find the combination of horizontal splits and vertical shards that will result in the largest total number of shards + for currentVerticalShardSize := 1; currentVerticalShardSize <= maxVerticalShardSize; currentVerticalShardSize++ { + maxSplitsPerQuery := getMaxSplitsFromConfig(dynamicSplitCfg.MaxShardsPerQuery, currentVerticalShardSize) + maxSplitsFromDurationFetched := getMaxSplitsByDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, currentVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta) + + // Use the more restrictive max splits limit + var maxSplits int + switch { + case dynamicSplitCfg.MaxShardsPerQuery > 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0: + maxSplits = min(maxSplitsPerQuery, maxSplitsFromDurationFetched) + case dynamicSplitCfg.MaxShardsPerQuery > 0: + maxSplits = maxSplitsPerQuery + case dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0: + maxSplits = maxSplitsFromDurationFetched + } + + currentInterval := getIntervalFromMaxSplits(r, baseInterval, maxSplits) + currentTotalShards := getExpectedTotalShards(r.GetStart(), r.GetEnd(), currentInterval, currentVerticalShardSize) + if totalShards <= currentTotalShards { + verticalShardSize = currentVerticalShardSize + interval = currentInterval + totalShards = currentTotalShards + } + } + + // Set number of vertical shards to be used by shard_by middleware + if isShardable && maxVerticalShardSize > 1 { + stats := querier_stats.FromContext(ctx) + stats.AddExtraFields("shard_by.num_shards", verticalShardSize) } - interval := getIntervalFromMaxSplits(r, baseInterval, maxSplits) return interval, nil } } -func getMaxVerticalShardSize(ctx context.Context, r tripperware.Request, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer) (int, error) { +func getMaxVerticalShardSize(ctx context.Context, r tripperware.Request, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer) (int, bool, error) { tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { - return 1, err + return 1, false, err } analysis, err := queryAnalyzer.Analyze(r.GetQuery()) if err != nil { - return 1, err + return 1, false, err } queryVerticalShardSize := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize) if queryVerticalShardSize <= 0 || !analysis.IsShardable() { queryVerticalShardSize = 1 } - return queryVerticalShardSize, nil + return queryVerticalShardSize, analysis.IsShardable(), nil } // Returns the minimum multiple of base interval needed to split query into less than maxSplits @@ -325,6 +344,12 @@ func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, query return time.Duration(durationFetchedByRangeCount) * baseInterval, time.Duration(durationFetchedBySelectorsCount) * baseInterval } +func getExpectedTotalShards(queryStart int64, queryEnd int64, interval time.Duration, verticalShardSize int) int { + queryRange := time.Duration((queryEnd - queryStart) * int64(time.Millisecond)) + expectedSplits := int(ceilDiv(int64(queryRange), int64(interval))) + return expectedSplits * verticalShardSize +} + func floorDiv(a, b int64) int64 { if a < 0 && a%b != 0 { return a/b - 1 diff --git a/pkg/querier/tripperware/shard_by.go b/pkg/querier/tripperware/shard_by.go index fe6293832f1..61b9b3cbf9d 100644 --- a/pkg/querier/tripperware/shard_by.go +++ b/pkg/querier/tripperware/shard_by.go @@ -3,6 +3,7 @@ package tripperware import ( "context" "net/http" + "strconv" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -46,6 +47,10 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) { } numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, s.limits.QueryVerticalShardSize) + // Check if vertical shard size is set by dynamic query splitting + if val, err := strconv.Atoi(stats.GetExtraField("shard_by.num_shards")); err == nil && val > 0 { + numShards = val + } if numShards <= 1 { return s.next.Do(ctx, r) From 2ea7f0b9c9d8cad67eb0cb3135f78e1b2d6b4e91 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Thu, 3 Apr 2025 17:46:37 -0700 Subject: [PATCH 3/7] fix unit tests for dynamic vertical sharding Signed-off-by: Ahmed Hassan --- .../queryrange/split_by_interval_test.go | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 279bcc546eb..f575988e3c7 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -19,6 +19,7 @@ import ( "github.com/weaveworks/common/user" "go.uber.org/atomic" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -440,14 +441,15 @@ func Test_evaluateAtModifier(t *testing.T) { func Test_dynamicIntervalFn(t *testing.T) { for _, tc := range []struct { - name string - baseSplitInterval time.Duration - req tripperware.Request - expectedInterval time.Duration - expectedError bool - verticalShardSize int - maxIntervalSplits int - maxDurationOfDataFetched time.Duration + name string + baseSplitInterval time.Duration + req tripperware.Request + maxVerticalShardSize int + maxIntervalSplits int + maxDurationOfDataFetched time.Duration + expectedInterval time.Duration + expectedVerticalShardSize int + expectedError bool }{ { baseSplitInterval: day, @@ -549,16 +551,17 @@ func Test_dynamicIntervalFn(t *testing.T) { }, { baseSplitInterval: day, - name: "60 day range with 15 max splits and 3 vertical shard size, expect split by 12 days", + name: "60 day range with 15 max splits and max 3 vertical shard size, expect split by 12 days and 3 vertical shards", req: &tripperware.PrometheusRequest{ Start: 0, End: 60 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, Query: "sum by (pod) (up)", }, - verticalShardSize: 3, - maxIntervalSplits: 15, - expectedInterval: 12 * day, + maxVerticalShardSize: 3, + maxIntervalSplits: 15, + expectedInterval: 12 * day, + expectedVerticalShardSize: 3, }, { baseSplitInterval: time.Hour, @@ -610,16 +613,17 @@ func Test_dynamicIntervalFn(t *testing.T) { }, { baseSplitInterval: day, - name: "30 day range with multiple matrix selectors and 3 vertical shards, expect split by 6 days", + name: "30 day range with multiple matrix selectors and max 3 vertical shards, expect split by 1 day and 1 vertical shard", req: &tripperware.PrometheusRequest{ Start: (14 * 24 * 3600 * seconds) - (3600 * seconds), End: (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds), Step: 5 * 60 * seconds, Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])", }, - verticalShardSize: 3, - maxDurationOfDataFetched: 350 * day, - expectedInterval: 6 * day, + maxVerticalShardSize: 3, + maxDurationOfDataFetched: 350 * day, + expectedInterval: day, + expectedVerticalShardSize: 1, }, { baseSplitInterval: day, @@ -656,8 +660,16 @@ func Test_dynamicIntervalFn(t *testing.T) { }, } ctx := user.InjectOrgID(context.Background(), "1") - interval, err := dynamicIntervalFn(cfg, mockLimits{queryVerticalShardSize: tc.verticalShardSize}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) + stats, ctx := querier_stats.ContextWithEmptyStats(ctx) + + interval, err := dynamicIntervalFn(cfg, mockLimits{queryVerticalShardSize: tc.maxVerticalShardSize}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) require.Equal(t, tc.expectedInterval, interval) + + if tc.expectedVerticalShardSize > 0 { + verticalShardSize, err := strconv.Atoi(stats.GetExtraField("shard_by.num_shards")) + require.Nil(t, err) + require.Equal(t, tc.expectedVerticalShardSize, verticalShardSize) + } if !tc.expectedError { require.Nil(t, err) } From 447f6d392aeb9dc85476675a696e529ea5866812 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Wed, 9 Apr 2025 18:33:20 -0700 Subject: [PATCH 4/7] store dynamic vertical shard size in context Signed-off-by: Ahmed Hassan --- .../queryrange/split_by_interval.go | 25 +++++++------ .../queryrange/split_by_interval_test.go | 10 ++++-- pkg/querier/tripperware/shard_by.go | 36 +++++++++++++------ 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index c4cb2ace331..ad67379f0c1 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -18,7 +18,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -type IntervalFn func(ctx context.Context, r tripperware.Request) (time.Duration, error) +type IntervalFn func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) // SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval. func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer, lookbackDelta time.Duration) tripperware.Middleware { @@ -52,7 +52,7 @@ type splitByInterval struct { func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) { // First we're going to build new requests, one for each day, taking care // to line up the boundaries with step. - interval, err := s.interval(ctx, r) + ctx, interval, err := s.interval(ctx, r) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } @@ -152,29 +152,29 @@ func nextIntervalBoundary(t, step int64, interval time.Duration) int64 { } // Returns a fixed split interval -func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Request) (time.Duration, error) { - return func(_ context.Context, _ tripperware.Request) (time.Duration, error) { - return cfg.SplitQueriesByInterval, nil +func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) { + return func(ctx context.Context, _ tripperware.Request) (context.Context, time.Duration, error) { + return ctx, cfg.SplitQueriesByInterval, nil } } // Returns a dynamic multiple of base interval adjusted depending on configured 'max_shards_per_query' and 'max_fetched_data_duration_per_query' -func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, lookbackDelta time.Duration) func(ctx context.Context, r tripperware.Request) (time.Duration, error) { - return func(ctx context.Context, r tripperware.Request) (time.Duration, error) { +func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, lookbackDelta time.Duration) func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) { + return func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) { baseInterval := cfg.SplitQueriesByInterval dynamicSplitCfg := cfg.DynamicQuerySplitsConfig if dynamicSplitCfg.MaxShardsPerQuery == 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery == 0 { - return baseInterval, nil + return ctx, baseInterval, nil } maxVerticalShardSize, isShardable, err := getMaxVerticalShardSize(ctx, r, limits, queryAnalyzer) if err != nil { - return baseInterval, err + return ctx, baseInterval, err } queryExpr, err := parser.ParseExpr(r.GetQuery()) if err != nil { - return baseInterval, err + return ctx, baseInterval, err } interval := baseInterval @@ -207,11 +207,10 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer // Set number of vertical shards to be used by shard_by middleware if isShardable && maxVerticalShardSize > 1 { - stats := querier_stats.FromContext(ctx) - stats.AddExtraFields("shard_by.num_shards", verticalShardSize) + ctx = tripperware.SetVerticalShardSizeToContext(ctx, verticalShardSize) } - return interval, nil + return ctx, interval, nil } } diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index f575988e3c7..8aca5f2d3c8 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -298,7 +298,9 @@ func TestSplitByDay(t *testing.T) { path: query, expectedBody: string(mergedHTTPResponseBody), expectedQueryCount: 2, - intervalFn: func(_ context.Context, _ tripperware.Request) (time.Duration, error) { return 24 * time.Hour, nil }, + intervalFn: func(ctx context.Context, _ tripperware.Request) (context.Context, time.Duration, error) { + return ctx, 24 * time.Hour, nil + }, }, { path: query, @@ -310,7 +312,9 @@ func TestSplitByDay(t *testing.T) { path: longQuery, expectedBody: string(mergedHTTPResponseBody), expectedQueryCount: 31, - intervalFn: func(_ context.Context, _ tripperware.Request) (time.Duration, error) { return day, nil }, + intervalFn: func(ctx context.Context, _ tripperware.Request) (context.Context, time.Duration, error) { + return ctx, day, nil + }, }, { path: longQuery, @@ -662,7 +666,7 @@ func Test_dynamicIntervalFn(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "1") stats, ctx := querier_stats.ContextWithEmptyStats(ctx) - interval, err := dynamicIntervalFn(cfg, mockLimits{queryVerticalShardSize: tc.maxVerticalShardSize}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) + ctx, interval, err := dynamicIntervalFn(cfg, mockLimits{queryVerticalShardSize: tc.maxVerticalShardSize}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) require.Equal(t, tc.expectedInterval, interval) if tc.expectedVerticalShardSize > 0 { diff --git a/pkg/querier/tripperware/shard_by.go b/pkg/querier/tripperware/shard_by.go index 61b9b3cbf9d..912a1efcb75 100644 --- a/pkg/querier/tripperware/shard_by.go +++ b/pkg/querier/tripperware/shard_by.go @@ -3,7 +3,6 @@ package tripperware import ( "context" "net/http" - "strconv" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -46,13 +45,13 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) { return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } - numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, s.limits.QueryVerticalShardSize) + verticalShardSize := validation.SmallestPositiveIntPerTenant(tenantIDs, s.limits.QueryVerticalShardSize) // Check if vertical shard size is set by dynamic query splitting - if val, err := strconv.Atoi(stats.GetExtraField("shard_by.num_shards")); err == nil && val > 0 { - numShards = val + if dynamicVerticalShardSize, ok := VerticalShardSizeFromContext(ctx); ok { + verticalShardSize = dynamicVerticalShardSize } - if numShards <= 1 { + if verticalShardSize <= 1 { return s.next.Do(ctx, r) } @@ -65,7 +64,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) { stats.AddExtraFields( "shard_by.is_shardable", analysis.IsShardable(), - "shard_by.num_shards", numShards, + "shard_by.num_shards", verticalShardSize, "shard_by.sharding_labels", analysis.ShardingLabels(), ) @@ -73,7 +72,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) { return s.next.Do(ctx, r) } - reqs := s.shardQuery(logger, numShards, r, analysis) + reqs := s.shardQuery(logger, verticalShardSize, r, analysis) reqResps, err := DoRequests(ctx, s.next, reqs, s.limits) if err != nil { @@ -88,11 +87,11 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) { return s.merger.MergeResponse(ctx, r, resps...) } -func (s shardBy) shardQuery(l log.Logger, numShards int, r Request, analysis querysharding.QueryAnalysis) []Request { - reqs := make([]Request, numShards) - for i := 0; i < numShards; i++ { +func (s shardBy) shardQuery(l log.Logger, verticalShardSize int, r Request, analysis querysharding.QueryAnalysis) []Request { + reqs := make([]Request, verticalShardSize) + for i := 0; i < verticalShardSize; i++ { q, err := cquerysharding.InjectShardingInfo(r.GetQuery(), &storepb.ShardInfo{ - TotalShards: int64(numShards), + TotalShards: int64(verticalShardSize), ShardIndex: int64(i), By: analysis.ShardBy(), Labels: analysis.ShardingLabels(), @@ -107,3 +106,18 @@ func (s shardBy) shardQuery(l log.Logger, numShards int, r Request, analysis que return reqs } + +type verticalShardsKey struct{} + +func VerticalShardSizeFromContext(ctx context.Context) (int, bool) { + val := ctx.Value(verticalShardsKey{}) + if val == nil { + return 1, false + } + verticalShardSize, ok := val.(int) + return verticalShardSize, ok +} + +func SetVerticalShardSizeToContext(ctx context.Context, verticalShardSize int) context.Context { + return context.WithValue(ctx, verticalShardsKey{}, verticalShardSize) +} From df26e9ab205d201f97a29c8de558f8c488bf5079 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Wed, 9 Apr 2025 18:47:38 -0700 Subject: [PATCH 5/7] refactoring dynamicIntervalFn Signed-off-by: Ahmed Hassan --- .../queryrange/split_by_interval.go | 26 +++++++------------ .../queryrange/split_by_interval_test.go | 9 +++---- 2 files changed, 12 insertions(+), 23 deletions(-) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index ad67379f0c1..f26462344a0 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -181,9 +181,9 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer verticalShardSize := 1 totalShards := 0 // Find the combination of horizontal splits and vertical shards that will result in the largest total number of shards - for currentVerticalShardSize := 1; currentVerticalShardSize <= maxVerticalShardSize; currentVerticalShardSize++ { - maxSplitsPerQuery := getMaxSplitsFromConfig(dynamicSplitCfg.MaxShardsPerQuery, currentVerticalShardSize) - maxSplitsFromDurationFetched := getMaxSplitsByDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, currentVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta) + for candidateVerticalShardSize := 1; candidateVerticalShardSize <= maxVerticalShardSize; candidateVerticalShardSize++ { + maxSplitsPerQuery := getMaxSplitsFromConfig(dynamicSplitCfg.MaxShardsPerQuery, candidateVerticalShardSize) + maxSplitsFromDurationFetched := getMaxSplitsByDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, candidateVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta) // Use the more restrictive max splits limit var maxSplits int @@ -196,12 +196,11 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer maxSplits = maxSplitsFromDurationFetched } - currentInterval := getIntervalFromMaxSplits(r, baseInterval, maxSplits) - currentTotalShards := getExpectedTotalShards(r.GetStart(), r.GetEnd(), currentInterval, currentVerticalShardSize) - if totalShards <= currentTotalShards { - verticalShardSize = currentVerticalShardSize - interval = currentInterval - totalShards = currentTotalShards + candidateInterval := getIntervalFromMaxSplits(r, baseInterval, maxSplits) + if candidateTotalShards := getExpectedTotalShards(r.GetStart(), r.GetEnd(), candidateInterval, candidateVerticalShardSize); totalShards <= candidateTotalShards { + interval = candidateInterval + verticalShardSize = candidateVerticalShardSize + totalShards = candidateTotalShards } } @@ -301,14 +300,7 @@ func getMaxSplitsByDurationFetched(maxFetchedDataDurationPerQuery time.Duration, } // analyzeDurationFetchedByQueryExpr analyzes the query to calculate -// the estimated duration of data that will be fetched from storage by -// different parts of the query -// -// Returns: -// - durationFetchedByRange: The total duration fetched by the original start-end -// range of the query. -// - durationFetchedBySelectors: The duration fetched by matrix selectors, -// subquery selectors, and lookbackDelta. +// the estimated duration of data that will be fetched from storage // // Example: // Query up[15d:1h] with a range of 30 days, 1 day base split interval, and 5 min lookbackDelta with 00:00 UTC start time diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 8aca5f2d3c8..9afa3e5be24 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -19,7 +19,6 @@ import ( "github.com/weaveworks/common/user" "go.uber.org/atomic" - querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -664,15 +663,13 @@ func Test_dynamicIntervalFn(t *testing.T) { }, } ctx := user.InjectOrgID(context.Background(), "1") - stats, ctx := querier_stats.ContextWithEmptyStats(ctx) - ctx, interval, err := dynamicIntervalFn(cfg, mockLimits{queryVerticalShardSize: tc.maxVerticalShardSize}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) require.Equal(t, tc.expectedInterval, interval) if tc.expectedVerticalShardSize > 0 { - verticalShardSize, err := strconv.Atoi(stats.GetExtraField("shard_by.num_shards")) - require.Nil(t, err) - require.Equal(t, tc.expectedVerticalShardSize, verticalShardSize) + if verticalShardSize, ok := tripperware.VerticalShardSizeFromContext(ctx); ok { + require.Equal(t, tc.expectedVerticalShardSize, verticalShardSize) + } } if !tc.expectedError { require.Nil(t, err) From a97c205d943b0b2039efb22ecaf0b0097b1ad405 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Wed, 16 Apr 2025 18:16:29 -0700 Subject: [PATCH 6/7] add separate vertical sharding tests Signed-off-by: Ahmed Hassan --- pkg/querier/stats/stats.go | 11 - .../queryrange/split_by_interval.go | 6 +- .../queryrange/split_by_interval_test.go | 376 ++++++++---------- pkg/querier/tripperware/shard_by.go | 2 +- 4 files changed, 171 insertions(+), 224 deletions(-) diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index 1b0be55109f..127c422878a 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -140,17 +140,6 @@ func (s *QueryStats) LoadExtraFields() []interface{} { return r } -func (s *QueryStats) GetExtraField(fieldName string) string { - fields := s.LoadExtraFields() - for i := 0; i < len(fields); i += 2 { - key, ok := fields[i].(string) - if ok && key == fieldName { - return fmt.Sprintf("%v", fields[i+1]) - } - } - return "" -} - func (s *QueryStats) LoadFetchedSeries() uint64 { if s == nil { return 0 diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index f26462344a0..e256360b832 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -197,16 +197,16 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer } candidateInterval := getIntervalFromMaxSplits(r, baseInterval, maxSplits) - if candidateTotalShards := getExpectedTotalShards(r.GetStart(), r.GetEnd(), candidateInterval, candidateVerticalShardSize); totalShards <= candidateTotalShards { + if candidateTotalShards := getExpectedTotalShards(r.GetStart(), r.GetEnd(), candidateInterval, candidateVerticalShardSize); candidateTotalShards > totalShards { interval = candidateInterval verticalShardSize = candidateVerticalShardSize totalShards = candidateTotalShards } } - // Set number of vertical shards to be used by shard_by middleware + // Set number of vertical shards to be used in shard_by middleware if isShardable && maxVerticalShardSize > 1 { - ctx = tripperware.SetVerticalShardSizeToContext(ctx, verticalShardSize) + ctx = tripperware.InjectVerticalShardSizeToContext(ctx, verticalShardSize) } return ctx, interval, nil diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 9afa3e5be24..935dd40ce26 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -444,15 +444,13 @@ func Test_evaluateAtModifier(t *testing.T) { func Test_dynamicIntervalFn(t *testing.T) { for _, tc := range []struct { - name string - baseSplitInterval time.Duration - req tripperware.Request - maxVerticalShardSize int - maxIntervalSplits int - maxDurationOfDataFetched time.Duration - expectedInterval time.Duration - expectedVerticalShardSize int - expectedError bool + name string + baseSplitInterval time.Duration + req tripperware.Request + maxShardsPerQuery int + maxFetchedDataDuration time.Duration + expectedInterval time.Duration + expectedError bool }{ { baseSplitInterval: day, @@ -463,47 +461,23 @@ func Test_dynamicIntervalFn(t *testing.T) { End: 10 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, }, - maxIntervalSplits: 30, - maxDurationOfDataFetched: 200 * day, - expectedInterval: day, - expectedError: true, + maxShardsPerQuery: 30, + maxFetchedDataDuration: 200 * day, + expectedInterval: day, + expectedError: true, }, { baseSplitInterval: day, - name: "23 hour range with 10 max splits, expect split by 1 day", + name: "23 hour range with 30 max splits, expect split by 1 day", req: &tripperware.PrometheusRequest{ Start: 0, End: 23 * 3600 * seconds, Step: 60 * seconds, Query: "up", }, - maxIntervalSplits: 10, + maxShardsPerQuery: 30, expectedInterval: day, }, - { - baseSplitInterval: time.Hour, - name: "24 hour range with 100 max splits, expect split by 1 hour", - req: &tripperware.PrometheusRequest{ - Start: 0, - End: 24 * 3600 * seconds, - Step: 60 * seconds, - Query: "up", - }, - maxIntervalSplits: 100, - expectedInterval: time.Hour, - }, - { - baseSplitInterval: time.Hour, - name: "120 hour range with 15 max splits, expect split by 8 hours", - req: &tripperware.PrometheusRequest{ - Start: 0, - End: 5 * 24 * 3600 * seconds, - Step: 60 * seconds, - Query: "up", - }, - maxIntervalSplits: 15, - expectedInterval: 8 * time.Hour, - }, { baseSplitInterval: day, name: "30 day range with 30 max splits, expect split by 1 day", @@ -513,164 +487,155 @@ func Test_dynamicIntervalFn(t *testing.T) { Step: 5 * 60 * seconds, Query: "up", }, - maxIntervalSplits: 30, + maxShardsPerQuery: 30, expectedInterval: day, }, { baseSplitInterval: day, - name: "30 day range with 20 max splits, expect split by 2 day", + name: "30 day range with 10 max splits, expect split by 3 days", req: &tripperware.PrometheusRequest{ - Start: 30 * 24 * 3600 * seconds, - End: 60 * 24 * 3600 * seconds, + Start: 0, + End: 30 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, Query: "up", }, - maxIntervalSplits: 20, - expectedInterval: 2 * day, + maxShardsPerQuery: 10, + expectedInterval: 3 * day, }, { baseSplitInterval: day, - name: "60 day range with 15 max splits, expect split by 4 day", + name: "31 day range with 10 max splits, expect split by 4 days", req: &tripperware.PrometheusRequest{ Start: 0, - End: 60 * 24 * 3600 * seconds, + End: 31 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, Query: "up", }, - maxIntervalSplits: 15, + maxShardsPerQuery: 10, expectedInterval: 4 * day, }, { baseSplitInterval: day, - name: "61 day range with 15 max splits, expect split by 5 day", + name: "30 day range with 1h matrix selector and 200 days max duration fetched, expect split by 1 day", req: &tripperware.PrometheusRequest{ - Start: 0, - End: 61 * 24 * 3600 * seconds, + Start: 30 * 24 * 3600 * seconds, + End: 60 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, - Query: "up", + Query: "avg_over_time(up[1h])", }, - maxIntervalSplits: 15, - expectedInterval: 5 * day, + maxFetchedDataDuration: 200 * day, + expectedInterval: day, }, { baseSplitInterval: day, - name: "60 day range with 15 max splits and max 3 vertical shard size, expect split by 12 days and 3 vertical shards", + name: "30 day range with 20d matrix selector and 200 days max duration fetched, expect split by 4 days", req: &tripperware.PrometheusRequest{ - Start: 0, + Start: 30 * 24 * 3600 * seconds, End: 60 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, - Query: "sum by (pod) (up)", + Query: "avg_over_time(up[20d])", }, - maxVerticalShardSize: 3, - maxIntervalSplits: 15, - expectedInterval: 12 * day, - expectedVerticalShardSize: 3, + maxFetchedDataDuration: 200 * day, + expectedInterval: 4 * day, }, { - baseSplitInterval: time.Hour, - name: "101 hours with 200 hour max duration fetched and 5m matrix selector, expect split by 2 hour", + baseSplitInterval: day, + name: "roughly 30 day range with multiple matrix selectors and 200 days max duration fetched, expect split by 3 days", req: &tripperware.PrometheusRequest{ - Start: (3 * 24 * 3600 * seconds) - (2 * 3600 * seconds), - End: (7 * 24 * 3600 * seconds) + (3 * 3600 * seconds) - 1, - Step: 60 * seconds, - Query: "up[5m]", + Start: (14 * 24 * 3600 * seconds) - (3600 * seconds), + End: (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds), + Step: 5 * 60 * seconds, + Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])", }, - maxDurationOfDataFetched: 200 * time.Hour, - expectedInterval: 2 * time.Hour, + maxFetchedDataDuration: 200 * day, + expectedInterval: 3 * day, }, { baseSplitInterval: day, - name: "30 day range with 200 days max duration fetched and 1h matrix selector, expect split by 1 day", + name: "50 day range with 5 day subquery and 100 days max duration fetched, expect split by 7 days", req: &tripperware.PrometheusRequest{ - Start: 30 * 24 * 3600 * seconds, - End: 60 * 24 * 3600 * seconds, + Start: 0, + End: 50 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, - Query: "avg_over_time(up[1h])", + Query: "up[5d:10m]", }, - maxDurationOfDataFetched: 200 * day, - expectedInterval: day, + maxFetchedDataDuration: 100 * day, + expectedInterval: 7 * day, }, { baseSplitInterval: day, - name: "30 day range with 200 days max duration fetched and 20d matrix selector, expect split by 4 days", + name: "50 day range with 5 day subquery and 100 days max duration fetched and 5 max splits, expect split by 10 days", req: &tripperware.PrometheusRequest{ - Start: 30 * 24 * 3600 * seconds, - End: 60 * 24 * 3600 * seconds, + Start: 0, + End: 50 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, - Query: "avg_over_time(up[20d])", + Query: "up[5d:10m]", }, - maxDurationOfDataFetched: 200 * day, - expectedInterval: 4 * day, + maxShardsPerQuery: 5, + maxFetchedDataDuration: 100 * day, + expectedInterval: 10 * day, }, { baseSplitInterval: day, - name: "30 day range with multiple matrix selectors, expect split by 3 days", + name: "100 day range with 5 day subquery and 100 days max duration fetched, expect no splitting (100 day interval)", req: &tripperware.PrometheusRequest{ - Start: (14 * 24 * 3600 * seconds) - (3600 * seconds), - End: (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds), + Start: 0, + End: 100 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, - Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])", + Query: "up[5d:10m]", }, - maxDurationOfDataFetched: 200 * day, - expectedInterval: 3 * day, + maxFetchedDataDuration: 100 * day, + expectedInterval: 100 * day, }, { - baseSplitInterval: day, - name: "30 day range with multiple matrix selectors and max 3 vertical shards, expect split by 1 day and 1 vertical shard", + baseSplitInterval: time.Hour, + name: "100 hour range with 100 max splits, expect split by 1 hour", req: &tripperware.PrometheusRequest{ - Start: (14 * 24 * 3600 * seconds) - (3600 * seconds), - End: (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds), - Step: 5 * 60 * seconds, - Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])", + Start: 0, + End: 100 * 3600 * seconds, + Step: 60 * seconds, + Query: "up", }, - maxVerticalShardSize: 3, - maxDurationOfDataFetched: 350 * day, - expectedInterval: day, - expectedVerticalShardSize: 1, + maxShardsPerQuery: 100, + expectedInterval: time.Hour, }, { - baseSplitInterval: day, - name: "100 day range with 5 day subquery, expect split by 13 days", + baseSplitInterval: time.Hour, + name: "100 hour range with 20 max splits, expect split by 5 hours", req: &tripperware.PrometheusRequest{ Start: 0, - End: 100 * 24 * 3600 * seconds, - Step: 60 * 60 * seconds, - Query: "up[5d:10m]", + End: 100 * 3600 * seconds, + Step: 60 * seconds, + Query: "up", }, - maxIntervalSplits: 100, - maxDurationOfDataFetched: 150 * day, - expectedInterval: 13 * day, + maxShardsPerQuery: 20, + expectedInterval: 5 * time.Hour, }, { - baseSplitInterval: day, - name: "51 day range, longer than max duration fetched, expect split by 51 day", + baseSplitInterval: time.Hour, + name: "100 hours with 200 hour max duration fetched and 5h matrix selector, expect split by 5 hours", req: &tripperware.PrometheusRequest{ - Start: 0, - End: (51 * 24 * 3600 * seconds), - Step: 5 * 60 * seconds, - Query: "up[5d]", + Start: 100 * 3600 * seconds, + End: 200 * 3600 * seconds, + Step: 60 * seconds, + Query: "up[5h]", }, - maxDurationOfDataFetched: 50 * day, - expectedInterval: 51 * day, + maxFetchedDataDuration: 200 * time.Hour, + expectedInterval: 5 * time.Hour, }, } { t.Run(tc.name, func(t *testing.T) { cfg := Config{ SplitQueriesByInterval: tc.baseSplitInterval, DynamicQuerySplitsConfig: DynamicQuerySplitsConfig{ - MaxShardsPerQuery: tc.maxIntervalSplits, - MaxFetchedDataDurationPerQuery: tc.maxDurationOfDataFetched, + MaxShardsPerQuery: tc.maxShardsPerQuery, + MaxFetchedDataDurationPerQuery: tc.maxFetchedDataDuration, }, } ctx := user.InjectOrgID(context.Background(), "1") - ctx, interval, err := dynamicIntervalFn(cfg, mockLimits{queryVerticalShardSize: tc.maxVerticalShardSize}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) + ctx, interval, err := dynamicIntervalFn(cfg, mockLimits{}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) require.Equal(t, tc.expectedInterval, interval) - if tc.expectedVerticalShardSize > 0 { - if verticalShardSize, ok := tripperware.VerticalShardSizeFromContext(ctx); ok { - require.Equal(t, tc.expectedVerticalShardSize, verticalShardSize) - } - } if !tc.expectedError { require.Nil(t, err) } @@ -678,144 +643,137 @@ func Test_dynamicIntervalFn(t *testing.T) { } } -func Test_getMaxSplitsByDurationFetched(t *testing.T) { +func Test_dynamicIntervalFn_verticalSharding(t *testing.T) { for _, tc := range []struct { - name string - baseSplitInterval time.Duration - req tripperware.Request - verticalShardSize int - lookbackDelta time.Duration - maxDurationOfDataFetched time.Duration - expectedMaxSplits int + name string + baseSplitInterval time.Duration + req tripperware.Request + maxVerticalShardSize int + maxShardsPerQuery int + maxFetchedDataDuration time.Duration + expectedInterval time.Duration + expectedVerticalShardSize int + expectedError bool }{ { baseSplitInterval: day, - name: "20 day range with no lookback, expect default max splits", + name: "23 hour range with 30 max splits, expect split by 1 day and 3 vertical shards", req: &tripperware.PrometheusRequest{ - Query: "up", Start: 0, - End: (20 * 24 * 3600 * seconds) - 1, - Step: 5 * 60 * seconds, + End: 23 * 3600 * seconds, + Step: 60 * seconds, + Query: "sum(up) by (cluster)", }, - verticalShardSize: 1, - maxDurationOfDataFetched: 200 * day, - expectedMaxSplits: 200, + maxShardsPerQuery: 30, + expectedInterval: day, + maxVerticalShardSize: 3, + expectedVerticalShardSize: 3, }, { baseSplitInterval: day, - name: "20 days range with 5 min lookback, expect 180 max splits", + name: "30 day range with 30 max splits, expect split by 1 days and 1 vertical shards", req: &tripperware.PrometheusRequest{ - Query: "up", Start: 0, - End: (20 * 24 * 3600 * seconds) - 1, - Step: 5 * 60 * seconds, - }, - verticalShardSize: 1, - lookbackDelta: 5 * time.Minute, - maxDurationOfDataFetched: 200 * day, - expectedMaxSplits: 180, - }, - { - baseSplitInterval: day, - name: "20 days range with 5 min lookback but first split starts at 30 min, expect 180 max splits", - req: &tripperware.PrometheusRequest{ - Query: "up", - Start: (30 * 60 * seconds), - End: (20 * 24 * 3600 * seconds) - 1, + End: 30 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, + Query: "sum(up) by (cluster)", }, - verticalShardSize: 1, - lookbackDelta: 5 * time.Minute, - maxDurationOfDataFetched: 200 * day, - expectedMaxSplits: 180, + maxShardsPerQuery: 30, + expectedInterval: day, + maxVerticalShardSize: 3, + expectedVerticalShardSize: 1, }, { baseSplitInterval: day, - name: "20 days range with 1 day subquery and 5 min lookback, expect 90 max splits", + name: "30 day range with 10 max splits, expect split by 3 days and 1 vertical shard", req: &tripperware.PrometheusRequest{ - Query: "up[1d:1h]", Start: 0, - End: (20 * 24 * 3600 * seconds) - 1, + End: 30 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, + Query: "sum(up) by (cluster)", }, - verticalShardSize: 1, - lookbackDelta: 5 * time.Minute, - maxDurationOfDataFetched: 200 * day, - expectedMaxSplits: 90, + maxShardsPerQuery: 10, + expectedInterval: 3 * day, + maxVerticalShardSize: 3, + expectedVerticalShardSize: 1, }, { baseSplitInterval: day, - name: "20 days range with 3 day matrix selector, expect 60 max splits", + name: "31 day range with 10 max splits, expect split by 7 days and 2 vertical shards", req: &tripperware.PrometheusRequest{ - Query: "up[3d]", Start: 0, - End: (20 * 24 * 3600 * seconds) - 1, + End: 31 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, + Query: "sum(up) by (cluster)", }, - verticalShardSize: 1, - maxDurationOfDataFetched: 200 * day, - expectedMaxSplits: 60, + maxShardsPerQuery: 10, + expectedInterval: 7 * day, + maxVerticalShardSize: 3, + expectedVerticalShardSize: 2, }, { baseSplitInterval: day, - name: "30 days range with 20 day subquery, expect 8 max splits", + name: "30 day range with 1h matrix selector and 200 days max duration fetched, expect split by 1 day and 3 vertical shards", req: &tripperware.PrometheusRequest{ Start: 30 * 24 * 3600 * seconds, - End: (60 * 24 * 3600 * seconds) - 1, - Step: 5 * 60 * seconds, - Query: "avg_over_time(up[20d:1h])", - }, - verticalShardSize: 1, - lookbackDelta: 5 * time.Minute, - maxDurationOfDataFetched: 200 * day, - expectedMaxSplits: 8, - }, - { - baseSplitInterval: day, - // Each term in the query fetches 20 days of range - name: "20 days range with multiple matrix selectors, expect 20 max splits", - req: &tripperware.PrometheusRequest{ - Start: (14 * 24 * 3600 * seconds) - (3600 * seconds), - End: (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds), + End: 60 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, - Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])", + Query: "sum(rate(up[1h])) by (cluster)", }, - verticalShardSize: 1, - maxDurationOfDataFetched: 350 * day, - expectedMaxSplits: 20, + maxFetchedDataDuration: 200 * day, + expectedInterval: day, + maxVerticalShardSize: 3, + expectedVerticalShardSize: 3, }, { baseSplitInterval: day, - name: "20 days range with multiple matrix selectors and 3 vertical shard size, expect 4 max splits", + name: "30 day range with 20d matrix selector and 200 days max duration fetched, expect split by 4 days and 1 vertical shard", req: &tripperware.PrometheusRequest{ - Start: (14 * 24 * 3600 * seconds) - (3600 * seconds), - End: (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds), + Start: 30 * 24 * 3600 * seconds, + End: 60 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, - Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])", + Query: "sum(rate(up[20d])) by (cluster)", }, - verticalShardSize: 3, - maxDurationOfDataFetched: 350 * day, - expectedMaxSplits: 4, + maxFetchedDataDuration: 200 * day, + expectedInterval: 4 * day, + maxVerticalShardSize: 3, + expectedVerticalShardSize: 1, }, { baseSplitInterval: day, - name: "around 37 days range with 5 day matrix selector and 3 vertical shards, expect 25 max splits", + name: "100 day range with 5 day subquery and 100 days max duration fetched, expect no splitting (100 day interval)", req: &tripperware.PrometheusRequest{ - Start: (3 * 24 * 3600 * seconds) - (4*3600*seconds + 240*seconds), - End: (38 * 24 * 3600 * seconds) + (1*3600*seconds + 60*seconds), + Start: 0, + End: 100 * 24 * 3600 * seconds, Step: 5 * 60 * seconds, - Query: "up[5d]", + Query: "sum(rate(up[5d:10m])) by (cluster)", }, - verticalShardSize: 3, - maxDurationOfDataFetched: 500 * day, - expectedMaxSplits: 25, + maxFetchedDataDuration: 100 * day, + expectedInterval: 100 * day, + maxVerticalShardSize: 3, + expectedVerticalShardSize: 3, }, } { t.Run(tc.name, func(t *testing.T) { - queryExpr, err := parser.ParseExpr(tc.req.GetQuery()) - require.Nil(t, err) - maxSplits := getMaxSplitsByDurationFetched(tc.maxDurationOfDataFetched, tc.verticalShardSize, queryExpr, tc.req.GetStart(), tc.req.GetEnd(), tc.req.GetStep(), tc.baseSplitInterval, tc.lookbackDelta) - require.Equal(t, tc.expectedMaxSplits, maxSplits) + cfg := Config{ + SplitQueriesByInterval: tc.baseSplitInterval, + DynamicQuerySplitsConfig: DynamicQuerySplitsConfig{ + MaxShardsPerQuery: tc.maxShardsPerQuery, + MaxFetchedDataDurationPerQuery: tc.maxFetchedDataDuration, + }, + } + ctx := user.InjectOrgID(context.Background(), "1") + ctx, interval, err := dynamicIntervalFn(cfg, mockLimits{queryVerticalShardSize: tc.maxVerticalShardSize}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) + require.Equal(t, tc.expectedInterval, interval) + + if tc.expectedVerticalShardSize > 0 { + if verticalShardSize, ok := tripperware.VerticalShardSizeFromContext(ctx); ok { + require.Equal(t, tc.expectedVerticalShardSize, verticalShardSize) + } + } + if !tc.expectedError { + require.Nil(t, err) + } }) } } diff --git a/pkg/querier/tripperware/shard_by.go b/pkg/querier/tripperware/shard_by.go index 912a1efcb75..607e397f0e9 100644 --- a/pkg/querier/tripperware/shard_by.go +++ b/pkg/querier/tripperware/shard_by.go @@ -118,6 +118,6 @@ func VerticalShardSizeFromContext(ctx context.Context) (int, bool) { return verticalShardSize, ok } -func SetVerticalShardSizeToContext(ctx context.Context, verticalShardSize int) context.Context { +func InjectVerticalShardSizeToContext(ctx context.Context, verticalShardSize int) context.Context { return context.WithValue(ctx, verticalShardsKey{}, verticalShardSize) } From cda775183033c4a5279aa3b005a36de0809bc878 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Thu, 17 Apr 2025 13:36:10 -0700 Subject: [PATCH 7/7] fix formatting Signed-off-by: Ahmed Hassan --- .../queryrange/split_by_interval.go | 12 +- .../queryrange/split_by_interval_test.go | 115 ++++++------------ 2 files changed, 44 insertions(+), 83 deletions(-) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index e256360b832..fd5eeda5859 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -182,16 +182,16 @@ func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer quer totalShards := 0 // Find the combination of horizontal splits and vertical shards that will result in the largest total number of shards for candidateVerticalShardSize := 1; candidateVerticalShardSize <= maxVerticalShardSize; candidateVerticalShardSize++ { - maxSplitsPerQuery := getMaxSplitsFromConfig(dynamicSplitCfg.MaxShardsPerQuery, candidateVerticalShardSize) - maxSplitsFromDurationFetched := getMaxSplitsByDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, candidateVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta) + maxSplitsFromMaxShards := getMaxSplitsFromMaxQueryShards(dynamicSplitCfg.MaxShardsPerQuery, candidateVerticalShardSize) + maxSplitsFromDurationFetched := getMaxSplitsFromDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, candidateVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta) // Use the more restrictive max splits limit var maxSplits int switch { case dynamicSplitCfg.MaxShardsPerQuery > 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0: - maxSplits = min(maxSplitsPerQuery, maxSplitsFromDurationFetched) + maxSplits = min(maxSplitsFromMaxShards, maxSplitsFromDurationFetched) case dynamicSplitCfg.MaxShardsPerQuery > 0: - maxSplits = maxSplitsPerQuery + maxSplits = maxSplitsFromMaxShards case dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0: maxSplits = maxSplitsFromDurationFetched } @@ -268,7 +268,7 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration, } // Return max allowed number of splits by MaxShardsPerQuery config after accounting for vertical sharding -func getMaxSplitsFromConfig(maxSplitsConfigValue int, queryVerticalShardSize int) int { +func getMaxSplitsFromMaxQueryShards(maxSplitsConfigValue int, queryVerticalShardSize int) int { var maxSplitsFromConfig int if maxSplitsConfigValue > 0 { maxSplitsFromConfig = maxSplitsConfigValue / queryVerticalShardSize @@ -280,7 +280,7 @@ func getMaxSplitsFromConfig(maxSplitsConfigValue int, queryVerticalShardSize int } // Return max allowed number of splits by MaxFetchedDataDurationPerQuery config after accounting for vertical sharding -func getMaxSplitsByDurationFetched(maxFetchedDataDurationPerQuery time.Duration, queryVerticalShardSize int, expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) int { +func getMaxSplitsFromDurationFetched(maxFetchedDataDurationPerQuery time.Duration, queryVerticalShardSize int, expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) int { durationFetchedByRange, durationFetchedBySelectors := analyzeDurationFetchedByQueryExpr(expr, queryStart, queryEnd, baseInterval, lookbackDelta) if durationFetchedBySelectors == 0 { diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 935dd40ce26..53f96dacb98 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -458,7 +458,7 @@ func Test_dynamicIntervalFn(t *testing.T) { req: &tripperware.PrometheusRequest{ Query: "up[aaa", Start: 0, - End: 10 * 24 * 3600 * seconds, + End: 10*24*3600*seconds - 1, Step: 5 * 60 * seconds, }, maxShardsPerQuery: 30, @@ -468,10 +468,10 @@ func Test_dynamicIntervalFn(t *testing.T) { }, { baseSplitInterval: day, - name: "23 hour range with 30 max splits, expect split by 1 day", + name: "23 hour range with 30 max shards, expect split by 1 day", req: &tripperware.PrometheusRequest{ Start: 0, - End: 23 * 3600 * seconds, + End: 23*3600*seconds - 1, Step: 60 * seconds, Query: "up", }, @@ -480,10 +480,10 @@ func Test_dynamicIntervalFn(t *testing.T) { }, { baseSplitInterval: day, - name: "30 day range with 30 max splits, expect split by 1 day", + name: "30 day range with 30 max shards, expect split by 1 day", req: &tripperware.PrometheusRequest{ Start: 0, - End: 30 * 24 * 3600 * seconds, + End: 30*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "up", }, @@ -492,10 +492,10 @@ func Test_dynamicIntervalFn(t *testing.T) { }, { baseSplitInterval: day, - name: "30 day range with 10 max splits, expect split by 3 days", + name: "30 day range with 10 max shards, expect split by 3 days", req: &tripperware.PrometheusRequest{ Start: 0, - End: 30 * 24 * 3600 * seconds, + End: 30*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "up", }, @@ -504,10 +504,10 @@ func Test_dynamicIntervalFn(t *testing.T) { }, { baseSplitInterval: day, - name: "31 day range with 10 max splits, expect split by 4 days", + name: "31 day range with 10 max shards, expect split by 4 days", req: &tripperware.PrometheusRequest{ Start: 0, - End: 31 * 24 * 3600 * seconds, + End: 31*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "up", }, @@ -519,7 +519,7 @@ func Test_dynamicIntervalFn(t *testing.T) { name: "30 day range with 1h matrix selector and 200 days max duration fetched, expect split by 1 day", req: &tripperware.PrometheusRequest{ Start: 30 * 24 * 3600 * seconds, - End: 60 * 24 * 3600 * seconds, + End: 60*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "avg_over_time(up[1h])", }, @@ -531,7 +531,7 @@ func Test_dynamicIntervalFn(t *testing.T) { name: "30 day range with 20d matrix selector and 200 days max duration fetched, expect split by 4 days", req: &tripperware.PrometheusRequest{ Start: 30 * 24 * 3600 * seconds, - End: 60 * 24 * 3600 * seconds, + End: 60*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "avg_over_time(up[20d])", }, @@ -555,7 +555,7 @@ func Test_dynamicIntervalFn(t *testing.T) { name: "50 day range with 5 day subquery and 100 days max duration fetched, expect split by 7 days", req: &tripperware.PrometheusRequest{ Start: 0, - End: 50 * 24 * 3600 * seconds, + End: 50*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "up[5d:10m]", }, @@ -564,10 +564,10 @@ func Test_dynamicIntervalFn(t *testing.T) { }, { baseSplitInterval: day, - name: "50 day range with 5 day subquery and 100 days max duration fetched and 5 max splits, expect split by 10 days", + name: "50 day range with 5 day subquery and 100 days max duration fetched and 5 max shards, expect split by 10 days", req: &tripperware.PrometheusRequest{ Start: 0, - End: 50 * 24 * 3600 * seconds, + End: 50*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "up[5d:10m]", }, @@ -580,7 +580,7 @@ func Test_dynamicIntervalFn(t *testing.T) { name: "100 day range with 5 day subquery and 100 days max duration fetched, expect no splitting (100 day interval)", req: &tripperware.PrometheusRequest{ Start: 0, - End: 100 * 24 * 3600 * seconds, + End: 100*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "up[5d:10m]", }, @@ -589,39 +589,39 @@ func Test_dynamicIntervalFn(t *testing.T) { }, { baseSplitInterval: time.Hour, - name: "100 hour range with 100 max splits, expect split by 1 hour", + name: "120 hour range with 120 max shards, expect split by 1 hour", req: &tripperware.PrometheusRequest{ Start: 0, - End: 100 * 3600 * seconds, + End: 120*3600*seconds - 1, Step: 60 * seconds, Query: "up", }, - maxShardsPerQuery: 100, + maxShardsPerQuery: 120, expectedInterval: time.Hour, }, { baseSplitInterval: time.Hour, - name: "100 hour range with 20 max splits, expect split by 5 hours", + name: "120 hour range with 20 max shards, expect split by 6 hours", req: &tripperware.PrometheusRequest{ Start: 0, - End: 100 * 3600 * seconds, + End: 120*3600*seconds - 1, Step: 60 * seconds, Query: "up", }, maxShardsPerQuery: 20, - expectedInterval: 5 * time.Hour, + expectedInterval: 6 * time.Hour, }, { baseSplitInterval: time.Hour, - name: "100 hours with 200 hour max duration fetched and 5h matrix selector, expect split by 5 hours", + name: "120 hours with 200h max duration fetched and 4h matrix selector, expect split by 6 hours", req: &tripperware.PrometheusRequest{ - Start: 100 * 3600 * seconds, - End: 200 * 3600 * seconds, + Start: 0 * 3600 * seconds, + End: 120*3600*seconds - 1, Step: 60 * seconds, - Query: "up[5h]", + Query: "up[4h]", }, maxFetchedDataDuration: 200 * time.Hour, - expectedInterval: 5 * time.Hour, + expectedInterval: 6 * time.Hour, }, } { t.Run(tc.name, func(t *testing.T) { @@ -633,7 +633,7 @@ func Test_dynamicIntervalFn(t *testing.T) { }, } ctx := user.InjectOrgID(context.Background(), "1") - ctx, interval, err := dynamicIntervalFn(cfg, mockLimits{}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) + _, interval, err := dynamicIntervalFn(cfg, mockLimits{}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req) require.Equal(t, tc.expectedInterval, interval) if !tc.expectedError { @@ -657,10 +657,10 @@ func Test_dynamicIntervalFn_verticalSharding(t *testing.T) { }{ { baseSplitInterval: day, - name: "23 hour range with 30 max splits, expect split by 1 day and 3 vertical shards", + name: "23 hour range with 30 max shards, expect split by 1 day and 3 vertical shards", req: &tripperware.PrometheusRequest{ Start: 0, - End: 23 * 3600 * seconds, + End: 23*3600*seconds - 1, Step: 60 * seconds, Query: "sum(up) by (cluster)", }, @@ -671,10 +671,10 @@ func Test_dynamicIntervalFn_verticalSharding(t *testing.T) { }, { baseSplitInterval: day, - name: "30 day range with 30 max splits, expect split by 1 days and 1 vertical shards", + name: "30 day range with 30 max shards, expect split by 1 days and 1 vertical shards", req: &tripperware.PrometheusRequest{ Start: 0, - End: 30 * 24 * 3600 * seconds, + End: 30*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "sum(up) by (cluster)", }, @@ -685,10 +685,10 @@ func Test_dynamicIntervalFn_verticalSharding(t *testing.T) { }, { baseSplitInterval: day, - name: "30 day range with 10 max splits, expect split by 3 days and 1 vertical shard", + name: "30 day range with 10 max shards, expect split by 3 days and 1 vertical shard", req: &tripperware.PrometheusRequest{ Start: 0, - End: 30 * 24 * 3600 * seconds, + End: 30*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "sum(up) by (cluster)", }, @@ -699,10 +699,10 @@ func Test_dynamicIntervalFn_verticalSharding(t *testing.T) { }, { baseSplitInterval: day, - name: "31 day range with 10 max splits, expect split by 7 days and 2 vertical shards", + name: "31 day range with 10 max shards, expect split by 7 days and 2 vertical shards", req: &tripperware.PrometheusRequest{ Start: 0, - End: 31 * 24 * 3600 * seconds, + End: 31*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "sum(up) by (cluster)", }, @@ -716,7 +716,7 @@ func Test_dynamicIntervalFn_verticalSharding(t *testing.T) { name: "30 day range with 1h matrix selector and 200 days max duration fetched, expect split by 1 day and 3 vertical shards", req: &tripperware.PrometheusRequest{ Start: 30 * 24 * 3600 * seconds, - End: 60 * 24 * 3600 * seconds, + End: 60*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "sum(rate(up[1h])) by (cluster)", }, @@ -730,7 +730,7 @@ func Test_dynamicIntervalFn_verticalSharding(t *testing.T) { name: "30 day range with 20d matrix selector and 200 days max duration fetched, expect split by 4 days and 1 vertical shard", req: &tripperware.PrometheusRequest{ Start: 30 * 24 * 3600 * seconds, - End: 60 * 24 * 3600 * seconds, + End: 60*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "sum(rate(up[20d])) by (cluster)", }, @@ -744,7 +744,7 @@ func Test_dynamicIntervalFn_verticalSharding(t *testing.T) { name: "100 day range with 5 day subquery and 100 days max duration fetched, expect no splitting (100 day interval)", req: &tripperware.PrometheusRequest{ Start: 0, - End: 100 * 24 * 3600 * seconds, + End: 100*24*3600*seconds - 1, Step: 5 * 60 * seconds, Query: "sum(rate(up[5d:10m])) by (cluster)", }, @@ -778,45 +778,6 @@ func Test_dynamicIntervalFn_verticalSharding(t *testing.T) { } } -func Test_getMaxSplitsFromConfig(t *testing.T) { - for i, tc := range []struct { - maxIntervalSplits int - verticalShardSize int - expectedMaxSplits int - }{ - { - verticalShardSize: 1, - maxIntervalSplits: 100, - expectedMaxSplits: 100, - }, - { - verticalShardSize: 1, - maxIntervalSplits: 15, - expectedMaxSplits: 15, - }, - { - verticalShardSize: 1, - maxIntervalSplits: 0, - expectedMaxSplits: 1, - }, - { - verticalShardSize: 3, - maxIntervalSplits: 15, - expectedMaxSplits: 5, - }, - { - verticalShardSize: 5, - maxIntervalSplits: 103, - expectedMaxSplits: 20, - }, - } { - t.Run(strconv.Itoa(i), func(t *testing.T) { - maxSplits := getMaxSplitsFromConfig(tc.maxIntervalSplits, tc.verticalShardSize) - require.Equal(t, tc.expectedMaxSplits, maxSplits) - }) - } -} - func Test_getIntervalFromMaxSplits(t *testing.T) { for _, tc := range []struct { name string