Skip to content

Commit e8b8f90

Browse files
justinjung04yeya24
andauthored
Apply bytes limiter to LabelNames and LabelValuesForLabelNames (#6568)
* Add bytes limiter to LabelValuesForLabelNames Signed-off-by: Justin Jung <[email protected]> * Add bytes limiter to LabelNames Signed-off-by: Justin Jung <[email protected]> * Fix bug in MetricsForLabelMatchersStream where bytes limiter overwrote err Signed-off-by: Justin Jung <[email protected]> * changelog Signed-off-by: Justin Jung <[email protected]> * Apply bytes limiter to LabelNames and LabelValues in block store queryable Signed-off-by: Justin Jung <[email protected]> --------- Signed-off-by: Justin Jung <[email protected]> Signed-off-by: Ben Ye <[email protected]> Co-authored-by: Ben Ye <[email protected]>
1 parent 3d7aa9a commit e8b8f90

File tree

3 files changed

+38
-14
lines changed

3 files changed

+38
-14
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## master / unreleased
44

55
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
6+
* [ENHANCEMENT] Querier: Apply bytes limiter to LabelNames and LabelValuesForLabelNames. #6568
67
* [ENHANCEMENT] Query Frontend: Add a `too_many_tenants` reason label value to `cortex_rejected_queries_total` metric to track the rejected query count due to the # of tenant limits. #6569
78
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
89

pkg/distributor/distributor.go

+29-14
Original file line numberDiff line numberDiff line change
@@ -1188,7 +1188,7 @@ func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring
11881188
})
11891189
}
11901190

1191-
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
1191+
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest, limiter *limiter.QueryLimiter) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
11921192
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelValues", opentracing.Tags{
11931193
"name": labelName,
11941194
"start": from.Unix(),
@@ -1205,7 +1205,8 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
12051205
return nil, err
12061206
}
12071207

1208-
resps, err := f(ctx, replicationSet, req)
1208+
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
1209+
resps, err := f(ctx, replicationSet, req, queryLimiter)
12091210
if err != nil {
12101211
return nil, err
12111212
}
@@ -1229,20 +1230,23 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
12291230

12301231
// LabelValuesForLabelName returns all the label values that are associated with a given label name.
12311232
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
1232-
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1233+
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest, queryLimiter *limiter.QueryLimiter) ([]interface{}, error) {
12331234
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
12341235
resp, err := client.LabelValues(ctx, req)
12351236
if err != nil {
12361237
return nil, err
12371238
}
1239+
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
1240+
return nil, validation.LimitError(err.Error())
1241+
}
12381242
return resp.LabelValues, nil
12391243
})
12401244
}, matchers...)
12411245
}
12421246

12431247
// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
12441248
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
1245-
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1249+
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest, queryLimiter *limiter.QueryLimiter) ([]interface{}, error) {
12461250
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
12471251
stream, err := client.LabelValuesStream(ctx, req)
12481252
if err != nil {
@@ -1252,12 +1256,15 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
12521256
allLabelValues := []string{}
12531257
for {
12541258
resp, err := stream.Recv()
1255-
12561259
if err == io.EOF {
12571260
break
12581261
} else if err != nil {
12591262
return nil, err
12601263
}
1264+
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
1265+
return nil, validation.LimitError(err.Error())
1266+
}
1267+
12611268
allLabelValues = append(allLabelValues, resp.LabelValues...)
12621269
}
12631270

@@ -1266,7 +1273,7 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
12661273
}, matchers...)
12671274
}
12681275

1269-
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
1276+
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest, limiter *limiter.QueryLimiter) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
12701277
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelNames", opentracing.Tags{
12711278
"start": from.Unix(),
12721279
"end": to.Unix(),
@@ -1283,7 +1290,8 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
12831290
return nil, err
12841291
}
12851292

1286-
resps, err := f(ctx, replicationSet, req)
1293+
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
1294+
resps, err := f(ctx, replicationSet, req, queryLimiter)
12871295
if err != nil {
12881296
return nil, err
12891297
}
@@ -1308,7 +1316,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
13081316
}
13091317

13101318
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
1311-
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1319+
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest, queryLimiter *limiter.QueryLimiter) ([]interface{}, error) {
13121320
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13131321
stream, err := client.LabelNamesStream(ctx, req)
13141322
if err != nil {
@@ -1318,12 +1326,15 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,
13181326
allLabelNames := []string{}
13191327
for {
13201328
resp, err := stream.Recv()
1321-
13221329
if err == io.EOF {
13231330
break
13241331
} else if err != nil {
13251332
return nil, err
13261333
}
1334+
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
1335+
return nil, validation.LimitError(err.Error())
1336+
}
1337+
13271338
allLabelNames = append(allLabelNames, resp.LabelNames...)
13281339
}
13291340

@@ -1334,12 +1345,16 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,
13341345

13351346
// LabelNames returns all the label names.
13361347
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
1337-
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1348+
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest, queryLimiter *limiter.QueryLimiter) ([]interface{}, error) {
13381349
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13391350
resp, err := client.LabelNames(ctx, req)
13401351
if err != nil {
13411352
return nil, err
13421353
}
1354+
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
1355+
return nil, validation.LimitError(err.Error())
1356+
}
1357+
13431358
return resp.LabelNames, nil
13441359
})
13451360
}, matchers...)
@@ -1385,15 +1400,15 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
13851400
defer stream.CloseSend() //nolint:errcheck
13861401
for {
13871402
resp, err := stream.Recv()
1388-
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
1389-
return nil, validation.LimitError(err.Error())
1390-
}
1391-
13921403
if err == io.EOF {
13931404
break
13941405
} else if err != nil {
13951406
return nil, err
13961407
}
1408+
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
1409+
return nil, validation.LimitError(err.Error())
1410+
}
1411+
13971412
s := make([][]cortexpb.LabelAdapter, 0, len(resp.Metric))
13981413
for _, metric := range resp.Metric {
13991414
m := cortexpb.FromLabelAdaptersToMetricWithCopy(metric.Labels)

pkg/querier/blocks_store_queryable.go

+8
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
854854
spanLog = spanlogger.FromContext(ctx)
855855
merrMtx = sync.Mutex{}
856856
merr = multierror.MultiError{}
857+
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
857858
)
858859

859860
// Concurrently fetch series from all clients.
@@ -894,6 +895,9 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
894895
}
895896
return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress())
896897
}
898+
if dataBytesLimitErr := queryLimiter.AddDataBytes(namesResp.Size()); dataBytesLimitErr != nil {
899+
return validation.LimitError(dataBytesLimitErr.Error())
900+
}
897901

898902
myQueriedBlocks := []ulid.ULID(nil)
899903
if namesResp.Hints != nil {
@@ -957,6 +961,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
957961
spanLog = spanlogger.FromContext(ctx)
958962
merrMtx = sync.Mutex{}
959963
merr = multierror.MultiError{}
964+
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
960965
)
961966

962967
// Concurrently fetch series from all clients.
@@ -997,6 +1002,9 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
9971002
}
9981003
return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress())
9991004
}
1005+
if dataBytesLimitErr := queryLimiter.AddDataBytes(valuesResp.Size()); dataBytesLimitErr != nil {
1006+
return validation.LimitError(dataBytesLimitErr.Error())
1007+
}
10001008

10011009
myQueriedBlocks := []ulid.ULID(nil)
10021010
if valuesResp.Hints != nil {

0 commit comments

Comments
 (0)