Commit 4b32f29
Fix: expanded postings can cache wrong data when queries are issued "in the future" (#6562)
* Improve fuzz test for expanded postings cache
* Create more tests on the expanded postings cache
* Add get series call on the test
* Do not use CachedBlockChunkQuerier when the query time range is completely after the last sample added in the head
* Add comments
* Increase the number of fuzz test runs from 100 to 300
* Add get series fuzz testing

Signed-off-by: alanprot <[email protected]>
1 parent 653ea65 commit 4b32f29
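
The heart of the fix is a time-range guard: the cached block chunk querier is skipped whenever the queried range starts after the head's last ingested sample. A minimal sketch of that guard, using simplified stand-in types rather than the real Cortex/Prometheus interfaces, looks like this:

// Sketch only: illustrates the guard added in this commit, with simplified
// stand-in types instead of the actual Cortex/Prometheus interfaces.
package main

import "fmt"

type head struct{ maxTime int64 }

func (h head) MaxTime() int64 { return h.maxTime }

// useCachedQuerier mirrors the commit's condition: use the expanded postings
// cache only when the queried range overlaps data already in the head, because
// tsdb.PostingsForMatchers can return invalid data for ranges entirely after
// the last ingested sample (see cortexproject/cortex#6556).
func useCachedQuerier(cacheEnabled bool, h head, mint int64) bool {
    return cacheEnabled && mint <= h.MaxTime()
}

func main() {
    h := head{maxTime: 1000}
    fmt.Println(useCachedQuerier(true, h, 500))  // true: range starts inside ingested data
    fmt.Println(useCachedQuerier(true, h, 1500)) // false: query is entirely "in the future"
}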

File tree: 4 files changed (+283 −21 lines)

Diff for: integration/query_fuzz_test.go (+52 −15)
@@ -433,7 +433,7 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
                scrapeInterval,
                i*numSamples,
                numSamples,
-               prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
+               prompb.Label{Name: "test_label", Value: fmt.Sprintf("test_label_value_%d", j)},
            )
            ss[i*numberOfLabelsPerSeries+j] = series

@@ -453,11 +453,18 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
    ps := promqlsmith.New(rnd, lbls, opts...)

    // Create the queries with the original labels
-   testRun := 100
+   testRun := 300
    queries := make([]string, testRun)
+   matchers := make([]string, testRun)
    for i := 0; i < testRun; i++ {
        expr := ps.WalkRangeQuery()
        queries[i] = expr.Pretty(0)
+       matchers[i] = storepb.PromMatchersToString(
+           append(
+               ps.WalkSelectors(),
+               labels.MustNewMatcher(labels.MatchEqual, "__name__", fmt.Sprintf("test_series_%d", i%numSeries)),
+           )...,
+       )
    }

    // Lets run multiples iterations and create new series every iteration
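For the new matchers slice above, each generated selector set is rendered to a string with Thanos' storepb.PromMatchersToString before being sent to the series endpoint. A dependency-light sketch of the same serialization, hand-rolling the braces from Prometheus label matchers (illustrative only, not the helper the test actually calls), is:

package main

import (
    "fmt"
    "strings"

    "github.com/prometheus/prometheus/model/labels"
)

func main() {
    ms := []*labels.Matcher{
        labels.MustNewMatcher(labels.MatchEqual, "__name__", "test_series_0"),
        labels.MustNewMatcher(labels.MatchRegexp, "test_label", "test_label_value_.*"),
    }

    // labels.Matcher.String renders name<op>"value"; joining the rendered
    // matchers inside braces yields a PromQL-style selector like the ones
    // the fuzz test sends to the /series endpoint.
    parts := make([]string, 0, len(ms))
    for _, m := range ms {
        parts = append(parts, m.String())
    }
    fmt.Printf("{%s}\n", strings.Join(parts, ", "))
}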
@@ -472,7 +479,7 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
                scrapeInterval,
                i*numSamples,
                numSamples,
-               prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
+               prompb.Label{Name: "test_label", Value: fmt.Sprintf("test_label_value_%d", j)},
                prompb.Label{Name: "k", Value: fmt.Sprintf("%d", k)},
            )
        }
@@ -485,42 +492,72 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
    }

    type testCase struct {
-       query      string
-       res1, res2 model.Value
-       err1, err2 error
+       query        string
+       qt           string
+       res1, res2   model.Value
+       sres1, sres2 []model.LabelSet
+       err1, err2   error
    }

-   queryStart := time.Now().Add(-time.Hour * 24)
-   queryEnd := time.Now()
-   cases := make([]*testCase, 0, 200)
+   cases := make([]*testCase, 0, len(queries)*3)

    for _, query := range queries {
-       res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval)
-       res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval)
+       fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
+       queryEnd := start.Add(fuzzyTime * time.Millisecond)
+       res1, err1 := c1.Query(query, queryEnd)
+       res2, err2 := c2.Query(query, queryEnd)
+       cases = append(cases, &testCase{
+           query: query,
+           qt:    "instant",
+           res1:  res1,
+           res2:  res2,
+           err1:  err1,
+           err2:  err2,
+       })
+       res1, err1 = c1.QueryRange(query, start, queryEnd, scrapeInterval)
+       res2, err2 = c2.QueryRange(query, start, queryEnd, scrapeInterval)
        cases = append(cases, &testCase{
            query: query,
+           qt:    "range query",
            res1:  res1,
            res2:  res2,
            err1:  err1,
            err2:  err2,
        })
    }

+   for _, m := range matchers {
+       fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
+       queryEnd := start.Add(fuzzyTime * time.Millisecond)
+       res1, err := c1.Series([]string{m}, start, queryEnd)
+       require.NoError(t, err)
+       res2, err := c2.Series([]string{m}, start, queryEnd)
+       require.NoError(t, err)
+       cases = append(cases, &testCase{
+           query: m,
+           qt:    "get series",
+           sres1: res1,
+           sres2: res2,
+       })
+   }
+
    failures := 0
    for i, tc := range cases {
-       qt := "range query"
        if tc.err1 != nil || tc.err2 != nil {
            if !cmp.Equal(tc.err1, tc.err2) {
-               t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
+               t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, tc.qt, tc.query, tc.err1, tc.err2)
                failures++
            }
        } else if shouldUseSampleNumComparer(tc.query) {
            if !cmp.Equal(tc.res1, tc.res2, sampleNumComparer) {
-               t.Logf("case %d # of samples mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
+               t.Logf("case %d # of samples mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, tc.qt, tc.query, tc.res1.String(), tc.res2.String())
                failures++
            }
        } else if !cmp.Equal(tc.res1, tc.res2, comparer) {
-           t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
+           t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, tc.qt, tc.query, tc.res1.String(), tc.res2.String())
+           failures++
+       } else if !cmp.Equal(tc.sres1, tc.sres2, labelSetsComparer) {
+           t.Logf("case %d results mismatch.\n%s: %s\nsres1: %s\nsres2: %s\n", i, tc.qt, tc.query, tc.sres1, tc.sres2)
            failures++
        }
    }
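
A note on the fuzzing change above: instead of a fixed 24-hour window ending at time.Now(), every case now picks a random end time between start and now, which is what lets the test exercise queries whose range ends "in the future" relative to the ingested data. A standalone sketch of that end-time computation (the hypothetical start here stands in for the test's first-push timestamp):

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    // Hypothetical stream start one hour ago; the real test uses the time
    // the first samples were pushed.
    start := time.Now().Add(-time.Hour)

    // Pick a random number of milliseconds in [0, now-start), then shift
    // start by that amount to land the query end somewhere inside (or right
    // at the edge of) the ingested window.
    fuzzyTime := time.Duration(rand.Int63n(time.Now().UnixMilli() - start.UnixMilli()))
    queryEnd := start.Add(fuzzyTime * time.Millisecond)

    fmt.Println("query end:", queryEnd)
}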

Diff for: pkg/ingester/ingester.go (+22 −6)
@@ -2283,6 +2283,27 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
    return db, nil
}

+func (i *Ingester) blockChunkQuerierFunc(userId string) tsdb.BlockChunkQuerierFunc {
+   return func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
+       db := i.getTSDB(userId)
+
+       var postingCache cortex_tsdb.ExpandedPostingsCache
+       if db != nil {
+           postingCache = db.postingCache
+       }
+
+       // Caching expanded postings for queries that are "in the future" may lead to incorrect results being cached.
+       // This occurs because the tsdb.PostingsForMatchers function can return invalid data in such scenarios.
+       // For more details, see: https://github.com/cortexproject/cortex/issues/6556
+       // TODO: alanprot: Consider removing this logic when prometheus is updated as this logic is "fixed" upstream.
+       if postingCache == nil || mint > db.Head().MaxTime() {
+           return tsdb.NewBlockChunkQuerier(b, mint, maxt)
+       }
+
+       return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
+   }
+}
+
 // createTSDB creates a TSDB for a given userID, and returns the created db.
 func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
    tsdbPromReg := prometheus.NewRegistry()
@@ -2346,12 +2367,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
        OutOfOrderCapMax:            i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
        EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks.
        EnableNativeHistograms:      i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms,
-       BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
-           if postingCache != nil {
-               return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
-           }
-           return tsdb.NewBlockChunkQuerier(b, mint, maxt)
-       },
+       BlockChunkQuerierFunc: i.blockChunkQuerierFunc(userID),
    }, nil)
    if err != nil {
        return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
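
A design note on the refactor above: the old inline BlockChunkQuerierFunc captured postingCache once at TSDB-creation time, while the new blockChunkQuerierFunc factory re-resolves the per-user TSDB on every call, so the head's current MaxTime() can be consulted at query time. A minimal sketch of this factory-closure pattern, with illustrative names rather than the Cortex API:

package main

import "fmt"

// querierFunc stands in for tsdb.BlockChunkQuerierFunc.
type querierFunc func(mint, maxt int64) string

// newQuerierFunc returns a closure bound to a user ID; mutable state such as
// the head's max time is read on each invocation rather than frozen when the
// closure is created.
func newQuerierFunc(userID string, headMaxTime func() int64) querierFunc {
    return func(mint, maxt int64) string {
        if mint > headMaxTime() {
            return fmt.Sprintf("uncached querier for %s [%d, %d]", userID, mint, maxt)
        }
        return fmt.Sprintf("cached querier for %s [%d, %d]", userID, mint, maxt)
    }
}

func main() {
    maxTime := int64(1000)
    qf := newQuerierFunc("user-1", func() int64 { return maxTime })
    fmt.Println(qf(500, 2000))  // overlaps the head: cached
    fmt.Println(qf(1500, 2000)) // entirely in the future: uncached
}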

Diff for: pkg/ingester/ingester_test.go (+195 −0)
@@ -5605,6 +5605,201 @@ func TestExpendedPostingsCacheIsolation(t *testing.T) {
    wg.Wait()
}

+func TestExpendedPostingsCacheMatchers(t *testing.T) {
+   cfg := defaultIngesterTestConfig(t)
+   cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
+   cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour}
+   cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled = true
+   cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled = true
+   cfg.QueryIngestersWithin = 24 * time.Hour
+
+   ctx := user.InjectOrgID(context.Background(), userID)
+
+   r := prometheus.NewRegistry()
+   ing, err := prepareIngesterWithBlocksStorage(t, cfg, r)
+   require.NoError(t, err)
+   require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
+   defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
+
+   // Wait until the ingester is ACTIVE
+   test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
+       return ing.lifecycler.GetState()
+   })
+
+   numberOfMetricNames := 10
+   seriesPerMetricsNames := 25
+   timeStamp := int64(60 * 1000)
+   seriesCreated := map[string]labels.Labels{}
+
+   for i := 0; i < numberOfMetricNames; i++ {
+       metricName := fmt.Sprintf("metric_%v", i)
+       for j := 0; j < seriesPerMetricsNames; j++ {
+           s := labels.FromStrings(labels.MetricName, metricName, "labelA", fmt.Sprintf("series_%v", j))
+           _, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{s}, []cortexpb.Sample{{Value: 2, TimestampMs: timeStamp}}, nil, nil, cortexpb.API))
+           seriesCreated[s.String()] = s
+           require.NoError(t, err)
+       }
+   }
+
+   db := ing.getTSDB(userID)
+
+   type testCase struct {
+       matchers []*client.LabelMatcher
+   }
+
+   cases := []testCase{}
+
+   nameMatcher := &client.LabelMatcher{
+       Type:  client.EQUAL,
+       Name:  labels.MetricName,
+       Value: "metric_0",
+   }
+
+   for i := 0; i < 4; i++ {
+       tc := testCase{
+           matchers: []*client.LabelMatcher{nameMatcher},
+       }
+
+       switch client.MatchType(i) {
+       case client.EQUAL, client.NOT_EQUAL:
+           tc.matchers = append(tc.matchers, &client.LabelMatcher{
+               Type:  client.MatchType(i),
+               Name:  "labelA",
+               Value: "series_0",
+           })
+       default:
+           tc.matchers = append(tc.matchers, &client.LabelMatcher{
+               Type:  client.MatchType(i),
+               Name:  "labelA",
+               Value: "series_.*",
+           })
+       }
+       cases = append(cases, tc)
+   }
+
+   for _, v := range []string{".*", "", ".+"} {
+       cases = append(cases,
+           testCase{
+               matchers: []*client.LabelMatcher{
+                   nameMatcher,
+                   {
+                       Type:  client.REGEX_MATCH,
+                       Name:  "labelA",
+                       Value: v,
+                   },
+               },
+           },
+           testCase{
+               matchers: []*client.LabelMatcher{
+                   nameMatcher,
+                   {
+                       Type:  client.REGEX_NO_MATCH,
+                       Name:  "labelA",
+                       Value: v,
+                   },
+               },
+           },
+       )
+   }
+
+   ranges := []struct {
+       startTs, endTs int64
+       hasSamples     bool
+   }{
+       // Totally in the past
+       {
+           startTs:    0,
+           endTs:      timeStamp / 2,
+           hasSamples: false,
+       },
+       {
+           startTs:    timeStamp / 2,
+           endTs:      timeStamp,
+           hasSamples: true,
+       },
+       {
+           startTs:    timeStamp / 2,
+           endTs:      timeStamp * 2,
+           hasSamples: true,
+       },
+       {
+           startTs:    timeStamp + 1,
+           endTs:      timeStamp * 2,
+           hasSamples: false,
+       },
+   }
+
+   verify := func(t *testing.T, tc testCase, startTs, endTs int64, hasSamples bool) {
+       expectedCount := len(seriesCreated)
+       matchers, err := client.FromLabelMatchers(ing.matchersCache, tc.matchers)
+       require.NoError(t, err)
+       for _, s := range seriesCreated {
+           for _, m := range matchers {
+               if !m.Matches(s.Get(m.Name)) {
+                   expectedCount--
+                   break
+               }
+           }
+       }
+
+       seriesResponse, err := ing.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{
+           StartTimestampMs: startTs,
+           EndTimestampMs:   endTs,
+           MatchersSet: []*client.LabelMatchers{
+               {
+                   Matchers: tc.matchers,
+               },
+           },
+       })
+       require.NoError(t, err)
+       if hasSamples {
+           require.Len(t, seriesResponse.Metric, expectedCount)
+       } else {
+           require.Len(t, seriesResponse.Metric, 0)
+       }
+
+       s := &mockQueryStreamServer{ctx: ctx}
+       err = ing.QueryStream(&client.QueryRequest{
+           StartTimestampMs: startTs,
+           EndTimestampMs:   endTs,
+           Matchers:         tc.matchers,
+       }, s)
+       require.NoError(t, err)
+       if hasSamples {
+           require.Equal(t, expectedCount, len(s.series))
+       } else {
+           require.Equal(t, 0, len(s.series))
+       }
+   }
+
+   for _, tc := range cases {
+       testName := ""
+       for _, matcher := range tc.matchers {
+           t, _ := matcher.MatcherType()
+           testName += matcher.Name + t.String() + matcher.Value + "|"
+       }
+       t.Run(fmt.Sprintf("%v", testName), func(t *testing.T) {
+           for _, r := range ranges {
+               t.Run(fmt.Sprintf("start=%v,end=%v", r.startTs, r.endTs), func(t *testing.T) {
+                   db.postingCache.Clear()
+
+                   // lets run 2 times to hit the cache
+                   for i := 0; i < 2; i++ {
+                       verify(t, tc, r.startTs, r.endTs, r.hasSamples)
+                   }
+
+                   // run the test again with all other ranges
+                   for _, r1 := range ranges {
+                       verify(t, tc, r1.startTs, r1.endTs, r1.hasSamples)
+                   }
+               })
+           }
+       })
+   }
+}
+
 func TestExpendedPostingsCache(t *testing.T) {
    cfg := defaultIngesterTestConfig(t)
    cfg.BlocksStorageConfig.TSDB.ExpandedCachingExpireInterval = time.Second
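
The verify helper in the test above derives expectedCount by applying the request's matchers to every pushed series; running each range twice after clearing the cache then exercises both the cold path and the path that serves postings from the cache. The same counting idea in isolation, using only Prometheus' labels package and made-up series (the series and matchers here are assumptions for illustration):

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/model/labels"
)

func main() {
    series := []labels.Labels{
        labels.FromStrings("__name__", "metric_0", "labelA", "series_0"),
        labels.FromStrings("__name__", "metric_0", "labelA", "series_1"),
        labels.FromStrings("__name__", "metric_1", "labelA", "series_0"),
    }

    matchers := []*labels.Matcher{
        labels.MustNewMatcher(labels.MatchEqual, "__name__", "metric_0"),
        labels.MustNewMatcher(labels.MatchRegexp, "labelA", "series_.*"),
    }

    // Count the series for which every matcher accepts the value of its
    // label, mirroring how the test derives expectedCount.
    expected := 0
    for _, s := range series {
        ok := true
        for _, m := range matchers {
            if !m.Matches(s.Get(m.Name)) {
                ok = false
                break
            }
        }
        if ok {
            expected++
        }
    }
    fmt.Println("expected series:", expected) // 2
}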
