Skip to content

Commit 5b4c6d8

Browse files
committed
Add native histogram for tracking push requests size
Signed-off-by: SungJin1212 <[email protected]>
1 parent 7191ecb commit 5b4c6d8

File tree

11 files changed

+145
-26
lines changed

11 files changed

+145
-26
lines changed

Diff for: CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
1919
* [FEATURE] Ruler: Add support for per-user external labels #6340
2020
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
21+
* [ENHANCEMENT] Distributor: Add a `cortex_distributor_push_requests_uncompressed_size_bytes` native histogram to track uncompressed push requests in bytes for tenant and format. #6384
2122
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
2223
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
2324
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333

Diff for: pkg/api/api.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,8 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
277277
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
278278
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
279279

280-
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
281-
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
280+
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
281+
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
282282

283283
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
284284
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
@@ -289,7 +289,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
289289
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")
290290

291291
// Legacy Routes
292-
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
292+
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d), d.PushHandlerMetrics), true, "POST")
293293
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
294294
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
295295
}
@@ -322,12 +322,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {
322322
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
323323
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
324324
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
325-
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
325+
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
326326

327327
// Legacy Routes
328328
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
329329
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
330-
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
330+
a.RegisterRoute("/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
331331
}
332332

333333
func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {

Diff for: pkg/distributor/distributor.go

+33-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ type Distributor struct {
123123
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
124124

125125
validateMetrics *validation.ValidateMetrics
126+
127+
// metrics passed to push handler
128+
PushHandlerMetrics *PushHandlerMetrics
126129
}
127130

128131
// Config contains the configuration required to
@@ -167,6 +170,32 @@ type Config struct {
167170
OTLPConfig OTLPConfig `yaml:"otlp"`
168171
}
169172

173+
type PushHandlerMetrics struct {
174+
pushRequestSizeBytes *prometheus.HistogramVec
175+
}
176+
177+
func NewPushHandlerMetrics(reg prometheus.Registerer) *PushHandlerMetrics {
178+
return &PushHandlerMetrics{
179+
pushRequestSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
180+
Name: "cortex_distributor_push_requests_uncompressed_size_bytes",
181+
Help: "Histogram of push request's uncompressed size in bytes",
182+
NativeHistogramBucketFactor: 1.1,
183+
NativeHistogramMinResetDuration: 1 * time.Hour,
184+
NativeHistogramMaxBucketNumber: 100,
185+
}, []string{"user", "format"}),
186+
}
187+
}
188+
189+
func (m *PushHandlerMetrics) ObservePushRequestSize(user, format string, size float64) {
190+
if m != nil {
191+
m.pushRequestSizeBytes.WithLabelValues(user, format).Observe(size)
192+
}
193+
}
194+
195+
func (m *PushHandlerMetrics) deleteUserMetrics(user string) {
196+
m.pushRequestSizeBytes.DeleteLabelValues(user)
197+
}
198+
170199
type InstanceLimits struct {
171200
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
172201
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
@@ -365,7 +394,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
365394
Help: "Unix timestamp of latest received sample per user.",
366395
}, []string{"user"}),
367396

368-
validateMetrics: validation.NewValidateMetrics(reg),
397+
validateMetrics: validation.NewValidateMetrics(reg),
398+
PushHandlerMetrics: NewPushHandlerMetrics(reg),
369399
}
370400

371401
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
@@ -468,6 +498,8 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
468498
d.nonHASamples.DeleteLabelValues(userID)
469499
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)
470500

501+
d.PushHandlerMetrics.deleteUserMetrics(userID)
502+
471503
if err := util.DeleteMatchingLabels(d.dedupedSamples, map[string]string{"user": userID}); err != nil {
472504
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
473505
}

Diff for: pkg/ingester/client/client_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ func TestMarshall(t *testing.T) {
4343
plentySize = 1024 * 1024
4444
)
4545
req := cortexpb.WriteRequest{}
46-
err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy)
46+
_, err := util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), tooSmallSize, &req, util.RawSnappy)
4747
require.Error(t, err)
48-
err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy)
48+
_, err = util.ParseProtoReader(context.Background(), recorder.Body, recorder.Body.Len(), plentySize, &req, util.RawSnappy)
4949
require.NoError(t, err)
5050
require.Equal(t, numSeries, len(req.Timeseries))
5151
}

