Add running request gauge metric #604

Merged (1 commit, Mar 31, 2025)

1 change: 1 addition & 0 deletions pkg/epp/handlers/server.go
@@ -228,6 +228,7 @@ type RequestContext struct {
 	ResponseSize       int
 	ResponseComplete   bool
 	ResponseStatusCode string
+	RequestRunning     bool

 	RequestState         StreamRequestState
 	modelServerStreaming bool
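Note: RequestRunning records whether this request has been counted in the running-requests gauge, so the deferred cleanup in streamingserver.go (next file) knows whether a decrement is owed.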
9 changes: 7 additions & 2 deletions pkg/epp/handlers/streamingserver.go
@@ -81,13 +81,16 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
 	// error metrics. This doesn't cover the error "Cannot receive stream request" because
 	// such errors might happen even though response is processed.
 	var err error
-	defer func(error) {
+	defer func(error, *RequestContext) {
 		if reqCtx.ResponseStatusCode != "" {
 			metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseStatusCode)
 		} else if err != nil {
 			metrics.RecordRequestErrCounter(reqCtx.Model, reqCtx.ResolvedTargetModel, errutil.CanonicalCode(err))
 		}
-	}(err)
+		if reqCtx.RequestRunning {
+			metrics.DecRunningRequests(reqCtx.Model)
+		}
+	}(err, reqCtx)

 	for {
 		select {
@@ -269,6 +272,8 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
 			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
 		}
 		r.RequestState = BodyRequestResponsesComplete
+		metrics.IncRunningRequests(r.Model)
+		r.RequestRunning = true
 		// Dump the response so a new stream message can begin
 		r.reqBodyResp = nil
 	}
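Note on the gauge lifecycle: IncRunningRequests fires only once the full request body has been forwarded (when RequestState reaches BodyRequestResponsesComplete), and the deferred cleanup calls DecRunningRequests only when RequestRunning is set, so a request that errors out before being counted never drives the gauge negative.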
25 changes: 25 additions & 0 deletions pkg/epp/metrics/metrics.go
@@ -121,6 +121,16 @@ var (
 		[]string{"model_name", "target_model_name"},
 	)

+	runningRequests = compbasemetrics.NewGaugeVec(
+		&compbasemetrics.GaugeOpts{
+			Subsystem:      InferenceModelComponent,
+			Name:           "running_requests",
+			Help:           "Inference model number of running requests in each model.",
+			StabilityLevel: compbasemetrics.ALPHA,
+		},
+		[]string{"model_name"},
+	)
+
 	// Inference Pool Metrics
 	inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec(
 		&compbasemetrics.GaugeOpts{
@@ -155,6 +165,7 @@ func Register() {
 	legacyregistry.MustRegister(responseSizes)
 	legacyregistry.MustRegister(inputTokens)
 	legacyregistry.MustRegister(outputTokens)
+	legacyregistry.MustRegister(runningRequests)

 	legacyregistry.MustRegister(inferencePoolAvgKVCache)
 	legacyregistry.MustRegister(inferencePoolAvgQueueSize)
@@ -209,6 +220,20 @@ func RecordOutputTokens(modelName, targetModelName string, size int) {
 	}
 }

+// IncRunningRequests increases the current running requests.
+func IncRunningRequests(modelName string) {
+	if modelName != "" {
+		runningRequests.WithLabelValues(modelName).Inc()
+	}
+}
+
+// DecRunningRequests decreases the current running requests.
+func DecRunningRequests(modelName string) {
+	if modelName != "" {
+		runningRequests.WithLabelValues(modelName).Dec()
+	}
+}
+
 func RecordInferencePoolAvgKVCache(name string, utilization float64) {
 	inferencePoolAvgKVCache.WithLabelValues(name).Set(utilization)
 }
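For illustration, a minimal sketch of how a caller might pair the new helpers; everything here except IncRunningRequests and DecRunningRequests is hypothetical, and the import path is assumed from the repository layout:

package example

import "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" // assumed import path

// serveOnce is a hypothetical caller: pairing Inc with a deferred Dec
// keeps the gauge balanced on every exit path.
func serveOnce(modelName string) error {
	metrics.IncRunningRequests(modelName)
	defer metrics.DecRunningRequests(modelName)
	// ... forward the request and stream the response ...
	return nil
}

The PR itself cannot use this simple pairing because the increment happens mid-stream, hence the RequestRunning guard in streamingserver.go.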
61 changes: 61 additions & 0 deletions pkg/epp/metrics/metrics_test.go
@@ -36,6 +36,7 @@ const (
 	ResponseSizesMetric   = InferenceModelComponent + "_response_sizes"
 	InputTokensMetric     = InferenceModelComponent + "_input_tokens"
 	OutputTokensMetric    = InferenceModelComponent + "_output_tokens"
+	RunningRequestsMetric = InferenceModelComponent + "_running_requests"
 	KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
 	QueueAvgSizeMetric    = InferencePoolComponent + "_average_queue_size"
 )
@@ -345,6 +346,66 @@ func TestRecordResponseMetrics(t *testing.T) {
 	}
 }

+func TestRunningRequestsMetrics(t *testing.T) {
+	type request struct {
+		modelName string
+		complete  bool // true -> request is completed, false -> running request
+	}
+
+	scenarios := []struct {
+		name     string
+		requests []request
+	}{
+		{
+			name: "basic test",
+			requests: []request{
+				{
+					modelName: "m1",
+					complete:  false,
+				},
+				{
+					modelName: "m1",
+					complete:  false,
+				},
+				{
+					modelName: "m1",
+					complete:  true,
+				},
+				{
+					modelName: "m2",
+					complete:  false,
+				},
+			},
+		},
+	}
+
+	Register()
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+			for _, req := range scenario.requests {
+				if req.complete {
+					DecRunningRequests(req.modelName)
+				} else {
+					IncRunningRequests(req.modelName)
+				}
+			}
+
+			wantRunningRequests, err := os.Open("testdata/running_requests_metrics")
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer func() {
+				if err := wantRunningRequests.Close(); err != nil {
+					t.Error(err)
+				}
+			}()
+			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRunningRequests, RunningRequestsMetric); err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
+
 func TestInferencePoolMetrics(t *testing.T) {
 	scenarios := []struct {
 		name string
4 changes: 4 additions & 0 deletions pkg/epp/metrics/testdata/running_requests_metrics
@@ -0,0 +1,4 @@
+# HELP inference_model_running_requests [ALPHA] Inference model number of running requests in each model.
+# TYPE inference_model_running_requests gauge
+inference_model_running_requests{model_name="m1"} 1
+inference_model_running_requests{model_name="m2"} 1
1 change: 1 addition & 0 deletions site-src/guides/metrics.md
@@ -30,6 +30,7 @@ curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
 | inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
 | inference_model_input_tokens | Distribution | Distribution of input token count. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
 | inference_model_output_tokens | Distribution | Distribution of output token count. | `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
+| inference_model_running_requests | Gauge | Number of running requests for each model. | `model_name`=&lt;model-name&gt; | ALPHA |
 | inference_pool_average_kv_cache_utilization | Gauge | The average kv cache utilization for an inference server pool. | `name`=&lt;inference-pool-name&gt; | ALPHA |
 | inference_pool_average_queue_size | Gauge | The average number of requests pending in the model server queue. | `name`=&lt;inference-pool-name&gt; | ALPHA |

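Once scraped, per-model in-flight load can be read with a PromQL query along the lines of `sum by (model_name) (inference_model_running_requests)`; the query itself is illustrative, while the metric name and `model_name` label come from this change.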