Skip to content

Commit de5cfe1

Browse files
authored
Query Partial Data (#6526)
* Create partial_data Signed-off-by: Justin Jung <[email protected]> * Fix lazyquery so that warning message is returned Signed-off-by: Justin Jung <[email protected]> * Add QueryPartialData limit Signed-off-by: Justin Jung <[email protected]> * Fix broken mock Signed-off-by: Justin Jung <[email protected]> * Make response with warnings to be not cached Signed-off-by: Justin Jung <[email protected]> * Updated streamingSelect in distributor_queryable Signed-off-by: Justin Jung <[email protected]> * Update query.go Signed-off-by: Justin Jung <[email protected]> * Update replication_set Signed-off-by: Justin Jung <[email protected]> * Lint Signed-off-by: Justin Jung <[email protected]> * Lint again Signed-off-by: Justin Jung <[email protected]> * Generated doc Signed-off-by: Justin Jung <[email protected]> * Changelog Signed-off-by: Justin Jung <[email protected]> * Update config description Signed-off-by: Justin Jung <[email protected]> * Do not remove warnings from seriesSet Signed-off-by: Justin Jung <[email protected]> * Avoid cache only if the warning message contains partial data error Signed-off-by: Justin Jung <[email protected]> * Remove context usage for partial data Signed-off-by: Justin Jung <[email protected]> * Refactor how partial data info is passed + apply to series and label methods as well Signed-off-by: Justin Jung <[email protected]> * Lint + fix tests Signed-off-by: Justin Jung <[email protected]> * Fix build Signed-off-by: Justin Jung <[email protected]> * Create separate config for ruler partial data Signed-off-by: Justin Jung <[email protected]> * Genereta doc Signed-off-by: Justin Jung <[email protected]> * Add more tests Signed-off-by: Justin Jung <[email protected]> * Change error Signed-off-by: Justin Jung <[email protected]> * Fix test Signed-off-by: Justin Jung <[email protected]> * Update changelog Signed-off-by: Justin Jung <[email protected]> * Update changelog Signed-off-by: Justin Jung <[email protected]> * Nit Signed-off-by: Justin Jung <[email protected]> * Nit Signed-off-by: Justin Jung <[email protected]> --------- Signed-off-by: Justin Jung <[email protected]>
1 parent 02d5157 commit de5cfe1

23 files changed

+453
-175
lines changed

Diff for: CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## master / unreleased
44

5+
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
6+
57
## 1.19.0 in progress
68

79
* [CHANGE] Deprecate `-blocks-storage.tsdb.wal-compression-enabled` flag (use `blocks-storage.tsdb.wal-compression-type` instead). #6529

Diff for: docs/configuration/config-file-reference.md

+8
Original file line numberDiff line numberDiff line change
@@ -3545,6 +3545,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
35453545
# CLI flag: -frontend.max-queriers-per-tenant
35463546
[max_queriers_per_tenant: <float> | default = 0]
35473547

3548+
# Enable to allow queries to be evaluated with data from a single zone, if other
3549+
# zones are not available.
3550+
[query_partial_data: <boolean> | default = false]
3551+
35483552
# Maximum number of outstanding requests per tenant per request queue (either
35493553
# query frontend or query scheduler); requests beyond this error with HTTP 429.
35503554
# CLI flag: -frontend.max-outstanding-requests-per-tenant
@@ -3605,6 +3609,10 @@ query_rejection:
36053609
# external labels for alerting rules
36063610
[ruler_external_labels: <map of string (labelName) to string (labelValue)> | default = []]
36073611

3612+
# Enable to allow rules to be evaluated with data from a single zone, if other
3613+
# zones are not available.
3614+
[rules_partial_data: <boolean> | default = false]
3615+
36083616
# The default tenant's shard size when the shuffle-sharding strategy is used.
36093617
# Must be set when the store-gateway sharding is enabled with the
36103618
# shuffle-sharding strategy. When this setting is specified in the per-tenant

Diff for: pkg/cortex/modules.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) {
258258
querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer)
259259

260260
// Create a querier queryable and PromQL engine
261-
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger)
261+
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.Overrides.QueryPartialData)
262262

263263
// Use distributor as default MetadataQuerier
264264
t.MetadataQuerier = t.Distributor
@@ -623,7 +623,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
623623
} else {
624624
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
625625
// TODO: Consider wrapping logger to differentiate from querier module logger
626-
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)
626+
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.Overrides.RulesPartialData)
627627

628628
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
629629
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)

Diff for: pkg/distributor/distributor.go

+16-16
Original file line numberDiff line numberDiff line change
@@ -1177,8 +1177,8 @@ func getErrorStatus(err error) string {
11771177
}
11781178

11791179
// ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
1180-
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
1181-
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
1180+
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, partialDataEnabled bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
1181+
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, partialDataEnabled, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
11821182
client, err := d.ingesterPool.GetClientFor(ing.Addr)
11831183
if err != nil {
11841184
return nil, err
@@ -1228,9 +1228,9 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
12281228
}
12291229

12301230
// LabelValuesForLabelName returns all the label values that are associated with a given label name.
1231-
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1231+
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
12321232
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1233-
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1233+
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
12341234
resp, err := client.LabelValues(ctx, req)
12351235
if err != nil {
12361236
return nil, err
@@ -1241,9 +1241,9 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
12411241
}
12421242

12431243
// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
1244-
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1244+
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
12451245
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1246-
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1246+
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
12471247
stream, err := client.LabelValuesStream(ctx, req)
12481248
if err != nil {
12491249
return nil, err
@@ -1307,9 +1307,9 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
13071307
return r, nil
13081308
}
13091309

1310-
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1310+
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
13111311
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1312-
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1312+
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13131313
stream, err := client.LabelNamesStream(ctx, req)
13141314
if err != nil {
13151315
return nil, err
@@ -1333,9 +1333,9 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,
13331333
}
13341334

13351335
// LabelNames returns all the label names.
1336-
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1336+
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
13371337
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1338-
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1338+
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13391339
resp, err := client.LabelNames(ctx, req)
13401340
if err != nil {
13411341
return nil, err
@@ -1346,9 +1346,9 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint
13461346
}
13471347

13481348
// MetricsForLabelMatchers gets the metrics that match said matchers
1349-
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
1349+
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]model.Metric, error) {
13501350
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1351-
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1351+
_, err := d.ForReplicationSet(ctx, rs, false, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13521352
resp, err := client.MetricsForLabelMatchers(ctx, req)
13531353
if err != nil {
13541354
return nil, err
@@ -1375,9 +1375,9 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
13751375
}, matchers...)
13761376
}
13771377

1378-
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
1378+
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]model.Metric, error) {
13791379
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1380-
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1380+
_, err := d.ForReplicationSet(ctx, rs, false, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13811381
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
13821382
if err != nil {
13831383
return nil, err
@@ -1453,7 +1453,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad
14531453

14541454
req := &ingester_client.MetricsMetadataRequest{}
14551455
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
1456-
resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1456+
resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
14571457
return client.MetricsMetadata(ctx, req)
14581458
})
14591459
if err != nil {
@@ -1495,7 +1495,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error
14951495
replicationSet.MaxErrors = 0
14961496

14971497
req := &ingester_client.UserStatsRequest{}
1498-
resps, err := d.ForReplicationSet(ctx, replicationSet, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1498+
resps, err := d.ForReplicationSet(ctx, replicationSet, false, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
14991499
return client.UserStats(ctx, req)
15001500
})
15011501
if err != nil {

0 commit comments

Comments
 (0)