Diff for: pkg/querier/remote_read.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func RemoteReadHandler(q storage.Queryable, logger log.Logger) http.Handler {
2121
ctx := r.Context()
2222
var req client.ReadRequest
2323
logger := util_log.WithContext(r.Context(), logger)
24-
if err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
24+
if _, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRemoteReadQuerySize, &req, util.RawSnappy); err != nil {
2525
level.Error(logger).Log("msg", "failed to parse proto", "err", err.Error())
2626
http.Error(w, err.Error(), http.StatusBadRequest)
2727
return

Diff for: pkg/util/http.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,14 @@ const (
148148
)
149149

150150
// ParseProtoReader parses a compressed proto from an io.Reader.
151-
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error {
151+
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) (int, error) {
152152
sp := opentracing.SpanFromContext(ctx)
153153
if sp != nil {
154154
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]"))
155155
}
156156
body, err := decompressRequest(reader, expectedSize, maxSize, compression, sp)
157157
if err != nil {
158-
return err
158+
return 0, err
159159
}
160160

161161
if sp != nil {
@@ -171,10 +171,10 @@ func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSi
171171
err = proto.NewBuffer(body).Unmarshal(req)
172172
}
173173
if err != nil {
174-
return err
174+
return 0, err
175175
}
176176

177-
return nil
177+
return len(body), nil
178178
}
179179

180180
func decompressRequest(reader io.Reader, expectedSize, maxSize int, compression CompressionType, sp opentracing.Span) (body []byte, err error) {

Diff for: pkg/util/http_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func TestParseProtoReader(t *testing.T) {
193193
reader = bytesBuffered{Buffer: &buf}
194194
}
195195

196-
err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
196+
_, err := util.ParseProtoReader(context.Background(), reader, 0, tt.maxSize, &fromWire, tt.compression)
197197
if tt.expectErr {
198198
assert.NotNil(t, err)
199199
return

Diff for: pkg/util/push/otlp.go

+19-5
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const (
3434
)
3535

3636
// OTLPHandler is a http.Handler which accepts OTLP metrics.
37-
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
37+
func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func, metrics *distributor.PushHandlerMetrics) http.Handler {
3838
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3939
ctx := r.Context()
4040
logger := util_log.WithContext(ctx, util_log.Logger)
@@ -51,7 +51,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
5151
return
5252
}
5353

54-
req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize)
54+
req, err := decodeOTLPWriteRequest(ctx, r, maxRecvMsgSize, userID, metrics)
5555
if err != nil {
5656
level.Error(logger).Log("err", err.Error())
5757
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -99,7 +99,7 @@ func OTLPHandler(maxRecvMsgSize int, overrides *validation.Overrides, cfg distri
9999
})
100100
}
101101

