diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go
index cd354c2f5..a92f091c5 100644
--- a/pkg/epp/handlers/server.go
+++ b/pkg/epp/handlers/server.go
@@ -228,6 +228,7 @@ type RequestContext struct {
 	ResponseSize       int
 	ResponseComplete   bool
 	ResponseStatusCode string
+	RequestRunning     bool
 
 	RequestState         StreamRequestState
 	modelServerStreaming bool
diff --git a/pkg/epp/handlers/streamingserver.go b/pkg/epp/handlers/streamingserver.go
index d704578ac..874dd734f 100644
--- a/pkg/epp/handlers/streamingserver.go
+++ b/pkg/epp/handlers/streamingserver.go
@@ -81,13 +81,16 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
 	// error metrics. This doesn't cover the error "Cannot receive stream request" because
 	// such errors might happen even though response is processed.
 	var err error
-	defer func(error) {
+	defer func(error, *RequestContext) {
 		if reqCtx.ResponseStatusCode != "" {
 			metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode)
 		} else if err != nil {
 			metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err))
 		}
-	}(err)
+		if reqCtx.RequestRunning {
+			metrics.DecRunningRequests(reqCtx.Model)
+		}
+	}(err, reqCtx)
 
 	for {
 		select {
@@ -269,6 +272,8 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
 			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
 		}
 		r.RequestState = BodyRequestResponsesComplete
+		metrics.IncRunningRequests(r.Model)
+		r.RequestRunning = true
 		// Dump the response so a new stream message can begin
 		r.reqBodyResp = nil
 	}
diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go
index e86ca901e..9ff2bb795 100644
--- a/pkg/epp/metrics/metrics.go
+++ b/pkg/epp/metrics/metrics.go
@@ -121,6 +121,16 @@ var (
 		[]string{"model_name", "target_model_name"},
 	)
 
+	runningRequests = compbasemetrics.NewGaugeVec(
+		&compbasemetrics.GaugeOpts{
+			Subsystem:      InferenceModelComponent,
+			Name:           "running_requests",
+			Help:           "Inference model number of running requests in each model.",
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{"model_name"},
+	)
+
 	// Inference Pool Metrics
 	inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
@@ -155,6 +165,7 @@ func Register() {
 	legacyregistry.MustRegister(responseSizes)
 	legacyregistry.MustRegister(inputTokens)
 	legacyregistry.MustRegister(outputTokens)
+	legacyregistry.MustRegister(runningRequests)
 
 	legacyregistry.MustRegister(inferencePoolAvgKVCache)
 	legacyregistry.MustRegister(inferencePoolAvgQueueSize)
@@ -209,6 +220,20 @@ func RecordOutputTokens(modelName, targetModelName string, size int) {
 	}
 }
 
+// IncRunningRequests increases the current running requests.
+func IncRunningRequests(modelName string) {
+	if modelName != "" {
+		runningRequests.WithLabelValues(modelName).Inc()
+	}
+}
+
+// DecRunningRequests decreases the current running requests.
+func DecRunningRequests(modelName string) {
+	if modelName != "" {
+		runningRequests.WithLabelValues(modelName).Dec()
+	}
+}
+
 func RecordInferencePoolAvgKVCache(name string, utilization float64) {
 	inferencePoolAvgKVCache.WithLabelValues(name).Set(utilization)
 }
diff --git a/pkg/epp/metrics/metrics_test.go b/pkg/epp/metrics/metrics_test.go
index c2436bab1..dc4c70444 100644
--- a/pkg/epp/metrics/metrics_test.go
+++ b/pkg/epp/metrics/metrics_test.go
@@ -36,6 +36,7 @@ const (
 	ResponseSizesMetric   = InferenceModelComponent + "_response_sizes"
 	InputTokensMetric     = InferenceModelComponent + "_input_tokens"
 	OutputTokensMetric    = InferenceModelComponent + "_output_tokens"
+	RunningRequestsMetric = InferenceModelComponent + "_running_requests"
 	KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
 	QueueAvgSizeMetric    = InferencePoolComponent + "_average_queue_size"
 )
@@ -345,6 +346,66 @@ func TestRecordResponseMetrics(t *testing.T) {
 	}
 }
 
+func TestRunningRequestsMetrics(t *testing.T) {
+	type request struct {
+		modelName string
+		complete  bool // true -> request is completed, false -> running request
+	}
+
+	scenarios := []struct {
+		name     string
+		requests []request
+	}{
+		{
+			name: "basic test",
+			requests: []request{
+				{
+					modelName: "m1",
+					complete:  false,
+				},
+				{
+					modelName: "m1",
+					complete:  false,
+				},
+				{
+					modelName: "m1",
+					complete:  true,
+				},
+				{
+					modelName: "m2",
+					complete:  false,
+				},
+			},
+		},
+	}
+
+	Register()
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			for _, req := range scenario.requests {
+				if req.complete {
+					DecRunningRequests(req.modelName)
+				} else {
+					IncRunningRequests(req.modelName)
+				}
+			}
+
+			wantRunningRequests, err := os.Open("testdata/running_requests_metrics")
+			defer func() {
+				if err := wantRunningRequests.Close(); err != nil {
+					t.Error(err)
+				}
+			}()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRunningRequests, RunningRequestsMetric); err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
+
 func TestInferencePoolMetrics(t *testing.T) {
 	scenarios := []struct {
 		name string
diff --git a/pkg/epp/metrics/testdata/running_requests_metrics b/pkg/epp/metrics/testdata/running_requests_metrics
new file mode 100644
index 000000000..a880e4998
--- /dev/null
+++ b/pkg/epp/metrics/testdata/running_requests_metrics
@@ -0,0 +1,4 @@
+# HELP inference_model_running_requests [ALPHA] Inference model number of running requests in each model.
+# TYPE inference_model_running_requests gauge
+inference_model_running_requests{model_name="m1"} 1
+inference_model_running_requests{model_name="m2"} 1
diff --git a/site-src/guides/metrics.md b/site-src/guides/metrics.md
index fca43dd61..d07473075 100644
--- a/site-src/guides/metrics.md
+++ b/site-src/guides/metrics.md
@@ -30,6 +30,7 @@ curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
 | inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=<model-name> <br/> `target_model_name`=<target-model-name> | ALPHA |
 | inference_model_input_tokens | Distribution | Distribution of input token count. | `model_name`=<model-name> <br/> `target_model_name`=<target-model-name> | ALPHA |
 | inference_model_output_tokens | Distribution | Distribution of output token count. | `model_name`=<model-name> <br/> `target_model_name`=<target-model-name> | ALPHA |
+| inference_model_running_requests | Gauge | Number of running requests for each model. | `model_name`=<model-name> | ALPHA |
 | inference_pool_average_kv_cache_utilization | Gauge | The average kv cache utilization for an inference server pool. | `name`=<inference-pool-name> | ALPHA |
 | inference_pool_average_queue_size | Gauge | The average number of requests pending in the model server queue. | `name`=<inference-pool-name> | ALPHA |
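A minimal, self-contained sketch of the lifecycle contract the patch wires into streamingserver.go, not part of the diff: the gauge is incremented only after the request body has actually been dispatched, that fact is recorded in a RequestRunning-style flag, and the deferred cleanup decrements only when the flag is set, so requests that fail before dispatch never drive the gauge negative. The map-backed helpers, handleRequest, and the dispatch callback are stand-ins invented for illustration, not names from the repository.

package main

import (
	"errors"
	"fmt"
)

// Map-backed stand-ins for the metrics helpers added in pkg/epp/metrics/metrics.go,
// so the sketch runs without the Prometheus registry.
var running = map[string]int{}

func IncRunningRequests(modelName string) {
	if modelName != "" {
		running[modelName]++
	}
}

func DecRunningRequests(modelName string) {
	if modelName != "" {
		running[modelName]--
	}
}

// handleRequest (hypothetical) mirrors the pattern in streamingserver.go:
// increment only after the request has been dispatched, remember that in a
// flag, and decrement exactly once from the deferred cleanup.
func handleRequest(model string, dispatch func() error) (err error) {
	requestRunning := false
	defer func() {
		if requestRunning {
			DecRunningRequests(model)
		}
	}()

	if err = dispatch(); err != nil {
		return err // never incremented, so the deferred cleanup does not decrement
	}
	IncRunningRequests(model)
	requestRunning = true

	// ... response streaming would happen here; the deferred func runs on return.
	return nil
}

func main() {
	_ = handleRequest("m1", func() error { return nil })                           // incremented, then decremented
	_ = handleRequest("m1", func() error { return errors.New("dispatch failed") }) // neither
	fmt.Println(running["m1"]) // 0: the gauge returns to zero and never goes negative
}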