diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go
index 138338daf8..fd5eeda585 100644
--- a/pkg/querier/tripperware/queryrange/split_by_interval.go
+++ b/pkg/querier/tripperware/queryrange/split_by_interval.go
@@ -18,7 +18,7 @@ import (
     "github.com/cortexproject/cortex/pkg/util/validation"
 )
 
-type IntervalFn func(ctx context.Context, r tripperware.Request) (time.Duration, error)
+type IntervalFn func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error)
 
 // SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval.
 func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer, lookbackDelta time.Duration) tripperware.Middleware {
@@ -52,7 +52,7 @@ type splitByInterval struct {
 func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) {
     // First we're going to build new requests, one for each day, taking care
     // to line up the boundaries with step.
-    interval, err := s.interval(ctx, r)
+    ctx, interval, err := s.interval(ctx, r)
     if err != nil {
         return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
     }
@@ -152,66 +152,83 @@ func nextIntervalBoundary(t, step int64, interval time.Duration) int64 {
 }
 
 // Returns a fixed split interval
-func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
-    return func(_ context.Context, _ tripperware.Request) (time.Duration, error) {
-        return cfg.SplitQueriesByInterval, nil
+func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) {
+    return func(ctx context.Context, _ tripperware.Request) (context.Context, time.Duration, error) {
+        return ctx, cfg.SplitQueriesByInterval, nil
     }
 }
 
 // Returns a dynamic multiple of base interval adjusted depending on configured 'max_shards_per_query' and 'max_fetched_data_duration_per_query'
-func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, lookbackDelta time.Duration) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
-    return func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
+func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, lookbackDelta time.Duration) func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) {
+    return func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) {
         baseInterval := cfg.SplitQueriesByInterval
         dynamicSplitCfg := cfg.DynamicQuerySplitsConfig
         if dynamicSplitCfg.MaxShardsPerQuery == 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery == 0 {
-            return baseInterval, nil
+            return ctx, baseInterval, nil
         }
 
-        queryVerticalShardSize, err := getMaxVerticalShardSize(ctx, r, limits, queryAnalyzer)
+        maxVerticalShardSize, isShardable, err := getMaxVerticalShardSize(ctx, r, limits, queryAnalyzer)
         if err != nil {
-            return baseInterval, err
+            return ctx, baseInterval, err
         }
         queryExpr, err := parser.ParseExpr(r.GetQuery())
         if err != nil {
-            return baseInterval, err
+            return ctx, baseInterval, err
         }
 
-        maxSplitsPerQuery := getMaxSplitsFromConfig(dynamicSplitCfg.MaxShardsPerQuery, queryVerticalShardSize)
-        maxSplitsFromDurationFetched := getMaxSplitsByDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, queryVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta)
-
-        // Use the more restrictive max splits limit
-        var maxSplits int
-        switch {
-        case dynamicSplitCfg.MaxShardsPerQuery > 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
-            maxSplits = min(maxSplitsPerQuery, maxSplitsFromDurationFetched)
-        case dynamicSplitCfg.MaxShardsPerQuery > 0:
-            maxSplits = maxSplitsPerQuery
-        case dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
-            maxSplits = maxSplitsFromDurationFetched
+        interval := baseInterval
+        verticalShardSize := 1
+        totalShards := 0
+        // Find the combination of horizontal splits and vertical shards that will result in the largest total number of shards
+        for candidateVerticalShardSize := 1; candidateVerticalShardSize <= maxVerticalShardSize; candidateVerticalShardSize++ {
+            maxSplitsFromMaxShards := getMaxSplitsFromMaxQueryShards(dynamicSplitCfg.MaxShardsPerQuery, candidateVerticalShardSize)
+            maxSplitsFromDurationFetched := getMaxSplitsFromDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, candidateVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta)
+
+            // Use the more restrictive max splits limit
+            var maxSplits int
+            switch {
+            case dynamicSplitCfg.MaxShardsPerQuery > 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
+                maxSplits = min(maxSplitsFromMaxShards, maxSplitsFromDurationFetched)
+            case dynamicSplitCfg.MaxShardsPerQuery > 0:
+                maxSplits = maxSplitsFromMaxShards
+            case dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
+                maxSplits = maxSplitsFromDurationFetched
+            }
+
+            candidateInterval := getIntervalFromMaxSplits(r, baseInterval, maxSplits)
+            if candidateTotalShards := getExpectedTotalShards(r.GetStart(), r.GetEnd(), candidateInterval, candidateVerticalShardSize); candidateTotalShards > totalShards {
+                interval = candidateInterval
+                verticalShardSize = candidateVerticalShardSize
+                totalShards = candidateTotalShards
+            }
+        }
+
+        // Set number of vertical shards to be used in shard_by middleware
+        if isShardable && maxVerticalShardSize > 1 {
+            ctx = tripperware.InjectVerticalShardSizeToContext(ctx, verticalShardSize)
         }
 
-        interval := getIntervalFromMaxSplits(r, baseInterval, maxSplits)
-        return interval, nil
+        return ctx, interval, nil
     }
 }
 
-func getMaxVerticalShardSize(ctx context.Context, r tripperware.Request, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer) (int, error) {
+func getMaxVerticalShardSize(ctx context.Context, r tripperware.Request, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer) (int, bool, error) {
     tenantIDs, err := tenant.TenantIDs(ctx)
     if err != nil {
-        return 1, err
+        return 1, false, err
     }
 
     analysis, err := queryAnalyzer.Analyze(r.GetQuery())
     if err != nil {
-        return 1, err
+        return 1, false, err
     }
 
     queryVerticalShardSize := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
     if queryVerticalShardSize <= 0 || !analysis.IsShardable() {
         queryVerticalShardSize = 1
     }
-    return queryVerticalShardSize, nil
+    return queryVerticalShardSize, analysis.IsShardable(), nil
 }
 
 // Returns the minimum multiple of base interval needed to split query into less than maxSplits
@@ -219,7 +236,7 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration,
     maxSplits := time.Duration(maxSplitsInt)
     queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond))
     // Calculate the multiple (n) of base interval needed to shard query into <= maxSplits
-    n := ceilDiv(queryRange, baseInterval*maxSplits)
+    n := ceilDiv(int64(queryRange), int64(baseInterval*maxSplits))
     if n <= 0 {
         n = 1
     }
@@ -227,9 +244,9 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration,
     // Loop to handle cases where first split is truncated and shorter than remaining splits.
     // Exits loop if interval (n) is sufficient after removing first split
     // If no suitable interval was found terminates at a maximum of interval = 2 * query range
-    for n <= 2*ceilDiv(queryRange, baseInterval) {
-        // Find new start time for query after removing first split
-        nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), n*baseInterval) + r.GetStep()
+    for n <= 2*ceilDiv(int64(queryRange), int64(baseInterval)) {
+        // Find the new start time for query after removing first split
+        nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), time.Duration(n)*baseInterval) + r.GetStep()
         if maxSplits == 1 {
             // If maxSplits == 1, the removed first split should cover the full query range.
             if nextSplitStart >= r.GetEnd() {
@@ -238,7 +255,7 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration,
         } else {
             queryRangeWithoutFirstSplit := time.Duration((r.GetEnd() - nextSplitStart) * int64(time.Millisecond))
             // Recalculate n for the remaining query range with maxSplits-1.
-            n_temp := ceilDiv(queryRangeWithoutFirstSplit, baseInterval*(maxSplits-1))
+            n_temp := ceilDiv(int64(queryRangeWithoutFirstSplit), int64(baseInterval*(maxSplits-1)))
             // If a larger interval is needed after removing the first split, the initial n was insufficient.
             if n >= n_temp {
                 break
@@ -247,11 +264,11 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration,
         // Increment n to check if larger interval fits the maxSplits constraint.
         n++
     }
-    return n * baseInterval
+    return time.Duration(n) * baseInterval
 }
 
 // Return max allowed number of splits by MaxShardsPerQuery config after accounting for vertical sharding
-func getMaxSplitsFromConfig(maxSplitsConfigValue int, queryVerticalShardSize int) int {
+func getMaxSplitsFromMaxQueryShards(maxSplitsConfigValue int, queryVerticalShardSize int) int {
     var maxSplitsFromConfig int
     if maxSplitsConfigValue > 0 {
         maxSplitsFromConfig = maxSplitsConfigValue / queryVerticalShardSize
@@ -263,17 +280,18 @@ func getMaxSplitsFromConfig(maxSplitsConfigValue int, queryVerticalShardSize int
 }
 
 // Return max allowed number of splits by MaxFetchedDataDurationPerQuery config after accounting for vertical sharding
-func getMaxSplitsByDurationFetched(maxFetchedDataDurationPerQuery time.Duration, queryVerticalShardSize int, expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) int {
-    fixedDurationFetched, perSplitDurationFetched := getDurationFetchedByQuerySplitting(expr, queryStart, queryEnd, queryStep, baseInterval, lookbackDelta)
-    if perSplitDurationFetched == 0 {
-        return int(maxFetchedDataDurationPerQuery / baseInterval) // Total duration fetched does not increase with number of splits, return default max splits
+func getMaxSplitsFromDurationFetched(maxFetchedDataDurationPerQuery time.Duration, queryVerticalShardSize int, expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) int {
+    durationFetchedByRange, durationFetchedBySelectors := analyzeDurationFetchedByQueryExpr(expr, queryStart, queryEnd, baseInterval, lookbackDelta)
+
+    if durationFetchedBySelectors == 0 {
+        return int(maxFetchedDataDurationPerQuery / baseInterval) // The total duration fetched does not increase with number of splits, return default max splits
     }
 
     var maxSplitsByDurationFetched int
     if maxFetchedDataDurationPerQuery > 0 {
-        // Duration fetched by query after splitting = fixedDurationFetched + perSplitDurationFetched x numOfShards
+        // Duration fetched by query after splitting = durationFetchedByRange + durationFetchedBySelectors x numOfShards
         // Rearranging the equation to find the max horizontal splits after accounting for vertical shards
-        maxSplitsByDurationFetched = int(((maxFetchedDataDurationPerQuery / time.Duration(queryVerticalShardSize)) - fixedDurationFetched) / perSplitDurationFetched)
+        maxSplitsByDurationFetched = int(((maxFetchedDataDurationPerQuery / time.Duration(queryVerticalShardSize)) - durationFetchedByRange) / durationFetchedBySelectors)
     }
     if maxSplitsByDurationFetched <= 0 {
         maxSplitsByDurationFetched = 1
@@ -281,57 +299,18 @@ func getMaxSplitsByDurationFetched(maxFetchedDataDurationPerQuery time.Duration,
     return maxSplitsByDurationFetched
 }
 
-// Return the fixed base duration fetched by the query regardless of the number of splits, and the duration that is fetched once for every split
-func getDurationFetchedByQuerySplitting(expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) (fixedDurationFetched time.Duration, perSplitDurationFetched time.Duration) {
-    // First analyze the query using original start-end time. Duration fetched by lookbackDelta here only reflects the start time of first split
-    durationFetchedByRange, durationFetchedBySelectors, durationFetchedByLookbackDeltaFirstSplit := analyzeDurationFetchedByQueryExpr(expr, queryStart, queryEnd, baseInterval, lookbackDelta)
-
-    fixedDurationFetched += durationFetchedByRange        // Duration fetched by the query range is constant regardless of how many splits the query has
-    perSplitDurationFetched += durationFetchedBySelectors // Duration fetched by selectors is fetched once for every query split
-
-    // Next analyze the query using the next split start time to find the duration fetched by lookbackDelta for splits other than first one
-    nextIntervalStart := nextIntervalBoundary(queryStart, queryStep, baseInterval) + queryStep
-    _, _, durationFetchedByLookbackDeltaOtherSplits := analyzeDurationFetchedByQueryExpr(expr, nextIntervalStart, queryEnd, baseInterval, lookbackDelta)
-
-    // Handle different cases for lookbackDelta
-    if durationFetchedByLookbackDeltaFirstSplit > 0 && durationFetchedByLookbackDeltaOtherSplits > 0 {
-        // lookbackDelta is fetching additional duration for all splits
-        perSplitDurationFetched += durationFetchedByLookbackDeltaOtherSplits
-    } else if durationFetchedByLookbackDeltaOtherSplits > 0 {
-        // lookbackDelta is fetching additional duration for all splits except first one
-        perSplitDurationFetched += durationFetchedByLookbackDeltaOtherSplits
-        fixedDurationFetched -= durationFetchedByLookbackDeltaOtherSplits
-    } else if durationFetchedByLookbackDeltaFirstSplit > 0 {
-        // lookbackDelta is fetching additional duration for first split only
-        fixedDurationFetched += durationFetchedByLookbackDeltaFirstSplit
-    }
-
-    return fixedDurationFetched, perSplitDurationFetched
-}
-
 // analyzeDurationFetchedByQueryExpr analyzes the query to calculate
-// the duration of data that will be fetched from storage by different
-// parts of the query
-//
-// Returns:
-//   - durationFetchedByRange: The total duration fetched by the original start-end
-//     range of the query.
-//   - durationFetchedBySelectors: The duration fetched by matrix selectors
-//     and/or subqueries. This duration will be fetched once by every query split.
-//   - durationFetchedByLookbackDelta: The duration fetched by lookbackDelta
-//     for the specified query start time.
+// the estimated duration of data that will be fetched from storage
 //
 // Example:
 // Query up[15d:1h] with a range of 30 days, 1 day base split interval, and 5 min lookbackDelta with 00:00 UTC start time
 // - durationFetchedByRange = 30 day
-// - durationFetchedBySelectors = 15 day
-// - durationFetchedByLookbackDelta = 1 day
-func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, queryEnd int64, baseInterval time.Duration, lookbackDelta time.Duration) (durationFetchedByRange time.Duration, durationFetchedBySelectors time.Duration, durationFetchedByLookbackDelta time.Duration) {
-    durationFetchedByRangeCount := 0
-    durationFetchedByLookbackDeltaCount := 0
+// - durationFetchedBySelectors = 16 day
+func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, queryEnd int64, baseInterval time.Duration, lookbackDelta time.Duration) (durationFetchedByRange time.Duration, durationFetchedBySelectors time.Duration) {
     baseIntervalMillis := util.DurationMilliseconds(baseInterval)
+    durationFetchedByRangeCount := 0
+    durationFetchedBySelectorsCount := 0
 
-    totalDurationFetchedCount := 0
     var evalRange time.Duration
     parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
         switch n := node.(type) {
@@ -341,18 +320,11 @@ func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, query
             queryEndIntervalIndex := floorDiv(queryEnd, baseIntervalMillis)
             durationFetchedByRangeCount += int(queryEndIntervalIndex-queryStartIntervalIndex) + 1
 
-            // Adjust start and end time based on matrix selectors and/or subquery selector and increment total duration fetched, this excludes lookbackDelta
-            start, end := util.GetTimeRangesForSelector(queryStart, queryEnd, 0, n, path, evalRange)
-            startIntervalIndex := floorDiv(start, baseIntervalMillis)
-            endIntervalIndex := floorDiv(end, baseIntervalMillis)
-            totalDurationFetchedCount += int(endIntervalIndex-startIntervalIndex) + 1
-
-            // Increment duration fetched by lookbackDelta
-            startLookbackDelta := start - util.DurationMilliseconds(lookbackDelta)
-            startLookbackDeltaIntervalIndex := floorDiv(startLookbackDelta, baseIntervalMillis)
-            if evalRange == 0 && startLookbackDeltaIntervalIndex < startIntervalIndex {
-                durationFetchedByLookbackDeltaCount += int(startIntervalIndex - startLookbackDeltaIntervalIndex)
-            }
+            // Adjust start time based on matrix selectors and/or subquery selectors and calculate additional lookback duration fetched
+            start, end := util.GetTimeRangesForSelector(queryStart, queryEnd, lookbackDelta, n, path, evalRange)
+            durationFetchedBySelectors := (end - start) - (queryEnd - queryStart)
+            durationFetchedBySelectorsCount += int(ceilDiv(durationFetchedBySelectors, baseIntervalMillis))
+
             evalRange = 0
         case *parser.MatrixSelector:
             evalRange = n.Range
@@ -360,8 +332,13 @@ func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, query
         return nil
     })
 
-    durationFetchedBySelectorsCount := totalDurationFetchedCount - durationFetchedByRangeCount
-    return time.Duration(durationFetchedByRangeCount) * baseInterval, time.Duration(durationFetchedBySelectorsCount) * baseInterval, time.Duration(durationFetchedByLookbackDeltaCount) * baseInterval
+    return time.Duration(durationFetchedByRangeCount) * baseInterval, time.Duration(durationFetchedBySelectorsCount) * baseInterval
+}
+
+func getExpectedTotalShards(queryStart int64, queryEnd int64, interval time.Duration, verticalShardSize int) int {
+    queryRange := time.Duration((queryEnd - queryStart) * int64(time.Millisecond))
+    expectedSplits := int(ceilDiv(int64(queryRange), int64(interval)))
+    return expectedSplits * verticalShardSize
 }
 
 func floorDiv(a, b int64) int64 {
@@ -371,7 +348,7 @@ func floorDiv(a, b int64) int64 {
     return a / b
 }
 
-func ceilDiv(a, b time.Duration) time.Duration {
+func ceilDiv(a, b int64) int64 {
     if a > 0 && a%b != 0 {
         return a/b + 1
     }
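Note on the new selection loop in dynamicIntervalFn: rather than computing one interval for a fixed vertical shard size, it now tries every candidate vertical shard size up to the tenant's limit, bounds the horizontal splits per candidate, and keeps whichever (interval, vertical shard size) combination yields the most total shards. Below is a minimal, self-contained Go sketch of that search under the max_shards_per_query limit only; the duration-fetched limit and step alignment are deliberately left out, and pickShardCombination is an illustrative name, not the repo's API.

package main

import (
    "fmt"
    "time"
)

func ceilDiv(a, b int64) int64 {
    if a > 0 && a%b != 0 {
        return a/b + 1
    }
    return a / b
}

// pickShardCombination mirrors the loop in dynamicIntervalFn: each candidate
// vertical shard size divides the shard budget, the remaining budget bounds
// the horizontal splits, and the densest combination wins.
func pickShardCombination(queryRange, baseInterval time.Duration, maxShardsPerQuery, maxVerticalShardSize int) (time.Duration, int) {
    bestInterval, bestVertical, bestTotal := baseInterval, 1, 0
    for v := 1; v <= maxVerticalShardSize; v++ {
        maxSplits := maxShardsPerQuery / v
        if maxSplits < 1 {
            maxSplits = 1
        }
        // Smallest multiple n of the base interval that fits within maxSplits.
        n := ceilDiv(int64(queryRange), int64(baseInterval)*int64(maxSplits))
        if n < 1 {
            n = 1
        }
        interval := time.Duration(n) * baseInterval
        total := int(ceilDiv(int64(queryRange), int64(interval))) * v
        if total > bestTotal {
            bestInterval, bestVertical, bestTotal = interval, v, total
        }
    }
    return bestInterval, bestVertical
}

func main() {
    // 31-day range, 10 max shards, up to 3 vertical shards: 5 splits of 7 days
    // x 2 vertical shards = 10 total shards beats 8 x 1 and 3 x 3, matching
    // the "31 day range with 10 max shards" test case below.
    fmt.Println(pickShardCombination(31*24*time.Hour, 24*time.Hour, 10, 3))
}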
diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go
index 3e8d59f786..53f96dacb9 100644
--- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go
+++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go
@@ -297,7 +297,9 @@ func TestSplitByDay(t *testing.T) {
             path:               query,
             expectedBody:       string(mergedHTTPResponseBody),
             expectedQueryCount: 2,
-            intervalFn:         func(_ context.Context, _ tripperware.Request) (time.Duration, error) { return 24 * time.Hour, nil },
+            intervalFn: func(ctx context.Context, _ tripperware.Request) (context.Context, time.Duration, error) {
+                return ctx, 24 * time.Hour, nil
+            },
         },
         {
             path: query,
@@ -309,7 +311,9 @@ {
             path:               longQuery,
             expectedBody:       string(mergedHTTPResponseBody),
             expectedQueryCount: 31,
-            intervalFn:         func(_ context.Context, _ tripperware.Request) (time.Duration, error) { return day, nil },
+            intervalFn: func(ctx context.Context, _ tripperware.Request) (context.Context, time.Duration, error) {
+                return ctx, day, nil
+            },
         },
         {
             path: longQuery,
@@ -440,14 +444,13 @@ func Test_evaluateAtModifier(t *testing.T) {
 
 func Test_dynamicIntervalFn(t *testing.T) {
     for _, tc := range []struct {
-        name                     string
-        baseSplitInterval        time.Duration
-        req                      tripperware.Request
-        expectedInterval         time.Duration
-        expectedError            bool
-        verticalShardSize        int
-        maxIntervalSplits        int
-        maxDurationOfDataFetched time.Duration
+        name                   string
+        baseSplitInterval      time.Duration
+        req                    tripperware.Request
+        maxShardsPerQuery      int
+        maxFetchedDataDuration time.Duration
+        expectedInterval       time.Duration
+        expectedError          bool
     }{
         {
             baseSplitInterval: day,
             req: &tripperware.PrometheusRequest{
                 Query: "up[aaa",
                 Start: 0,
-                End:   10 * 24 * 3600 * seconds,
+                End:   10*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
             },
-            maxIntervalSplits:        30,
-            maxDurationOfDataFetched: 200 * day,
-            expectedInterval:         day,
-            expectedError:            true,
+            maxShardsPerQuery:      30,
+            maxFetchedDataDuration: 200 * day,
+            expectedInterval:       day,
+            expectedError:          true,
         },
         {
             baseSplitInterval: day,
-            name:              "23 hour range with 10 max splits, expect split by 1 day",
+            name:              "23 hour range with 30 max shards, expect split by 1 day",
             req: &tripperware.PrometheusRequest{
                 Start: 0,
-                End:   23 * 3600 * seconds,
+                End:   23*3600*seconds - 1,
                 Step:  60 * seconds,
                 Query: "up",
             },
-            maxIntervalSplits: 10,
+            maxShardsPerQuery: 30,
             expectedInterval:  day,
         },
-        {
-            baseSplitInterval: time.Hour,
-            name:              "24 hour range with 100 max splits, expect split by 1 hour",
-            req: &tripperware.PrometheusRequest{
-                Start: 0,
-                End:   24 * 3600 * seconds,
-                Step:  60 * seconds,
-                Query: "up",
-            },
-            maxIntervalSplits: 100,
-            expectedInterval:  time.Hour,
-        },
-        {
-            baseSplitInterval: time.Hour,
-            name:              "120 hour range with 15 max splits, expect split by 8 hours",
-            req: &tripperware.PrometheusRequest{
-                Start: 0,
-                End:   5 * 24 * 3600 * seconds,
-                Step:  60 * seconds,
-                Query: "up",
-            },
-            maxIntervalSplits: 15,
-            expectedInterval:  8 * time.Hour,
-        },
         {
             baseSplitInterval: day,
-            name:              "30 day range with 30 max splits, expect split by 1 day",
+            name:              "30 day range with 30 max shards, expect split by 1 day",
             req: &tripperware.PrometheusRequest{
                 Start: 0,
-                End:   30 * 24 * 3600 * seconds,
+                End:   30*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
                 Query: "up",
             },
-            maxIntervalSplits: 30,
+            maxShardsPerQuery: 30,
             expectedInterval:  day,
         },
         {
             baseSplitInterval: day,
-            name:              "30 day range with 20 max splits, expect split by 2 day",
+            name:              "30 day range with 10 max shards, expect split by 3 days",
             req: &tripperware.PrometheusRequest{
-                Start: 30 * 24 * 3600 * seconds,
-                End:   60 * 24 * 3600 * seconds,
+                Start: 0,
+                End:   30*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
                 Query: "up",
             },
-            maxIntervalSplits: 20,
-            expectedInterval:  2 * day,
+            maxShardsPerQuery: 10,
+            expectedInterval:  3 * day,
         },
         {
             baseSplitInterval: day,
-            name:              "60 day range with 15 max splits, expect split by 4 day",
+            name:              "31 day range with 10 max shards, expect split by 4 days",
             req: &tripperware.PrometheusRequest{
                 Start: 0,
-                End:   60 * 24 * 3600 * seconds,
+                End:   31*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
                 Query: "up",
             },
-            maxIntervalSplits: 15,
+            maxShardsPerQuery: 10,
             expectedInterval:  4 * day,
         },
         {
             baseSplitInterval: day,
-            name:              "61 day range with 15 max splits, expect split by 5 day",
+            name:              "30 day range with 1h matrix selector and 200 days max duration fetched, expect split by 1 day",
             req: &tripperware.PrometheusRequest{
-                Start: 0,
-                End:   61 * 24 * 3600 * seconds,
+                Start: 30 * 24 * 3600 * seconds,
+                End:   60*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
-                Query: "up",
+                Query: "avg_over_time(up[1h])",
             },
-            maxIntervalSplits: 15,
-            expectedInterval:  5 * day,
+            maxFetchedDataDuration: 200 * day,
+            expectedInterval:       day,
         },
         {
             baseSplitInterval: day,
-            name:              "60 day range with 15 max splits and 3 vertical shard size, expect split by 12 days",
+            name:              "30 day range with 20d matrix selector and 200 days max duration fetched, expect split by 4 days",
             req: &tripperware.PrometheusRequest{
-                Start: 0,
-                End:   60 * 24 * 3600 * seconds,
+                Start: 30 * 24 * 3600 * seconds,
+                End:   60*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
-                Query: "sum by (pod) (up)",
+                Query: "avg_over_time(up[20d])",
             },
-            verticalShardSize: 3,
-            maxIntervalSplits: 15,
-            expectedInterval:  12 * day,
+            maxFetchedDataDuration: 200 * day,
+            expectedInterval:       4 * day,
         },
         {
-            baseSplitInterval: time.Hour,
-            name:              "101 hours with 200 hour max duration fetched and 5m matrix selector, expect split by 2 hour",
+            baseSplitInterval: day,
+            name:              "roughly 30 day range with multiple matrix selectors and 200 days max duration fetched, expect split by 3 days",
             req: &tripperware.PrometheusRequest{
-                Start: (3 * 24 * 3600 * seconds) - (2 * 3600 * seconds),
-                End:   (7 * 24 * 3600 * seconds) + (3 * 3600 * seconds) - 1,
-                Step:  60 * seconds,
-                Query: "up[5m]",
+                Start: (14 * 24 * 3600 * seconds) - (3600 * seconds),
+                End:   (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds),
+                Step:  5 * 60 * seconds,
+                Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])",
             },
-            maxDurationOfDataFetched: 200 * time.Hour,
-            expectedInterval:         2 * time.Hour,
+            maxFetchedDataDuration: 200 * day,
+            expectedInterval:       3 * day,
         },
         {
             baseSplitInterval: day,
-            name:              "30 day range with 200 days max duration fetched and 1h matrix selector, expect split by 1 day",
+            name:              "50 day range with 5 day subquery and 100 days max duration fetched, expect split by 7 days",
             req: &tripperware.PrometheusRequest{
-                Start: 30 * 24 * 3600 * seconds,
-                End:   60 * 24 * 3600 * seconds,
+                Start: 0,
+                End:   50*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
-                Query: "avg_over_time(up[1h])",
+                Query: "up[5d:10m]",
             },
-            maxDurationOfDataFetched: 200 * day,
-            expectedInterval:         day,
+            maxFetchedDataDuration: 100 * day,
+            expectedInterval:       7 * day,
         },
         {
             baseSplitInterval: day,
-            name:              "30 day range with 200 days max duration fetched and 20d matrix selector, expect split by 4 days",
+            name:              "50 day range with 5 day subquery and 100 days max duration fetched and 5 max shards, expect split by 10 days",
             req: &tripperware.PrometheusRequest{
-                Start: 30 * 24 * 3600 * seconds,
-                End:   60 * 24 * 3600 * seconds,
+                Start: 0,
+                End:   50*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
-                Query: "avg_over_time(up[20d])",
+                Query: "up[5d:10m]",
             },
-            maxDurationOfDataFetched: 200 * day,
-            expectedInterval:         4 * day,
+            maxShardsPerQuery:      5,
+            maxFetchedDataDuration: 100 * day,
+            expectedInterval:       10 * day,
         },
         {
             baseSplitInterval: day,
-            name:              "30 day range with multiple matrix selectors, expect split by 3 days",
+            name:              "100 day range with 5 day subquery and 100 days max duration fetched, expect no splitting (100 day interval)",
             req: &tripperware.PrometheusRequest{
-                Start: (14 * 24 * 3600 * seconds) - (3600 * seconds),
-                End:   (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds),
+                Start: 0,
+                End:   100*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
-                Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])",
+                Query: "up[5d:10m]",
             },
-            maxDurationOfDataFetched: 200 * day,
-            expectedInterval:         3 * day,
+            maxFetchedDataDuration: 100 * day,
+            expectedInterval:       100 * day,
         },
         {
-            baseSplitInterval: day,
-            name:              "30 day range with multiple matrix selectors and 3 vertical shards, expect split by 6 days",
+            baseSplitInterval: time.Hour,
+            name:              "120 hour range with 120 max shards, expect split by 1 hour",
             req: &tripperware.PrometheusRequest{
-                Start: (14 * 24 * 3600 * seconds) - (3600 * seconds),
-                End:   (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds),
-                Step:  5 * 60 * seconds,
-                Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])",
+                Start: 0,
+                End:   120*3600*seconds - 1,
+                Step:  60 * seconds,
+                Query: "up",
             },
-            verticalShardSize:        3,
-            maxDurationOfDataFetched: 350 * day,
-            expectedInterval:         6 * day,
+            maxShardsPerQuery: 120,
+            expectedInterval:  time.Hour,
         },
         {
-            baseSplitInterval: day,
-            name:              "100 day range with 5 day subquery, expect split by 13 days",
+            baseSplitInterval: time.Hour,
+            name:              "120 hour range with 20 max shards, expect split by 6 hours",
             req: &tripperware.PrometheusRequest{
                 Start: 0,
-                End:   100 * 24 * 3600 * seconds,
-                Step:  60 * 60 * seconds,
-                Query: "up[5d:10m]",
+                End:   120*3600*seconds - 1,
+                Step:  60 * seconds,
+                Query: "up",
             },
-            maxIntervalSplits:        100,
-            maxDurationOfDataFetched: 150 * day,
-            expectedInterval:         13 * day,
+            maxShardsPerQuery: 20,
+            expectedInterval:  6 * time.Hour,
         },
         {
-            baseSplitInterval: day,
-            name:              "51 day range, longer than max duration fetched, expect split by 51 day",
+            baseSplitInterval: time.Hour,
+            name:              "120 hours with 200h max duration fetched and 4h matrix selector, expect split by 6 hours",
             req: &tripperware.PrometheusRequest{
-                Start: 0,
-                End:   (51 * 24 * 3600 * seconds),
-                Step:  5 * 60 * seconds,
-                Query: "up[5d]",
+                Start: 0 * 3600 * seconds,
+                End:   120*3600*seconds - 1,
+                Step:  60 * seconds,
+                Query: "up[4h]",
             },
-            maxDurationOfDataFetched: 50 * day,
-            expectedInterval:         51 * day,
+            maxFetchedDataDuration: 200 * time.Hour,
+            expectedInterval:       6 * time.Hour,
         },
     } {
         t.Run(tc.name, func(t *testing.T) {
             cfg := Config{
                 SplitQueriesByInterval: tc.baseSplitInterval,
                 DynamicQuerySplitsConfig: DynamicQuerySplitsConfig{
-                    MaxShardsPerQuery:              tc.maxIntervalSplits,
-                    MaxFetchedDataDurationPerQuery: tc.maxDurationOfDataFetched,
+                    MaxShardsPerQuery:              tc.maxShardsPerQuery,
+                    MaxFetchedDataDurationPerQuery: tc.maxFetchedDataDuration,
                 },
             }
             ctx := user.InjectOrgID(context.Background(), "1")
-            interval, err := dynamicIntervalFn(cfg, mockLimits{queryVerticalShardSize: tc.verticalShardSize}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req)
+            _, interval, err := dynamicIntervalFn(cfg, mockLimits{}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req)
             require.Equal(t, tc.expectedInterval, interval)
+
             if !tc.expectedError {
                 require.Nil(t, err)
             }
@@ -665,199 +643,137 @@
     }
 }
 
-func Test_getMaxSplitsByDurationFetched(t *testing.T) {
+func Test_dynamicIntervalFn_verticalSharding(t *testing.T) {
     for _, tc := range []struct {
-        name                     string
-        baseSplitInterval        time.Duration
-        req                      tripperware.Request
-        verticalShardSize        int
-        lookbackDelta            time.Duration
-        maxDurationOfDataFetched time.Duration
-        expectedMaxSplits        int
+        name                      string
+        baseSplitInterval         time.Duration
+        req                       tripperware.Request
+        maxVerticalShardSize      int
+        maxShardsPerQuery         int
+        maxFetchedDataDuration    time.Duration
+        expectedInterval          time.Duration
+        expectedVerticalShardSize int
+        expectedError             bool
     }{
         {
             baseSplitInterval: day,
-            name:              "20 day range with no lookback, expect default max splits",
+            name:              "23 hour range with 30 max shards, expect split by 1 day and 3 vertical shards",
             req: &tripperware.PrometheusRequest{
-                Query: "up",
                 Start: 0,
-                End:   (20 * 24 * 3600 * seconds) - 1,
-                Step:  5 * 60 * seconds,
+                End:   23*3600*seconds - 1,
+                Step:  60 * seconds,
+                Query: "sum(up) by (cluster)",
             },
-            verticalShardSize:        1,
-            maxDurationOfDataFetched: 200 * day,
-            expectedMaxSplits:        200,
+            maxShardsPerQuery:         30,
+            expectedInterval:          day,
+            maxVerticalShardSize:      3,
+            expectedVerticalShardSize: 3,
         },
         {
             baseSplitInterval: day,
-            name:              "20 days range with 5 min lookback, expect 180 max splits",
+            name:              "30 day range with 30 max shards, expect split by 1 day and 1 vertical shard",
             req: &tripperware.PrometheusRequest{
-                Query: "up",
                 Start: 0,
-                End:   (20 * 24 * 3600 * seconds) - 1,
+                End:   30*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
+                Query: "sum(up) by (cluster)",
             },
-            verticalShardSize:        1,
-            lookbackDelta:            5 * time.Minute,
-            maxDurationOfDataFetched: 200 * day,
-            expectedMaxSplits:        180,
+            maxShardsPerQuery:         30,
+            expectedInterval:          day,
+            maxVerticalShardSize:      3,
+            expectedVerticalShardSize: 1,
         },
         {
             baseSplitInterval: day,
-            name:              "20 days range with 5 min lookback but first split starts at 30 min, expect 181 max splits",
+            name:              "30 day range with 10 max shards, expect split by 3 days and 1 vertical shard",
             req: &tripperware.PrometheusRequest{
-                Query: "up",
-                Start: (30 * 60 * seconds),
-                End:   (20 * 24 * 3600 * seconds) - 1,
-                Step:  5 * 60 * seconds,
-            },
-            verticalShardSize:        1,
-            lookbackDelta:            5 * time.Minute,
-            maxDurationOfDataFetched: 200 * day,
-            expectedMaxSplits:        181,
-        },
-        {
-            baseSplitInterval: day,
-            name:              "20 days range with 1 day subquery and 5 min lookback, expect 90 max splits",
-            req: &tripperware.PrometheusRequest{
-                Query: "up[1d:1h]",
                 Start: 0,
-                End:   (20 * 24 * 3600 * seconds) - 1,
+                End:   30*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
+                Query: "sum(up) by (cluster)",
             },
-            verticalShardSize:        1,
-            lookbackDelta:            5 * time.Minute,
-            maxDurationOfDataFetched: 200 * day,
-            expectedMaxSplits:        90,
+            maxShardsPerQuery:         10,
+            expectedInterval:          3 * day,
+            maxVerticalShardSize:      3,
+            expectedVerticalShardSize: 1,
         },
         {
             baseSplitInterval: day,
-            // Certain step sizes can cause the start time of every split except the first one to shift forward. In these cases,
-            // lookbackDelta might not fetch additional days for all splits (except the first one if it starts at 00:00 UTC)
-            name: "20 days range with 1 day subquery and 5 min lookback but splits are step aligned forward, expect 179 max splits",
+            name:              "31 day range with 10 max shards, expect split by 7 days and 2 vertical shards",
             req: &tripperware.PrometheusRequest{
-                Query: "up[1d:1h]",
                 Start: 0,
-                End:   (20 * 24 * 3600 * seconds) - 1,
-                Step:  22 * 60 * seconds,
-            },
-            verticalShardSize:        1,
-            lookbackDelta:            5 * time.Minute,
-            maxDurationOfDataFetched: 200 * day,
-            expectedMaxSplits:        179,
-        },
-        {
-            baseSplitInterval: day,
-            name:              "20 days range with 3 day matrix selector, expect 60 max splits",
-            req: &tripperware.PrometheusRequest{
-                Query: "up[3d]",
-                Start: 0,
-                End:   (20 * 24 * 3600 * seconds) - 1,
+                End:   31*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
+                Query: "sum(up) by (cluster)",
             },
-            verticalShardSize:        1,
-            maxDurationOfDataFetched: 200 * day,
-            expectedMaxSplits:        60,
+            maxShardsPerQuery:         10,
+            expectedInterval:          7 * day,
+            maxVerticalShardSize:      3,
+            expectedVerticalShardSize: 2,
         },
         {
             baseSplitInterval: day,
-            name:              "30 days range with 20 day subquery, expect 8 max splits",
+            name:              "30 day range with 1h matrix selector and 200 days max duration fetched, expect split by 1 day and 3 vertical shards",
             req: &tripperware.PrometheusRequest{
                 Start: 30 * 24 * 3600 * seconds,
-                End:   (60 * 24 * 3600 * seconds) - 1,
+                End:   60*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
-                Query: "avg_over_time(up[20d:1h])",
+                Query: "sum(rate(up[1h])) by (cluster)",
             },
-            verticalShardSize:        1,
-            lookbackDelta:            5 * time.Minute,
-            maxDurationOfDataFetched: 200 * day,
-            expectedMaxSplits:        8,
+            maxFetchedDataDuration:    200 * day,
+            expectedInterval:          day,
+            maxVerticalShardSize:      3,
+            expectedVerticalShardSize: 3,
         },
         {
             baseSplitInterval: day,
-            // Each term in the query fetches 20 days of range
-            name: "20 days range with multiple matrix selectors, expect 20 max splits",
+            name:              "30 day range with 20d matrix selector and 200 days max duration fetched, expect split by 4 days and 1 vertical shard",
             req: &tripperware.PrometheusRequest{
-                Start: (14 * 24 * 3600 * seconds) - (3600 * seconds),
-                End:   (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds),
-                Step:  5 * 60 * seconds,
-                Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])",
-            },
-            verticalShardSize:        1,
-            maxDurationOfDataFetched: 350 * day,
-            expectedMaxSplits:        20,
-        },
-        {
-            baseSplitInterval: day,
-            name:              "20 days range with multiple matrix selectors and 3 vertical shard size, expect 4 max splits",
-            req: &tripperware.PrometheusRequest{
-                Start: (14 * 24 * 3600 * seconds) - (3600 * seconds),
-                End:   (32 * 24 * 3600 * seconds) + (2 * 3600 * seconds),
+                Start: 30 * 24 * 3600 * seconds,
+                End:   60*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
-                Query: "rate(up[2d]) + rate(up[5d]) + rate(up[7d])",
+                Query: "sum(rate(up[20d])) by (cluster)",
             },
-            verticalShardSize:        3,
-            maxDurationOfDataFetched: 350 * day,
-            expectedMaxSplits:        4,
+            maxFetchedDataDuration:    200 * day,
+            expectedInterval:          4 * day,
+            maxVerticalShardSize:      3,
+            expectedVerticalShardSize: 1,
         },
         {
             baseSplitInterval: day,
-            name:              "around 37 days range with 5 day matrix selector and 3 vertical shards, expect 25 max splits",
+            name:              "100 day range with 5 day subquery and 100 days max duration fetched, expect no splitting (100 day interval)",
             req: &tripperware.PrometheusRequest{
-                Start: (3 * 24 * 3600 * seconds) - (4*3600*seconds + 240*seconds),
-                End:   (38 * 24 * 3600 * seconds) + (1*3600*seconds + 60*seconds),
+                Start: 0,
+                End:   100*24*3600*seconds - 1,
                 Step:  5 * 60 * seconds,
-                Query: "up[5d]",
+                Query: "sum(rate(up[5d:10m])) by (cluster)",
             },
-            verticalShardSize:        3,
-            maxDurationOfDataFetched: 500 * day,
-            expectedMaxSplits:        25,
+            maxFetchedDataDuration:    100 * day,
+            expectedInterval:          100 * day,
+            maxVerticalShardSize:      3,
+            expectedVerticalShardSize: 3,
         },
     } {
         t.Run(tc.name, func(t *testing.T) {
-            queryExpr, err := parser.ParseExpr(tc.req.GetQuery())
-            require.Nil(t, err)
-            maxSplits := getMaxSplitsByDurationFetched(tc.maxDurationOfDataFetched, tc.verticalShardSize, queryExpr, tc.req.GetStart(), tc.req.GetEnd(), tc.req.GetStep(), tc.baseSplitInterval, tc.lookbackDelta)
-            require.Equal(t, tc.expectedMaxSplits, maxSplits)
-        })
-    }
-}
+            cfg := Config{
+                SplitQueriesByInterval: tc.baseSplitInterval,
+                DynamicQuerySplitsConfig: DynamicQuerySplitsConfig{
+                    MaxShardsPerQuery:              tc.maxShardsPerQuery,
+                    MaxFetchedDataDurationPerQuery: tc.maxFetchedDataDuration,
+                },
+            }
+            ctx := user.InjectOrgID(context.Background(), "1")
+            ctx, interval, err := dynamicIntervalFn(cfg, mockLimits{queryVerticalShardSize: tc.maxVerticalShardSize}, querysharding.NewQueryAnalyzer(), lookbackDelta)(ctx, tc.req)
+            require.Equal(t, tc.expectedInterval, interval)
 
-func Test_getMaxSplitsFromConfig(t *testing.T) {
-    for i, tc := range []struct {
-        maxIntervalSplits int
-        verticalShardSize int
-        expectedMaxSplits int
-    }{
-        {
-            verticalShardSize: 1,
-            maxIntervalSplits: 100,
-            expectedMaxSplits: 100,
-        },
-        {
-            verticalShardSize: 1,
-            maxIntervalSplits: 15,
-            expectedMaxSplits: 15,
-        },
-        {
-            verticalShardSize: 1,
-            maxIntervalSplits: 0,
-            expectedMaxSplits: 1,
-        },
-        {
-            verticalShardSize: 3,
-            maxIntervalSplits: 15,
-            expectedMaxSplits: 5,
-        },
-        {
-            verticalShardSize: 5,
-            maxIntervalSplits: 103,
-            expectedMaxSplits: 20,
-        },
-    } {
-        t.Run(strconv.Itoa(i), func(t *testing.T) {
-            maxSplits := getMaxSplitsFromConfig(tc.maxIntervalSplits, tc.verticalShardSize)
-            require.Equal(t, tc.expectedMaxSplits, maxSplits)
+            if tc.expectedVerticalShardSize > 0 {
+                if verticalShardSize, ok := tripperware.VerticalShardSizeFromContext(ctx); ok {
+                    require.Equal(t, tc.expectedVerticalShardSize, verticalShardSize)
+                }
+            }
+            if !tc.expectedError {
+                require.Nil(t, err)
+            }
         })
     }
 }
@@ -988,13 +904,12 @@ func Test_getIntervalFromMaxSplits(t *testing.T) {
 
 func Test_analyzeDurationFetchedByQuery(t *testing.T) {
     for _, tc := range []struct {
-        name                                   string
-        baseSplitInterval                      time.Duration
-        lookbackDelta                          time.Duration
-        req                                    tripperware.Request
-        expectedDurationFetchedByRange         time.Duration
-        expectedDurationFetchedBySelectors     time.Duration
-        expectedDurationFetchedByLookbackDelta time.Duration
+        name                               string
+        baseSplitInterval                  time.Duration
+        lookbackDelta                      time.Duration
+        req                                tripperware.Request
+        expectedDurationFetchedByRange     time.Duration
+        expectedDurationFetchedBySelectors time.Duration
     }{
         {
             name:              "query range 00:00 to 23:59",
             baseSplitInterval: time.Hour,
             req: &tripperware.PrometheusRequest{
                 Start: 0,
                 End:   (24 * 3600 * seconds) - 1,
                 Step:  60 * seconds,
                 Query: "up",
             },
-            expectedDurationFetchedByRange:         24 * time.Hour,
-            expectedDurationFetchedBySelectors:     0 * time.Hour,
-            expectedDurationFetchedByLookbackDelta: 0 * time.Hour,
+            expectedDurationFetchedByRange:     24 * time.Hour,
+            expectedDurationFetchedBySelectors: 0 * time.Hour,
         },
         {
             name:              "query range 00:00 to 00:00 next day",
             baseSplitInterval: time.Hour,
             req: &tripperware.PrometheusRequest{
                 Start: 0,
                 End:   24 * 3600 * seconds,
                 Step:  60 * seconds,
                 Query: "up",
             },
-            expectedDurationFetchedByRange:         25 * time.Hour,
-            expectedDurationFetchedBySelectors:     0 * time.Hour,
-            expectedDurationFetchedByLookbackDelta: 0 * time.Hour,
+            expectedDurationFetchedByRange:     25 * time.Hour,
+            expectedDurationFetchedBySelectors: 0 * time.Hour,
         },
         {
             name:              "query range 00:00 to 23:59, with 5 min lookback",
             baseSplitInterval: time.Hour,
             lookbackDelta:     5 * time.Minute,
             req: &tripperware.PrometheusRequest{
                 Start: 0,
                 End:   (24 * 3600 * seconds) - 1,
                 Step:  60 * seconds,
                 Query: "up",
             },
-            expectedDurationFetchedByRange:         24 * time.Hour,
-            expectedDurationFetchedBySelectors:     0 * time.Hour,
-            expectedDurationFetchedByLookbackDelta: 1 * time.Hour,
+            expectedDurationFetchedByRange:     24 * time.Hour,
+            expectedDurationFetchedBySelectors: 1 * time.Hour,
         },
         {
             name:              "query range 00:00 to 23:59, with 5 hour subquery",
             baseSplitInterval: time.Hour,
             lookbackDelta:     5 * time.Minute,
             req: &tripperware.PrometheusRequest{
                 Start: 0,
                 End:   (24 * 3600 * seconds) - 1,
                 Step:  60 * seconds,
                 Query: "up[5h:10m]",
             },
-            expectedDurationFetchedByRange:         24 * time.Hour,
-            expectedDurationFetchedBySelectors:     5 * time.Hour,
-            expectedDurationFetchedByLookbackDelta: 1 * time.Hour,
+            expectedDurationFetchedByRange:     24 * time.Hour,
+            expectedDurationFetchedBySelectors: 6 * time.Hour,
         },
         {
             name:              "query range 00:00 to 23:59, with 2 hour matrix selector",
             baseSplitInterval: time.Hour,
             req: &tripperware.PrometheusRequest{
                 Start: 0,
                 End:   (24 * 3600 * seconds) - 1,
                 Step:  60 * seconds,
                 Query: "rate(up[2h])",
             },
-            expectedDurationFetchedByRange:         24 * time.Hour,
-            expectedDurationFetchedBySelectors:     2 * time.Hour,
-            expectedDurationFetchedByLookbackDelta: 0 * time.Hour,
+            expectedDurationFetchedByRange:     24 * time.Hour,
+            expectedDurationFetchedBySelectors: 2 * time.Hour,
         },
         {
             name:              "query range 00:00 to 23:59, with multiple matrix selectors",
             baseSplitInterval: time.Hour,
             req: &tripperware.PrometheusRequest{
                 Start: 0,
                 End:   (24 * 3600 * seconds) - 1,
                 Step:  60 * seconds,
                 Query: "rate(up[2h]) + rate(up[5h]) + rate(up[7h])",
             },
-            expectedDurationFetchedByRange:         72 * time.Hour,
-            expectedDurationFetchedBySelectors:     14 * time.Hour,
-            expectedDurationFetchedByLookbackDelta: 0 * time.Hour,
+            expectedDurationFetchedByRange:     72 * time.Hour,
+            expectedDurationFetchedBySelectors: 14 * time.Hour,
         },
         {
             name:              "query range 60 day with 20 day subquery",
             baseSplitInterval: day,
             lookbackDelta:     5 * time.Minute,
             req: &tripperware.PrometheusRequest{
                 Start: 0,
                 End:   60 * 24 * 3600 * seconds,
                 Step:  5 * 60 * seconds,
                 Query: "up[20d:1h]",
             },
-            expectedDurationFetchedByRange:         61 * day,
-            expectedDurationFetchedBySelectors:     20 * day,
-            expectedDurationFetchedByLookbackDelta: 1 * day,
+            expectedDurationFetchedByRange:     61 * day,
+            expectedDurationFetchedBySelectors: 21 * day,
         },
         {
             name:              "query range 35 day, with 15 day subquery and 1 week offset",
             baseSplitInterval: day,
             lookbackDelta:     5 * time.Minute,
             req: &tripperware.PrometheusRequest{
                 Start: 0,
                 End:   35 * 24 * 3600 * seconds,
                 Step:  5 * 60 * seconds,
                 Query: "up[15d:1h] offset 1w",
             },
-            expectedDurationFetchedByRange:         36 * day,
-            expectedDurationFetchedBySelectors:     15 * day,
-            expectedDurationFetchedByLookbackDelta: 1 * day,
+            expectedDurationFetchedByRange:     36 * day,
+            expectedDurationFetchedBySelectors: 16 * day,
         },
         {
             name:              "query range 10 days, with multiple subqueries and offsets",
             baseSplitInterval: day,
             lookbackDelta:     5 * time.Minute,
             req: &tripperware.PrometheusRequest{
                 Start: 0,
                 End:   10 * 24 * 3600 * seconds,
                 Step:  5 * 60 * seconds,
                 Query: "rate(up[2d:1h] offset 1w) + rate(up[5d:1h] offset 2w) + rate(up[7d:1h] offset 3w)",
             },
-            expectedDurationFetchedByRange:         33 * day,
-            expectedDurationFetchedBySelectors:     14 * day,
-            expectedDurationFetchedByLookbackDelta: 3 * day,
+            expectedDurationFetchedByRange:     33 * day,
+            expectedDurationFetchedBySelectors: 17 * day,
         },
         {
             name: "query range spans 40 days with 4 day matrix selector",
@@ -1132,18 +1038,16 @@ func Test_analyzeDurationFetchedByQuery(t *testing.T) {
                 Step:  5 * 60 * seconds,
                 Query: "up[4d]",
             },
-            expectedDurationFetchedByRange:         40 * day,
-            expectedDurationFetchedBySelectors:     4 * day,
-            expectedDurationFetchedByLookbackDelta: 0 * day,
+            expectedDurationFetchedByRange:     40 * day,
+            expectedDurationFetchedBySelectors: 4 * day,
         },
     } {
         t.Run(tc.name, func(t *testing.T) {
             expr, err := parser.ParseExpr(tc.req.GetQuery())
             require.Nil(t, err)
-            queryRangeIntervals, extraIntervalsPerSplit, lookbackDeltaIntervals := analyzeDurationFetchedByQueryExpr(expr, tc.req.GetStart(), tc.req.GetEnd(), tc.baseSplitInterval, tc.lookbackDelta)
-            require.Equal(t, tc.expectedDurationFetchedByRange, queryRangeIntervals)
-            require.Equal(t, tc.expectedDurationFetchedBySelectors, extraIntervalsPerSplit)
-            require.Equal(t, tc.expectedDurationFetchedByLookbackDelta, lookbackDeltaIntervals)
+            durationFetchedByRange, durationFetchedBySelectors := analyzeDurationFetchedByQueryExpr(expr, tc.req.GetStart(), tc.req.GetEnd(), tc.baseSplitInterval, tc.lookbackDelta)
+            require.Equal(t, tc.expectedDurationFetchedByRange, durationFetchedByRange)
+            require.Equal(t, tc.expectedDurationFetchedBySelectors, durationFetchedBySelectors)
        })
     }
 }
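Note on the updated expectations in Test_analyzeDurationFetchedByQuery: the lookbackDelta contribution is no longer reported separately; it is folded into durationFetchedBySelectors because GetTimeRangesForSelector is now called with the real lookbackDelta instead of 0. For up[15d:1h] offset 1w, the offset shifts the selector's start and end equally and cancels out of (end - start) - (queryEnd - queryStart), leaving the subquery range plus the lookback, rounded up to whole base intervals. A hypothetical standalone check of the 16-day figure (this mirrors the arithmetic, not the repo's API):

package main

import (
    "fmt"
    "time"
)

func ceilDiv(a, b int64) int64 {
    if a > 0 && a%b != 0 {
        return a/b + 1
    }
    return a / b
}

func main() {
    day := 24 * time.Hour
    subqueryRange := 15 * day        // up[15d:1h]
    lookbackDelta := 5 * time.Minute // applied once at the selector start
    extra := subqueryRange + lookbackDelta
    // 15d + 5m rounds up to 16 whole 1-day base intervals.
    fmt.Println(ceilDiv(int64(extra), int64(day))) // 16
}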
diff --git a/pkg/querier/tripperware/shard_by.go b/pkg/querier/tripperware/shard_by.go
index fe6293832f..607e397f0e 100644
--- a/pkg/querier/tripperware/shard_by.go
+++ b/pkg/querier/tripperware/shard_by.go
@@ -45,9 +45,13 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
         return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
     }
 
-    numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, s.limits.QueryVerticalShardSize)
+    verticalShardSize := validation.SmallestPositiveIntPerTenant(tenantIDs, s.limits.QueryVerticalShardSize)
+    // Check if vertical shard size is set by dynamic query splitting
+    if dynamicVerticalShardSize, ok := VerticalShardSizeFromContext(ctx); ok {
+        verticalShardSize = dynamicVerticalShardSize
+    }
 
-    if numShards <= 1 {
+    if verticalShardSize <= 1 {
         return s.next.Do(ctx, r)
     }
 
@@ -60,7 +64,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
 
     stats.AddExtraFields(
         "shard_by.is_shardable", analysis.IsShardable(),
-        "shard_by.num_shards", numShards,
+        "shard_by.num_shards", verticalShardSize,
         "shard_by.sharding_labels", analysis.ShardingLabels(),
     )
 
@@ -68,7 +72,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
         return s.next.Do(ctx, r)
     }
 
-    reqs := s.shardQuery(logger, numShards, r, analysis)
+    reqs := s.shardQuery(logger, verticalShardSize, r, analysis)
 
     reqResps, err := DoRequests(ctx, s.next, reqs, s.limits)
     if err != nil {
@@ -83,11 +87,11 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {
     return s.merger.MergeResponse(ctx, r, resps...)
 }
 
-func (s shardBy) shardQuery(l log.Logger, numShards int, r Request, analysis querysharding.QueryAnalysis) []Request {
-    reqs := make([]Request, numShards)
-    for i := 0; i < numShards; i++ {
+func (s shardBy) shardQuery(l log.Logger, verticalShardSize int, r Request, analysis querysharding.QueryAnalysis) []Request {
+    reqs := make([]Request, verticalShardSize)
+    for i := 0; i < verticalShardSize; i++ {
         q, err := cquerysharding.InjectShardingInfo(r.GetQuery(), &storepb.ShardInfo{
-            TotalShards: int64(numShards),
+            TotalShards: int64(verticalShardSize),
             ShardIndex:  int64(i),
             By:          analysis.ShardBy(),
             Labels:      analysis.ShardingLabels(),
@@ -102,3 +106,18 @@ func (s shardBy) shardQuery(l log.Logger, numShards int, r Request, analysis que
 
     return reqs
 }
+
+type verticalShardsKey struct{}
+
+func VerticalShardSizeFromContext(ctx context.Context) (int, bool) {
+    val := ctx.Value(verticalShardsKey{})
+    if val == nil {
+        return 1, false
+    }
+    verticalShardSize, ok := val.(int)
+    return verticalShardSize, ok
+}
+
+func InjectVerticalShardSizeToContext(ctx context.Context, verticalShardSize int) context.Context {
+    return context.WithValue(ctx, verticalShardsKey{}, verticalShardSize)
+}
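Note: a sketch of how a downstream caller might consume the two new helpers (the import path is inferred from the file location, and effectiveShardSize is an illustrative wrapper, not part of this change). Returning (1, false) when nothing was injected lets callers fall back to the per-tenant limit, exactly as the shard_by middleware above does:

package example

import (
    "context"

    "github.com/cortexproject/cortex/pkg/querier/tripperware"
)

// effectiveShardSize prefers the vertical shard size chosen by dynamic query
// splitting and falls back to the tenant's configured limit otherwise.
func effectiveShardSize(ctx context.Context, tenantLimit int) int {
    if size, ok := tripperware.VerticalShardSizeFromContext(ctx); ok {
        return size
    }
    return tenantLimit
}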