diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go
index cd354c2f5..a92f091c5 100644
--- a/pkg/epp/handlers/server.go
+++ b/pkg/epp/handlers/server.go
@@ -228,6 +228,7 @@ type RequestContext struct {
ResponseSize int
ResponseComplete bool
ResponseStatusCode string
+ RequestRunning bool
RequestState StreamRequestState
modelServerStreaming bool
diff --git a/pkg/epp/handlers/streamingserver.go b/pkg/epp/handlers/streamingserver.go
index d704578ac..874dd734f 100644
--- a/pkg/epp/handlers/streamingserver.go
+++ b/pkg/epp/handlers/streamingserver.go
@@ -81,13 +81,16 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
// error metrics. This doesn't cover the error "Cannot receive stream request" because
// such errors might happen even though response is processed.
var err error
- defer func(error) {
+ defer func(error, *RequestContext) {
if reqCtx.ResponseStatusCode != "" {
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode)
} else if err != nil {
metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err))
}
- }(err)
+ if reqCtx.RequestRunning {
+ metrics.DecRunningRequests(reqCtx.Model)
+ }
+ }(err, reqCtx)
for {
select {
@@ -269,6 +272,8 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
}
r.RequestState = BodyRequestResponsesComplete
+ metrics.IncRunningRequests(r.Model)
+ r.RequestRunning = true
// Dump the response so a new stream message can begin
r.reqBodyResp = nil
}
diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go
index e86ca901e..9ff2bb795 100644
--- a/pkg/epp/metrics/metrics.go
+++ b/pkg/epp/metrics/metrics.go
@@ -121,6 +121,16 @@ var (
[]string{"model_name", "target_model_name"},
)
+	runningRequests = compbasemetrics.NewGaugeVec(
+		&compbasemetrics.GaugeOpts{
+			Subsystem:      InferenceModelComponent,
+			Name:           "running_requests",
+			Help:           "Inference model number of running requests in each model.",
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{"model_name"},
+	)
+
// Inference Pool Metrics
inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec(
&compbasemetrics.GaugeOpts{
@@ -155,6 +165,7 @@ func Register() {
legacyregistry.MustRegister(responseSizes)
legacyregistry.MustRegister(inputTokens)
legacyregistry.MustRegister(outputTokens)
+ legacyregistry.MustRegister(runningRequests)
legacyregistry.MustRegister(inferencePoolAvgKVCache)
legacyregistry.MustRegister(inferencePoolAvgQueueSize)
@@ -209,6 +220,20 @@ func RecordOutputTokens(modelName, targetModelName string, size int) {
}
}
+// IncRunningRequests increases the current running requests.
+func IncRunningRequests(modelName string) {
+	if modelName != "" {
+		runningRequests.WithLabelValues(modelName).Inc()
+	}
+}
+
+// DecRunningRequests decreases the current running requests.
+func DecRunningRequests(modelName string) {
+	if modelName != "" {
+		runningRequests.WithLabelValues(modelName).Dec()
+	}
+}
+
func RecordInferencePoolAvgKVCache(name string, utilization float64) {
inferencePoolAvgKVCache.WithLabelValues(name).Set(utilization)
}
diff --git a/pkg/epp/metrics/metrics_test.go b/pkg/epp/metrics/metrics_test.go
index c2436bab1..dc4c70444 100644
--- a/pkg/epp/metrics/metrics_test.go
+++ b/pkg/epp/metrics/metrics_test.go
@@ -36,6 +36,7 @@ const (
ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
InputTokensMetric = InferenceModelComponent + "_input_tokens"
OutputTokensMetric = InferenceModelComponent + "_output_tokens"
+ RunningRequestsMetric = InferenceModelComponent + "_running_requests"
KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
)
@@ -345,6 +346,66 @@ func TestRecordResponseMetrics(t *testing.T) {
}
}
+func TestRunningRequestsMetrics(t *testing.T) {
+	type request struct {
+		modelName string
+		complete  bool // true -> request is completed, false -> running request
+	}
+
+	scenarios := []struct {
+		name     string
+		requests []request
+	}{
+		{
+			name: "basic test",
+			requests: []request{
+				{
+					modelName: "m1",
+					complete:  false,
+				},
+				{
+					modelName: "m1",
+					complete:  false,
+				},
+				{
+					modelName: "m1",
+					complete:  true,
+				},
+				{
+					modelName: "m2",
+					complete:  false,
+				},
+			},
+		},
+	}
+
+	Register()
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			for _, req := range scenario.requests {
+				if req.complete {
+					DecRunningRequests(req.modelName)
+				} else {
+					IncRunningRequests(req.modelName)
+				}
+			}
+
+			wantRunningRequests, err := os.Open("testdata/running_requests_metrics")
+			defer func() {
+				if err := wantRunningRequests.Close(); err != nil {
+					t.Error(err)
+				}
+			}()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRunningRequests, RunningRequestsMetric); err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
+
func TestInferencePoolMetrics(t *testing.T) {
scenarios := []struct {
name string
diff --git a/pkg/epp/metrics/testdata/running_requests_metrics b/pkg/epp/metrics/testdata/running_requests_metrics
new file mode 100644
index 000000000..a880e4998
--- /dev/null
+++ b/pkg/epp/metrics/testdata/running_requests_metrics
@@ -0,0 +1,4 @@
+# HELP inference_model_running_requests [ALPHA] Inference model number of running requests in each model.
+# TYPE inference_model_running_requests gauge
+inference_model_running_requests{model_name="m1"} 1
+inference_model_running_requests{model_name="m2"} 1
diff --git a/site-src/guides/metrics.md b/site-src/guides/metrics.md
index fca43dd61..d07473075 100644
--- a/site-src/guides/metrics.md
+++ b/site-src/guides/metrics.md
@@ -30,6 +30,7 @@ curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
| inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=<model-name> `target_model_name`=<target-model-name> | ALPHA |
| inference_model_input_tokens | Distribution | Distribution of input token count. | `model_name`=<model-name> `target_model_name`=<target-model-name> | ALPHA |
| inference_model_output_tokens | Distribution | Distribution of output token count. | `model_name`=<model-name> `target_model_name`=<target-model-name> | ALPHA |
+| inference_model_running_requests | Gauge | Number of running requests for each model. | `model_name`=<model-name> | ALPHA |
| inference_pool_average_kv_cache_utilization | Gauge | The average kv cache utilization for an inference server pool. | `name`=<inference-pool-name> | ALPHA |
| inference_pool_average_queue_size | Gauge | The average number of requests pending in the model server queue. | `name`=<inference-pool-name> | ALPHA |
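
As a quick usage illustration, the sketch below (assuming the `sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics` import path for the package touched above) pairs the new `IncRunningRequests`/`DecRunningRequests` helpers around a simulated in-flight request and reads the resulting `inference_model_running_requests` gauge back from the legacy registry:

```go
package main

import (
	"fmt"

	"k8s.io/component-base/metrics/legacyregistry"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
)

func main() {
	// Register the EPP metrics, including the new running_requests gauge.
	metrics.Register()

	// Simulate one in-flight request for model "m1".
	metrics.IncRunningRequests("m1")

	// Read the gauge back through the same registry the metrics endpoint serves;
	// at this point it should report inference_model_running_requests{model_name="m1"} 1.
	families, err := legacyregistry.DefaultGatherer.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		if mf.GetName() == "inference_model_running_requests" {
			fmt.Println(mf.String())
		}
	}

	// When the response stream completes, the request is no longer running.
	metrics.DecRunningRequests("m1")
}
```

Because the metric is a gauge rather than a counter, it decreases again as requests finish, which is what the deferred `DecRunningRequests` call in `Process` relies on.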