
Commit 54ed856

feat: Add metric that records queue length for each model server pod (kubernetes-sigs#776)
This metric helps determine whether the scheduling algorithm is distributing traffic across the pods as expected. The new metric uses a custom Prometheus collector because the pod list changes whenever the underlying model server deployment changes, which would otherwise cause the metric cardinality to grow.
1 parent 0ae7d1d commit 54ed856
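
To make the cardinality concern concrete, here is a minimal sketch of the alternative that this commit avoids. It is not part of the change; it assumes the prometheus/client_golang API and a hypothetical onPodRemoved helper. With a pre-registered GaugeVec, the exporter owns the label set, so every per-pod series lives until it is explicitly deleted:

package example

import "github.com/prometheus/client_golang/prometheus"

// perPodQueueSize is the GaugeVec variant of the metric; series persist in the
// registry until they are explicitly removed.
var perPodQueueSize = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "inference_pool_per_pod_queue_size",
		Help: "Requests pending in the model server queue for each pod.",
	},
	[]string{"name", "model_server_pod"},
)

// onPodRemoved is a hypothetical cleanup hook: without this DeleteLabelValues
// call, series for deleted pods linger and cardinality grows with every
// rollout of the model server Deployment.
func onPodRemoved(poolName, podName string) {
	perPodQueueSize.DeleteLabelValues(poolName, podName)
}

The collector added in this commit avoids that bookkeeping entirely: series are generated from the datastore's current pod list at scrape time, so pods that disappear simply stop being reported.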

File tree

9 files changed (+1296, -1021 lines)

cmd/epp/main.go

Lines changed: 4 additions & 2 deletions
@@ -41,6 +41,7 @@ import (
 	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
 	runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -199,7 +200,7 @@ func run() error {
 	}
 
 	// Register metrics handler.
-	if err := registerMetricsHandler(mgr, *metricsPort, cfg); err != nil {
+	if err := registerMetricsHandler(mgr, *metricsPort, cfg, datastore); err != nil {
 		return err
 	}
 
@@ -247,8 +248,9 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore.
 }
 
 // registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager.
-func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config) error {
+func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config, ds datastore.Datastore) error {
 	metrics.Register()
+	legacyregistry.CustomMustRegister(collectors.NewInferencePoolMetricsCollector(ds))
 
 	metrics.RecordInferenceExtensionInfo()

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package collectors

import (
	"k8s.io/component-base/metrics"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
)

var (
	descInferencePoolPerPodQueueSize = metrics.NewDesc(
		"inference_pool_per_pod_queue_size",
		"The total number of requests pending in the model server queue for each underlying pod.",
		[]string{
			"name",
			"model_server_pod",
		}, nil,
		metrics.ALPHA,
		"",
	)
)

type inferencePoolMetricsCollector struct {
	metrics.BaseStableCollector

	ds datastore.Datastore
}

// Check if inferencePoolMetricsCollector implements necessary interface
var _ metrics.StableCollector = &inferencePoolMetricsCollector{}

// NewInferencePoolMetricsCollector implements the metrics.StableCollector interface and
// exposes metrics about inference pool.
func NewInferencePoolMetricsCollector(ds datastore.Datastore) metrics.StableCollector {
	return &inferencePoolMetricsCollector{
		ds: ds,
	}
}

// DescribeWithStability implements the metrics.StableCollector interface.
func (c *inferencePoolMetricsCollector) DescribeWithStability(ch chan<- *metrics.Desc) {
	ch <- descInferencePoolPerPodQueueSize
}

// CollectWithStability implements the metrics.StableCollector interface.
func (c *inferencePoolMetricsCollector) CollectWithStability(ch chan<- metrics.Metric) {
	pool, err := c.ds.PoolGet()
	if err != nil {
		return
	}

	podMetrics := c.ds.PodGetAll()
	if len(podMetrics) == 0 {
		return
	}

	for _, pod := range podMetrics {
		ch <- metrics.NewLazyConstMetric(
			descInferencePoolPerPodQueueSize,
			metrics.GaugeValue,
			float64(pod.GetMetrics().WaitingQueueSize),
			pool.Name,
			pod.GetPod().NamespacedName.Name,
		)
	}
}
Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package collectors

import (
	"context"
	"strings"
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/component-base/metrics/testutil"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
	"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
	backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
	"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
)

var (
	pod1 = &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
		},
	}
	pod1NamespacedName = types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
	pod1Metrics        = &backendmetrics.Metrics{
		WaitingQueueSize:    100,
		KVCacheUsagePercent: 0.2,
		MaxActiveModels:     2,
	}
)

func TestNoMetricsCollected(t *testing.T) {
	pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
	datastore := datastore.NewDatastore(context.Background(), pmf)

	collector := &inferencePoolMetricsCollector{
		ds: datastore,
	}

	if err := testutil.CustomCollectAndCompare(collector, strings.NewReader(""), ""); err != nil {
		t.Fatal(err)
	}
}

func TestMetricsCollected(t *testing.T) {
	pmc := &backendmetrics.FakePodMetricsClient{
		Res: map[types.NamespacedName]*backendmetrics.Metrics{
			pod1NamespacedName: pod1Metrics,
		},
	}
	pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond)
	ds := datastore.NewDatastore(context.Background(), pmf)

	scheme := runtime.NewScheme()
	fakeClient := fake.NewClientBuilder().
		WithScheme(scheme).
		Build()

	inferencePool := &v1alpha2.InferencePool{
		ObjectMeta: metav1.ObjectMeta{
			Name: "test-pool",
		},
		Spec: v1alpha2.InferencePoolSpec{
			TargetPortNumber: 8000,
		},
	}
	_ = ds.PoolSet(context.Background(), fakeClient, inferencePool)
	_ = ds.PodUpdateOrAddIfNotExist(pod1)

	time.Sleep(1 * time.Second)

	collector := &inferencePoolMetricsCollector{
		ds: ds,
	}
	err := testutil.CustomCollectAndCompare(collector, strings.NewReader(`
# HELP inference_pool_per_pod_queue_size [ALPHA] The total number of requests pending in the model server queue for each underlying pod.
# TYPE inference_pool_per_pod_queue_size gauge
inference_pool_per_pod_queue_size{model_server_pod="pod1",name="test-pool"} 100
`), "inference_pool_per_pod_queue_size")
	if err != nil {
		t.Fatal(err)
	}
}

pkg/epp/metrics/metrics_test.go

Lines changed: 1 addition & 0 deletions
@@ -40,6 +40,7 @@ const (
 	RunningRequestsMetric = InferenceModelComponent + "_running_requests"
 	KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
 	QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
+	PerPodQueueSizeMetrics = InferencePoolComponent + "_per_pod_queue_size"
 )
 
 func TestRecordRequestCounterandSizes(t *testing.T) {

site-src/guides/metrics.md

Lines changed: 1 addition & 0 deletions
@@ -34,6 +34,7 @@ curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
 | inference_model_running_requests | Gauge | Number of running requests for each model. | `model_name`=<model-name> | ALPHA |
 | inference_pool_average_kv_cache_utilization | Gauge | The average kv cache utilization for an inference server pool. | `name`=<inference-pool-name> | ALPHA |
 | inference_pool_average_queue_size | Gauge | The average number of requests pending in the model server queue. | `name`=<inference-pool-name> | ALPHA |
+| inference_pool_per_pod_queue_size | Gauge | The total number of requests pending in the model server queue for each model server pod under the inference pool. | `model_server_pod`=<model-server-pod-name> `name`=<inference-pool-name> | ALPHA |
 | inference_pool_ready_pods | Gauge | The number of ready pods for an inference server pool. | `name`=<inference-pool-name> | ALPHA |
 | inference_extension_info | Gauge | The general information of the current build. | `commit`=<hash-of-the-build> | ALPHA |
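
As a quick way to eyeball the new per-pod series after deploying this change, the sketch below (an illustration only; the address is a placeholder, so substitute the metrics host and port configured for your EPP deployment) scrapes the metrics endpoint and prints just the inference_pool_per_pod_queue_size samples:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Placeholder address; use the EPP metrics host and port for your deployment.
	resp, err := http.Get("http://localhost:9090/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		// Keep only the per-pod queue size samples introduced by this commit.
		if strings.HasPrefix(line, "inference_pool_per_pod_queue_size") {
			fmt.Println(line)
		}
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}

Comparing the values across model_server_pod labels for the same pool shows at a glance whether the scheduler is spreading queued requests evenly.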
