
Commit dea6dcf

ingester: Fix regression on usage of cortex_ingester_queried_chunks (#6398)
* ingester: Fix regression on usage of cortex_ingester_queried_chunks

  Fixes: #6396

  Signed-off-by: Charlie Le <[email protected]>

* update changelog

  Signed-off-by: Charlie Le <[email protected]>

---------

Signed-off-by: Charlie Le <[email protected]>
1 parent 320e475 commit dea6dcf
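
The regression meant the ingester's chunk-query path no longer recorded how many chunks each query returned, so the cortex_ingester_queried_chunks histogram was never observed; the commit threads a chunk count back from queryStreamChunks to QueryStream and observes it once per request. Below is a minimal, self-contained sketch (not the actual Cortex metrics code) of how such a per-query histogram can be defined and observed. The metric name and help text come from the test in this commit; the bucket layout (prometheus.ExponentialBuckets(10, 8, 7)) is an assumption inferred from the bucket bounds in the expected test output.

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
)

// queryMetrics is a simplified stand-in for the ingester's metrics struct.
type queryMetrics struct {
    queriedChunks prometheus.Histogram
}

func newQueryMetrics(reg prometheus.Registerer) *queryMetrics {
    m := &queryMetrics{
        queriedChunks: prometheus.NewHistogram(prometheus.HistogramOpts{
            Name: "cortex_ingester_queried_chunks",
            Help: "The total number of chunks returned from queries.",
            // Assumed bucket layout: 10, 80, 640, 5120, 40960, 327680, 2.62144e+06,
            // matching the bounds that appear in the test's expected output.
            Buckets: prometheus.ExponentialBuckets(10, 8, 7),
        }),
    }
    reg.MustRegister(m.queriedChunks)
    return m
}

func main() {
    m := newQueryMetrics(prometheus.NewRegistry())

    // After a query has streamed its result, record how many chunks it returned.
    numChunks := 13335
    m.queriedChunks.Observe(float64(numChunks))
    fmt.Println("observed", numChunks, "chunks")
}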

File tree

3 files changed: +36 -13 lines changed

  CHANGELOG.md (+1)
  pkg/ingester/ingester.go (+16 -12)
  pkg/ingester/ingester_test.go (+19 -1)

Diff for: CHANGELOG.md

+1

@@ -48,6 +48,7 @@
 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
 * [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
 * [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271
+* [BUGFIX] ingester: Fix regression on usage of cortex_ingester_queried_chunks #6398

 ## 1.18.1 2024-10-14

Diff for: pkg/ingester/ingester.go

+16 -12

@@ -1968,18 +1968,21 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
     numSamples := 0
     numSeries := 0
     totalDataBytes := 0
-    numSeries, numSamples, totalDataBytes, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, shardMatcher, stream)
+    numChunks := 0
+    numSeries, numSamples, totalDataBytes, numChunks, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, shardMatcher, stream)

     if err != nil {
         return err
     }

     i.metrics.queriedSeries.Observe(float64(numSeries))
     i.metrics.queriedSamples.Observe(float64(numSamples))
-    level.Debug(spanlog).Log("series", numSeries, "samples", numSamples, "data_bytes", totalDataBytes)
+    i.metrics.queriedChunks.Observe(float64(numChunks))
+    level.Debug(spanlog).Log("series", numSeries, "samples", numSamples, "data_bytes", totalDataBytes, "chunks", numChunks)
     spanlog.SetTag("series", numSeries)
     spanlog.SetTag("samples", numSamples)
     spanlog.SetTag("data_bytes", totalDataBytes)
+    spanlog.SetTag("chunks", numChunks)
     return nil
 }

@@ -1998,16 +2001,16 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) {
 }

 // queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
-func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes int, _ error) {
+func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes, numChunks int, _ error) {
     q, err := db.ChunkQuerier(from, through)
     if err != nil {
-        return 0, 0, 0, err
+        return 0, 0, 0, 0, err
     }
     defer q.Close()

     c, err := i.trackInflightQueryRequest()
     if err != nil {
-        return 0, 0, 0, err
+        return 0, 0, 0, 0, err
     }
     hints := &storage.SelectHints{
         Start: from,
@@ -2018,7 +2021,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
     ss := q.Select(ctx, false, hints, matchers...)
     c()
     if ss.Err() != nil {
-        return 0, 0, 0, ss.Err()
+        return 0, 0, 0, 0, ss.Err()
     }

     chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
@@ -2044,7 +2047,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
             // It is not guaranteed that chunk returned by iterator is populated.
             // For now just return error. We could also try to figure out how to read the chunk.
             if meta.Chunk == nil {
-                return 0, 0, 0, errors.Errorf("unfilled chunk returned from TSDB chunk querier")
+                return 0, 0, 0, 0, errors.Errorf("unfilled chunk returned from TSDB chunk querier")
             }

             ch := client.Chunk{
@@ -2061,10 +2064,11 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
             case chunkenc.EncFloatHistogram:
                 ch.Encoding = int32(encoding.PrometheusFloatHistogramChunk)
             default:
-                return 0, 0, 0, errors.Errorf("unknown chunk encoding from TSDB chunk querier: %v", meta.Chunk.Encoding())
+                return 0, 0, 0, 0, errors.Errorf("unknown chunk encoding from TSDB chunk querier: %v", meta.Chunk.Encoding())
             }

             ts.Chunks = append(ts.Chunks, ch)
+            numChunks++
             numSamples += meta.Chunk.NumSamples()
         }
         numSeries++
@@ -2078,7 +2082,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
             Chunkseries: chunkSeries,
         })
         if err != nil {
-            return 0, 0, 0, err
+            return 0, 0, 0, 0, err
         }

         batchSizeBytes = 0
@@ -2091,7 +2095,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th

     // Ensure no error occurred while iterating the series set.
     if err := ss.Err(); err != nil {
-        return 0, 0, 0, err
+        return 0, 0, 0, 0, err
     }

     // Final flush any existing metrics
@@ -2100,11 +2104,11 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
             Chunkseries: chunkSeries,
         })
         if err != nil {
-            return 0, 0, 0, err
+            return 0, 0, 0, 0, err
         }
     }

-    return numSeries, numSamples, totalBatchSizeBytes, nil
+    return numSeries, numSamples, totalBatchSizeBytes, numChunks, nil
 }

 func (i *Ingester) getTSDB(userID string) *userTSDB {
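
The shape of the change is that queryStreamChunks now counts chunks alongside series and samples and returns that count to QueryStream, which observes it once per request; because the function gains a fourth named return value, every early return also gains a fourth zero. The following standalone sketch, using simplified placeholder types rather than the real TSDB and gRPC types, illustrates that counting-and-returning pattern.

package main

import "fmt"

// chunk and series are placeholder types for this sketch only.
type chunk struct{ samples int }
type series struct{ chunks []chunk }

// streamSeries mirrors the shape of queryStreamChunks: it walks every series,
// counts series, samples and chunks, and reports all three back to the caller
// via named return values so metrics can be observed exactly once per request.
func streamSeries(set []series) (numSeries, numSamples, numChunks int, _ error) {
    for _, s := range set {
        for _, c := range s.chunks {
            numChunks++
            numSamples += c.samples
        }
        numSeries++
    }
    return numSeries, numSamples, numChunks, nil
}

func main() {
    set := []series{
        {chunks: []chunk{{samples: 120}, {samples: 80}}},
        {chunks: []chunk{{samples: 60}}},
    }
    s, samples, chunks, err := streamSeries(set)
    if err != nil {
        panic(err)
    }
    // In the ingester, these counters feed the queriedSeries, queriedSamples
    // and queriedChunks histograms respectively.
    fmt.Println(s, samples, chunks) // 2 260 3
}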

Diff for: pkg/ingester/ingester_test.go

+19 -1

@@ -3087,7 +3087,8 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
     // Create ingester.
     cfg := defaultIngesterTestConfig(t)

-    i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
+    reg := prometheus.NewRegistry()
+    i, err := prepareIngesterWithBlocksStorage(t, cfg, reg)
     require.NoError(t, err)
     require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
     defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -3154,6 +3155,7 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
     recvMsgs := 0
     series := 0
     totalSamples := 0
+    totalChunks := 0

     for {
         resp, err := s.Recv()
@@ -3174,6 +3176,7 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
                 require.NoError(t, err)
                 totalSamples += chk.NumSamples()
             }
+            totalChunks += len(ts.Chunks)
         }
     }

@@ -3183,6 +3186,21 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
     require.True(t, 2 <= recvMsgs && recvMsgs <= 3)
     require.Equal(t, 3, series)
     require.Equal(t, 100000+500000+samplesCount, totalSamples)
+    require.Equal(t, 13335, totalChunks)
+    require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+        # HELP cortex_ingester_queried_chunks The total number of chunks returned from queries.
+        # TYPE cortex_ingester_queried_chunks histogram
+        cortex_ingester_queried_chunks_bucket{le="10"} 0
+        cortex_ingester_queried_chunks_bucket{le="80"} 0
+        cortex_ingester_queried_chunks_bucket{le="640"} 0
+        cortex_ingester_queried_chunks_bucket{le="5120"} 0
+        cortex_ingester_queried_chunks_bucket{le="40960"} 1
+        cortex_ingester_queried_chunks_bucket{le="327680"} 1
+        cortex_ingester_queried_chunks_bucket{le="2.62144e+06"} 1
+        cortex_ingester_queried_chunks_bucket{le="+Inf"} 1
+        cortex_ingester_queried_chunks_sum 13335
+        cortex_ingester_queried_chunks_count 1
+    `), `cortex_ingester_queried_chunks`))
 }

 func writeRequestSingleSeries(lbls labels.Labels, samples []cortexpb.Sample) *cortexpb.WriteRequest {
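
The assertion added above relies on testutil.GatherAndCompare from github.com/prometheus/client_golang/prometheus/testutil, which gathers metrics from a registry, renders them in the text exposition format, and compares them against an expected snapshot restricted to the named metrics. A standalone sketch of that verification technique, using an illustrative gauge rather than the ingester's actual histogram, might look like this:

package main

import (
    "bytes"
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
    // Register an example metric on its own registry.
    reg := prometheus.NewRegistry()
    g := prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "example_queried_chunks",
        Help: "Example gauge for illustration only.",
    })
    reg.MustRegister(g)
    g.Set(3)

    // Expected output in the Prometheus text exposition format.
    expected := `
# HELP example_queried_chunks Example gauge for illustration only.
# TYPE example_queried_chunks gauge
example_queried_chunks 3
`
    // Only the named metric is compared; other metrics in the registry are ignored.
    if err := testutil.GatherAndCompare(reg, bytes.NewBufferString(expected), "example_queried_chunks"); err != nil {
        fmt.Println("mismatch:", err)
        return
    }
    fmt.Println("metrics match")
}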