102-
func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (pmetricotlp.ExportRequest, error) {
102+
func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int, userID string, metrics *distributor.PushHandlerMetrics) (pmetricotlp.ExportRequest, error) {
103103
expectedSize := int(r.ContentLength)
104104
if expectedSize > maxSize {
105105
return pmetricotlp.NewExportRequest(), fmt.Errorf("received message larger than max (%d vs %d)", expectedSize, maxSize)
@@ -124,7 +124,17 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
124124
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
125125
req := pmetricotlp.NewExportRequest()
126126
otlpReqProto := otlpProtoMessage{req: &req}
127-
return req, util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)
127+
128+
bodySize, err := util.ParseProtoReader(ctx, reader, expectedSize, maxSize, otlpReqProto, compressionType)
129+
if err != nil {
130+
return req, err
131+
}
132+
133+
if metrics != nil {
134+
metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize))
135+
}
136+
137+
return req, nil
128138
}
129139
case jsonContentType:
130140
decoderFunc = func(reader io.Reader) (pmetricotlp.ExportRequest, error) {
@@ -143,11 +153,15 @@ func decodeOTLPWriteRequest(ctx context.Context, r *http.Request, maxSize int) (
143153
if expectedSize > 0 {
144154
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
145155
}
146-
_, err := buf.ReadFrom(reader)
156+
bodySize, err := buf.ReadFrom(reader)
147157
if err != nil {
148158
return req, err
149159
}
150160

161+
if metrics != nil {
162+
metrics.ObservePushRequestSize(userID, formatOTLP, float64(bodySize))
163+
}
164+
151165
return req, req.UnmarshalJSON(buf.Bytes())
152166
}
153167
default:

Diff for: pkg/util/push/otlp_test.go

+38-1
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ import (
77
"io"
88
"net/http"
99
"net/http/httptest"
10+
"strings"
1011
"testing"
1112
"time"
1213

1314
"github.com/go-kit/log"
15+
"github.com/prometheus/client_golang/prometheus"
16+
"github.com/prometheus/client_golang/prometheus/testutil"
1417
"github.com/prometheus/prometheus/prompb"
1518
"github.com/stretchr/testify/assert"
1619
"github.com/stretchr/testify/require"
@@ -236,12 +239,20 @@ func TestOTLPWriteHandler(t *testing.T) {
236239
expectedErrMsg string
237240
gzipCompression bool
238241
encodingType string
242+
expectedMetrics string
239243
}{
240244
{
241245
description: "Test proto format write with no compression",
242246
maxRecvMsgSize: 10000,
243247
format: pbContentType,
244248
expectedStatusCode: http.StatusOK,
249+
expectedMetrics: `
250+
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
251+
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
252+
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
253+
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 665
254+
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
255+
`,
245256
},
246257
{
247258
description: "Test proto format write with gzip",
@@ -250,12 +261,26 @@ func TestOTLPWriteHandler(t *testing.T) {
250261
expectedStatusCode: http.StatusOK,
251262
encodingType: "gzip",
252263
gzipCompression: true,
264+
expectedMetrics: `
265+
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
266+
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
267+
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
268+
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 665
269+
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
270+
`,
253271
},
254272
{
255273
description: "Test json format write with no compression",
256274
maxRecvMsgSize: 10000,
257275
format: jsonContentType,
258276
expectedStatusCode: http.StatusOK,
277+
expectedMetrics: `
278+
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
279+
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
280+
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
281+
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 1568
282+
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
283+
`,
259284
},
260285
{
261286
description: "Test json format write with gzip",
@@ -264,6 +289,13 @@ func TestOTLPWriteHandler(t *testing.T) {
264289
expectedStatusCode: http.StatusOK,
265290
encodingType: "gzip",
266291
gzipCompression: true,
292+
expectedMetrics: `
293+
# HELP cortex_distributor_push_requests_uncompressed_size_bytes Histogram of push request's uncompressed size in bytes
294+
# TYPE cortex_distributor_push_requests_uncompressed_size_bytes histogram
295+
cortex_distributor_push_requests_uncompressed_size_bytes_bucket{format="otlp",user="user-1",le="+Inf"} 1
296+
cortex_distributor_push_requests_uncompressed_size_bytes_sum{format="otlp",user="user-1"} 1568
297+
cortex_distributor_push_requests_uncompressed_size_bytes_count{format="otlp",user="user-1"} 1
298+
`,
267299
},
268300
{
269301
description: "request too big than maxRecvMsgSize (proto) with no compression",
@@ -351,14 +383,19 @@ func TestOTLPWriteHandler(t *testing.T) {
351383
push := verifyOTLPWriteRequestHandler(t, cortexpb.API)
352384
overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil)
353385
require.NoError(t, err)
354-
handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push)
386+
reg := prometheus.NewRegistry()
387+
handler := OTLPHandler(test.maxRecvMsgSize, overrides, cfg, nil, push, distributor.NewPushHandlerMetrics(reg))
355388

356389
recorder := httptest.NewRecorder()
357390
handler.ServeHTTP(recorder, req)
358391

359392
resp := recorder.Result()
360393
require.Equal(t, test.expectedStatusCode, resp.StatusCode)
361394

395+
if test.expectedMetrics != "" {
396+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_distributor_push_requests"))
397+
}
398+
362399
if test.expectedErrMsg != "" {
363400
b, err := io.ReadAll(resp.Body)
364401
require.NoError(t, err)

Diff for: pkg/util/push/push.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,22 @@ import (
99
"github.com/weaveworks/common/middleware"
1010

1111
"github.com/cortexproject/cortex/pkg/cortexpb"
12+
"github.com/cortexproject/cortex/pkg/distributor"
13+
"github.com/cortexproject/cortex/pkg/tenant"
1214
"github.com/cortexproject/cortex/pkg/util"
1315
"github.com/cortexproject/cortex/pkg/util/log"
1416
)
1517

18+
const (
19+
formatRemoteWrite1 = "prw1"
20+
formatOTLP = "otlp"
21+
)
22+
1623
// Func defines the type of the push. It is similar to http.HandlerFunc.
1724
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
1825

1926
// Handler is a http.Handler which accepts WriteRequests.
20-
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
27+
func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push Func, pushMetrics *distributor.PushHandlerMetrics) http.Handler {
2128
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2229
ctx := r.Context()
2330
logger := log.WithContext(ctx, log.Logger)
@@ -28,14 +35,24 @@ func Handler(maxRecvMsgSize int, sourceIPs *middleware.SourceIPExtractor, push F
2835
logger = log.WithSourceIPs(source, logger)
2936
}
3037
}
38+
39+
userID, err := tenant.TenantID(ctx)
40+
if err != nil {
41+
return
42+
}
43+
3144
var req cortexpb.PreallocWriteRequest
32-
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
45+
bodySize, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, &req, util.RawSnappy)
3346
if err != nil {
3447
level.Error(logger).Log("err", err.Error())
3548
http.Error(w, err.Error(), http.StatusBadRequest)
3649
return
3750
}
3851

52+
if pushMetrics != nil {
53+
pushMetrics.ObservePushRequestSize(userID, formatRemoteWrite1, float64(bodySize))
54+
}
55+
3956
req.SkipLabelNameValidation = false
4057
if req.Source == 0 {
4158
req.Source = cortexpb.API

0 commit comments

Comments
 (0)