[Query-Frontend] Add dynamic query vertical sharding #6678

Open · wants to merge 8 commits into base: master
179 changes: 78 additions & 101 deletions pkg/querier/tripperware/queryrange/split_by_interval.go
@@ -18,7 +18,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

type IntervalFn func(ctx context.Context, r tripperware.Request) (time.Duration, error)
type IntervalFn func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error)

// SplitByIntervalMiddleware creates a new Middleware that splits requests by a given interval.
func SplitByIntervalMiddleware(interval IntervalFn, limits tripperware.Limits, merger tripperware.Merger, registerer prometheus.Registerer, lookbackDelta time.Duration) tripperware.Middleware {
@@ -52,7 +52,7 @@ type splitByInterval struct {
func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) {
// First we're going to build new requests, one for each day, taking care
// to line up the boundaries with step.
interval, err := s.interval(ctx, r)
ctx, interval, err := s.interval(ctx, r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
@@ -152,84 +152,101 @@ func nextIntervalBoundary(t, step int64, interval time.Duration) int64 {
}

// Returns a fixed split interval
func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
return func(_ context.Context, _ tripperware.Request) (time.Duration, error) {
return cfg.SplitQueriesByInterval, nil
func staticIntervalFn(cfg Config) func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) {
return func(ctx context.Context, _ tripperware.Request) (context.Context, time.Duration, error) {
return ctx, cfg.SplitQueriesByInterval, nil
}
}

// Returns a dynamic multiple of base interval adjusted depending on configured 'max_shards_per_query' and 'max_fetched_data_duration_per_query'
func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, lookbackDelta time.Duration) func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
return func(ctx context.Context, r tripperware.Request) (time.Duration, error) {
func dynamicIntervalFn(cfg Config, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, lookbackDelta time.Duration) func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) {
return func(ctx context.Context, r tripperware.Request) (context.Context, time.Duration, error) {
baseInterval := cfg.SplitQueriesByInterval
dynamicSplitCfg := cfg.DynamicQuerySplitsConfig
if dynamicSplitCfg.MaxShardsPerQuery == 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery == 0 {
return baseInterval, nil
return ctx, baseInterval, nil
}

queryVerticalShardSize, err := getMaxVerticalShardSize(ctx, r, limits, queryAnalyzer)
maxVerticalShardSize, isShardable, err := getMaxVerticalShardSize(ctx, r, limits, queryAnalyzer)
if err != nil {
return baseInterval, err
return ctx, baseInterval, err
}

queryExpr, err := parser.ParseExpr(r.GetQuery())
if err != nil {
return baseInterval, err
return ctx, baseInterval, err
}

maxSplitsPerQuery := getMaxSplitsFromConfig(dynamicSplitCfg.MaxShardsPerQuery, queryVerticalShardSize)
maxSplitsFromDurationFetched := getMaxSplitsByDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, queryVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta)

// Use the more restrictive max splits limit
var maxSplits int
switch {
case dynamicSplitCfg.MaxShardsPerQuery > 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
maxSplits = min(maxSplitsPerQuery, maxSplitsFromDurationFetched)
case dynamicSplitCfg.MaxShardsPerQuery > 0:
maxSplits = maxSplitsPerQuery
case dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
maxSplits = maxSplitsFromDurationFetched
interval := baseInterval
verticalShardSize := 1
totalShards := 0
// Find the combination of horizontal splits and vertical shards that will result in the largest total number of shards
for candidateVerticalShardSize := 1; candidateVerticalShardSize <= maxVerticalShardSize; candidateVerticalShardSize++ {
maxSplitsFromMaxShards := getMaxSplitsFromMaxQueryShards(dynamicSplitCfg.MaxShardsPerQuery, candidateVerticalShardSize)
maxSplitsFromDurationFetched := getMaxSplitsFromDurationFetched(dynamicSplitCfg.MaxFetchedDataDurationPerQuery, candidateVerticalShardSize, queryExpr, r.GetStart(), r.GetEnd(), r.GetStep(), baseInterval, lookbackDelta)

// Use the more restrictive max splits limit
var maxSplits int
switch {
case dynamicSplitCfg.MaxShardsPerQuery > 0 && dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
maxSplits = min(maxSplitsFromMaxShards, maxSplitsFromDurationFetched)
case dynamicSplitCfg.MaxShardsPerQuery > 0:
maxSplits = maxSplitsFromMaxShards
case dynamicSplitCfg.MaxFetchedDataDurationPerQuery > 0:
maxSplits = maxSplitsFromDurationFetched
}

candidateInterval := getIntervalFromMaxSplits(r, baseInterval, maxSplits)
if candidateTotalShards := getExpectedTotalShards(r.GetStart(), r.GetEnd(), candidateInterval, candidateVerticalShardSize); candidateTotalShards > totalShards {
interval = candidateInterval
verticalShardSize = candidateVerticalShardSize
totalShards = candidateTotalShards
}
}
Comment on lines +184 to +205
Contributor Author:
This is the main change to introduce dynamic vertical sharding. We do the same calculations to determine the split interval, but now inside a loop that goes over vertical shard sizes from 1 up to the maximum.

The best vertical shard size is the one expected to result in the largest total number of shards, which is checked by getExpectedTotalShards().
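A minimal, self-contained sketch of that selection with hypothetical numbers (not the PR code, and a deliberately simplified interval calculation): it shows why a short query range can favour a larger vertical shard size.

```go
package main

import "fmt"

// Illustration only: pick the (interval, vertical shard size) combination that
// maximizes the expected total number of shards, mirroring the loop in this diff.
// The limits and query range below are assumed values.
func main() {
	const (
		maxShardsPerQuery    = 10 // assumed max_shards_per_query
		maxVerticalShardSize = 3  // assumed result of getMaxVerticalShardSize
		queryRangeDays       = 7  // query range expressed in 1-day base intervals
	)

	bestTotal, bestVertical, bestIntervalDays := 0, 1, queryRangeDays
	for v := 1; v <= maxVerticalShardSize; v++ {
		maxSplits := maxShardsPerQuery / v
		if maxSplits < 1 {
			maxSplits = 1
		}
		// Smallest multiple of the base interval that keeps splits <= maxSplits.
		intervalDays := (queryRangeDays + maxSplits - 1) / maxSplits
		splits := (queryRangeDays + intervalDays - 1) / intervalDays
		if total := splits * v; total > bestTotal {
			bestTotal, bestVertical, bestIntervalDays = total, v, intervalDays
		}
	}
	// Prints vertical=3 intervalDays=3 totalShards=9; horizontal splitting alone
	// (vertical=1) would only reach 7 shards for this 7-day range.
	fmt.Printf("vertical=%d intervalDays=%d totalShards=%d\n",
		bestVertical, bestIntervalDays, bestTotal)
}
```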


// Set number of vertical shards to be used in shard_by middleware
if isShardable && maxVerticalShardSize > 1 {
ctx = tripperware.InjectVerticalShardSizeToContext(ctx, verticalShardSize)
Contributor Author:
The chosen value for vertical sharding is injected into the context here, and then used in the shard_by.go middleware.
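As a rough sketch of the context round-trip assumed here: the split middleware stores the chosen size under a private key and shard_by.go reads it back. Only InjectVerticalShardSizeToContext appears in this diff; the reader function name and key type are illustrative assumptions.

```go
package tripperware

import "context"

// verticalShardSizeKey is an unexported key type to avoid context collisions.
type verticalShardSizeKey struct{}

// InjectVerticalShardSizeToContext stores the vertical shard size chosen by the
// split-by-interval middleware.
func InjectVerticalShardSizeToContext(ctx context.Context, size int) context.Context {
	return context.WithValue(ctx, verticalShardSizeKey{}, size)
}

// VerticalShardSizeFromContext (hypothetical name) reads it back; callers would
// fall back to the per-tenant limit when nothing was injected.
func VerticalShardSizeFromContext(ctx context.Context) (int, bool) {
	size, ok := ctx.Value(verticalShardSizeKey{}).(int)
	return size, ok
}
```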

}

interval := getIntervalFromMaxSplits(r, baseInterval, maxSplits)
return interval, nil
return ctx, interval, nil
}
}

func getMaxVerticalShardSize(ctx context.Context, r tripperware.Request, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer) (int, error) {
func getMaxVerticalShardSize(ctx context.Context, r tripperware.Request, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer) (int, bool, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return 1, err
return 1, false, err
}

analysis, err := queryAnalyzer.Analyze(r.GetQuery())
if err != nil {
return 1, err
return 1, false, err
}

queryVerticalShardSize := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
if queryVerticalShardSize <= 0 || !analysis.IsShardable() {
queryVerticalShardSize = 1
}
return queryVerticalShardSize, nil
return queryVerticalShardSize, analysis.IsShardable(), nil
}

// Returns the minimum multiple of base interval needed to split query into less than maxSplits
func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration, maxSplitsInt int) time.Duration {
maxSplits := time.Duration(maxSplitsInt)
queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond))
// Calculate the multiple (n) of base interval needed to shard query into <= maxSplits
n := ceilDiv(queryRange, baseInterval*maxSplits)
n := ceilDiv(int64(queryRange), int64(baseInterval*maxSplits))
if n <= 0 {
n = 1
}

// Loop to handle cases where first split is truncated and shorter than remaining splits.
// Exits loop if interval (n) is sufficient after removing first split
// If no suitable interval was found terminates at a maximum of interval = 2 * query range
for n <= 2*ceilDiv(queryRange, baseInterval) {
// Find new start time for query after removing first split
nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), n*baseInterval) + r.GetStep()
for n <= 2*ceilDiv(int64(queryRange), int64(baseInterval)) {
// Find the new start time for query after removing first split
nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), time.Duration(n)*baseInterval) + r.GetStep()
if maxSplits == 1 {
// If maxSplits == 1, the removed first split should cover the full query range.
if nextSplitStart >= r.GetEnd() {
@@ -238,7 +255,7 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration,
} else {
queryRangeWithoutFirstSplit := time.Duration((r.GetEnd() - nextSplitStart) * int64(time.Millisecond))
// Recalculate n for the remaining query range with maxSplits-1.
n_temp := ceilDiv(queryRangeWithoutFirstSplit, baseInterval*(maxSplits-1))
n_temp := ceilDiv(int64(queryRangeWithoutFirstSplit), int64(baseInterval*(maxSplits-1)))
// If a larger interval is needed after removing the first split, the initial n was insufficient.
if n >= n_temp {
break
@@ -247,11 +264,11 @@ func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration,
// Increment n to check if larger interval fits the maxSplits constraint.
n++
}
return n * baseInterval
return time.Duration(n) * baseInterval
}

// Return max allowed number of splits by MaxShardsPerQuery config after accounting for vertical sharding
func getMaxSplitsFromConfig(maxSplitsConfigValue int, queryVerticalShardSize int) int {
func getMaxSplitsFromMaxQueryShards(maxSplitsConfigValue int, queryVerticalShardSize int) int {
var maxSplitsFromConfig int
if maxSplitsConfigValue > 0 {
maxSplitsFromConfig = maxSplitsConfigValue / queryVerticalShardSize
@@ -263,75 +280,37 @@ func getMaxSplitsFromConfig(maxSplitsConfigValue int, queryVerticalShardSize int
}

// Return max allowed number of splits by MaxFetchedDataDurationPerQuery config after accounting for vertical sharding
func getMaxSplitsByDurationFetched(maxFetchedDataDurationPerQuery time.Duration, queryVerticalShardSize int, expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) int {
fixedDurationFetched, perSplitDurationFetched := getDurationFetchedByQuerySplitting(expr, queryStart, queryEnd, queryStep, baseInterval, lookbackDelta)
if perSplitDurationFetched == 0 {
return int(maxFetchedDataDurationPerQuery / baseInterval) // Total duration fetched does not increase with number of splits, return default max splits
func getMaxSplitsFromDurationFetched(maxFetchedDataDurationPerQuery time.Duration, queryVerticalShardSize int, expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) int {
durationFetchedByRange, durationFetchedBySelectors := analyzeDurationFetchedByQueryExpr(expr, queryStart, queryEnd, baseInterval, lookbackDelta)

if durationFetchedBySelectors == 0 {
return int(maxFetchedDataDurationPerQuery / baseInterval) // The total duration fetched does not increase with number of splits, return default max splits
}

var maxSplitsByDurationFetched int
if maxFetchedDataDurationPerQuery > 0 {
// Duration fetched by query after splitting = fixedDurationFetched + perSplitDurationFetched x numOfShards
// Duration fetched by query after splitting = durationFetchedByRange + durationFetchedBySelectors x numOfShards
// Rearranging the equation to find the max horizontal splits after accounting for vertical shards
maxSplitsByDurationFetched = int(((maxFetchedDataDurationPerQuery / time.Duration(queryVerticalShardSize)) - fixedDurationFetched) / perSplitDurationFetched)
maxSplitsByDurationFetched = int(((maxFetchedDataDurationPerQuery / time.Duration(queryVerticalShardSize)) - durationFetchedByRange) / durationFetchedBySelectors)
}
if maxSplitsByDurationFetched <= 0 {
maxSplitsByDurationFetched = 1
}
return maxSplitsByDurationFetched
}

// Return the fixed base duration fetched by the query regardless of the number of splits, and the duration that is fetched once for every split
func getDurationFetchedByQuerySplitting(expr parser.Expr, queryStart int64, queryEnd int64, queryStep int64, baseInterval time.Duration, lookbackDelta time.Duration) (fixedDurationFetched time.Duration, perSplitDurationFetched time.Duration) {
// First analyze the query using original start-end time. Duration fetched by lookbackDelta here only reflects the start time of first split
durationFetchedByRange, durationFetchedBySelectors, durationFetchedByLookbackDeltaFirstSplit := analyzeDurationFetchedByQueryExpr(expr, queryStart, queryEnd, baseInterval, lookbackDelta)

fixedDurationFetched += durationFetchedByRange // Duration fetched by the query range is constant regardless of how many splits the query has
perSplitDurationFetched += durationFetchedBySelectors // Duration fetched by selectors is fetched once for every query split

// Next analyze the query using the next split start time to find the duration fetched by lookbackDelta for splits other than first one
nextIntervalStart := nextIntervalBoundary(queryStart, queryStep, baseInterval) + queryStep
_, _, durationFetchedByLookbackDeltaOtherSplits := analyzeDurationFetchedByQueryExpr(expr, nextIntervalStart, queryEnd, baseInterval, lookbackDelta)

// Handle different cases for lookbackDelta
if durationFetchedByLookbackDeltaFirstSplit > 0 && durationFetchedByLookbackDeltaOtherSplits > 0 {
// lookbackDelta is fetching additional duration for all splits
perSplitDurationFetched += durationFetchedByLookbackDeltaOtherSplits
} else if durationFetchedByLookbackDeltaOtherSplits > 0 {
// lookbackDelta is fetching additional duration for all splits except first one
perSplitDurationFetched += durationFetchedByLookbackDeltaOtherSplits
fixedDurationFetched -= durationFetchedByLookbackDeltaOtherSplits
} else if durationFetchedByLookbackDeltaFirstSplit > 0 {
// lookbackDelta is fetching additional duration for first split only
fixedDurationFetched += durationFetchedByLookbackDeltaFirstSplit
}

return fixedDurationFetched, perSplitDurationFetched
}

Comment on lines -284 to -311
Contributor Author (@afhassan), Apr 17, 2025:
To simplify the calculation of the expected duration fetched by a query, we no longer calculate lookbackDelta separately from matrix selectors, which makes this function obsolete.
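For a concrete sense of the simplified estimate, here is a small worked example with assumed numbers (not taken from the PR). The total duration fetched grows roughly as durationFetchedByRange + durationFetchedBySelectors * splits, so rearranging against the max_fetched_data_duration_per_query budget gives the split limit.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const day = 24 * time.Hour
	var (
		budget         = 200 * day        // assumed max_fetched_data_duration_per_query
		verticalShards = time.Duration(2) // assumed vertical shard size
		byRange        = 30 * day         // fetched once, independent of split count
		bySelectors    = 16 * day         // e.g. a 15d selector plus lookback, fetched per split
	)
	// maxSplits = (budget/verticalShards - byRange) / bySelectors
	maxSplits := (budget/verticalShards - byRange) / bySelectors
	fmt.Println(int(maxSplits)) // 4 = floor((200d/2 - 30d) / 16d)
}
```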

// analyzeDurationFetchedByQueryExpr analyzes the query to calculate
// the duration of data that will be fetched from storage by different
// parts of the query
//
// Returns:
// - durationFetchedByRange: The total duration fetched by the original start-end
// range of the query.
// - durationFetchedBySelectors: The duration fetched by matrix selectors
// and/or subqueries. This duration will be fetched once by every query split.
// - durationFetchedByLookbackDelta: The duration fetched by lookbackDelta
// for the specified query start time.
// the estimated duration of data that will be fetched from storage
//
// Example:
// Query up[15d:1h] with a range of 30 days, 1 day base split interval, and 5 min lookbackDelta with 00:00 UTC start time
// - durationFetchedByRange = 30 day
// - durationFetchedBySelectors = 15 day
// - durationFetchedByLookbackDelta = 1 day
func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, queryEnd int64, baseInterval time.Duration, lookbackDelta time.Duration) (durationFetchedByRange time.Duration, durationFetchedBySelectors time.Duration, durationFetchedByLookbackDelta time.Duration) {
durationFetchedByRangeCount := 0
durationFetchedByLookbackDeltaCount := 0
// - durationFetchedBySelectors = 16 day
func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, queryEnd int64, baseInterval time.Duration, lookbackDelta time.Duration) (durationFetchedByRange time.Duration, durationFetchedBySelectors time.Duration) {
baseIntervalMillis := util.DurationMilliseconds(baseInterval)
durationFetchedByRangeCount := 0
durationFetchedBySelectorsCount := 0

totalDurationFetchedCount := 0
var evalRange time.Duration
parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
switch n := node.(type) {
@@ -341,27 +320,25 @@ func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, query
queryEndIntervalIndex := floorDiv(queryEnd, baseIntervalMillis)
durationFetchedByRangeCount += int(queryEndIntervalIndex-queryStartIntervalIndex) + 1

// Adjust start and end time based on matrix selectors and/or subquery selector and increment total duration fetched, this excludes lookbackDelta
start, end := util.GetTimeRangesForSelector(queryStart, queryEnd, 0, n, path, evalRange)
startIntervalIndex := floorDiv(start, baseIntervalMillis)
endIntervalIndex := floorDiv(end, baseIntervalMillis)
totalDurationFetchedCount += int(endIntervalIndex-startIntervalIndex) + 1

// Increment duration fetched by lookbackDelta
startLookbackDelta := start - util.DurationMilliseconds(lookbackDelta)
startLookbackDeltaIntervalIndex := floorDiv(startLookbackDelta, baseIntervalMillis)
if evalRange == 0 && startLookbackDeltaIntervalIndex < startIntervalIndex {
durationFetchedByLookbackDeltaCount += int(startIntervalIndex - startLookbackDeltaIntervalIndex)
}
// Adjust start time based on matrix selectors and/or subquery selectors and calculate additional lookback duration fetched
start, end := util.GetTimeRangesForSelector(queryStart, queryEnd, lookbackDelta, n, path, evalRange)
durationFetchedBySelectors := (end - start) - (queryEnd - queryStart)
durationFetchedBySelectorsCount += int(ceilDiv(durationFetchedBySelectors, baseIntervalMillis))

Comment on lines -344 to +327
Contributor Author:
Also simplified because we no longer need to calculate the duration fetched by lookbackDelta separately; it is all included in durationFetchedBySelectors.
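A quick self-contained check of the docstring example above (up[15d:1h], 1-day base interval, 5m lookback delta), using the same ceilDiv arithmetic as this file; reducing GetTimeRangesForSelector to a plain widening of the start time is an assumption made for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// ceilDiv mirrors the int64 helper at the bottom of this file.
func ceilDiv(a, b int64) int64 {
	if a > 0 && a%b != 0 {
		return a/b + 1
	}
	return a / b
}

func main() {
	dayMs := int64(24 * time.Hour / time.Millisecond)
	// The subquery widens the fetched range by its 15d range plus the 5m
	// lookback delta; rounding up to whole 1-day base intervals gives 16.
	extraMs := int64((15*24*time.Hour + 5*time.Minute) / time.Millisecond)
	fmt.Println(ceilDiv(extraMs, dayMs)) // 16, matching durationFetchedBySelectors = 16 day
}
```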

evalRange = 0
case *parser.MatrixSelector:
evalRange = n.Range
}
return nil
})

durationFetchedBySelectorsCount := totalDurationFetchedCount - durationFetchedByRangeCount
return time.Duration(durationFetchedByRangeCount) * baseInterval, time.Duration(durationFetchedBySelectorsCount) * baseInterval, time.Duration(durationFetchedByLookbackDeltaCount) * baseInterval
return time.Duration(durationFetchedByRangeCount) * baseInterval, time.Duration(durationFetchedBySelectorsCount) * baseInterval
}

func getExpectedTotalShards(queryStart int64, queryEnd int64, interval time.Duration, verticalShardSize int) int {
queryRange := time.Duration((queryEnd - queryStart) * int64(time.Millisecond))
expectedSplits := int(ceilDiv(int64(queryRange), int64(interval)))
return expectedSplits * verticalShardSize
}

func floorDiv(a, b int64) int64 {
@@ -371,7 +348,7 @@ func floorDiv(a, b int64) int64 {
return a / b
}

func ceilDiv(a, b time.Duration) time.Duration {
func ceilDiv(a, b int64) int64 {
if a > 0 && a%b != 0 {
return a/b + 1
}