diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index 8bf672579..a9165e8fd 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -7,6 +7,7 @@ import (
"time"
"go.uber.org/multierr"
+ "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
klog "k8s.io/klog/v2"
)
@@ -58,7 +59,7 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
return nil, false
}
-func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
+func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error {
p.refreshPodsOnce()
if err := p.refreshMetricsOnce(); err != nil {
@@ -85,6 +86,14 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
}
}()
+ // Periodically flush prometheus metrics for inference pool
+ go func() {
+ for {
+ time.Sleep(refreshPrometheusMetricsInterval)
+ p.flushPrometheusMetricsOnce()
+ }
+ }()
+
// Periodically print out the pods and metrics for DEBUGGING.
if klog.V(logutil.DEBUG).Enabled() {
go func() {
@@ -174,3 +183,30 @@ func (p *Provider) refreshMetricsOnce() error {
}
return errs
}
+
+func (p *Provider) flushPrometheusMetricsOnce() {
+	klog.V(logutil.DEBUG).Infof("Flushing Prometheus Metrics")
+
+	pool, _ := p.datastore.getInferencePool()
+	if pool == nil {
+		// No inference pool, or it is not initialized yet.
+		return
+	}
+
+	var kvCacheTotal float64
+	var queueTotal int
+
+	podMetrics := p.AllPodMetrics()
+	if len(podMetrics) == 0 {
+		return
+	}
+
+	for _, pod := range podMetrics {
+		kvCacheTotal += pod.KVCacheUsagePercent
+		queueTotal += pod.WaitingQueueSize
+	}
+
+	podTotalCount := len(podMetrics)
+	metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount))
+	metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal)/float64(podTotalCount))
+}
diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go
index ad231f575..ddd7f0d66 100644
--- a/pkg/ext-proc/backend/provider_test.go
+++ b/pkg/ext-proc/backend/provider_test.go
@@ -90,7 +90,7 @@ func TestProvider(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := NewProvider(test.pmc, test.datastore)
- err := p.Init(time.Millisecond, time.Millisecond)
+ err := p.Init(time.Millisecond, time.Millisecond, time.Millisecond)
if test.initErr != (err != nil) {
t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr)
}
diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go
index a783aa2c5..e126b6dd8 100644
--- a/pkg/ext-proc/main.go
+++ b/pkg/ext-proc/main.go
@@ -69,6 +69,10 @@ var (
"refreshMetricsInterval",
runserver.DefaultRefreshMetricsInterval,
"interval to refresh metrics")
+ refreshPrometheusMetricsInterval = flag.Duration(
+ "refreshPrometheusMetricsInterval",
+ runserver.DefaultRefreshPrometheusMetricsInterval,
+ "interval to flush prometheus metrics")
scheme = runtime.NewScheme()
)
@@ -102,17 +106,18 @@ func main() {
datastore := backend.NewK8sDataStore()
serverRunner := &runserver.ExtProcServerRunner{
- GrpcPort: *grpcPort,
- TargetEndpointKey: *targetEndpointKey,
- PoolName: *poolName,
- PoolNamespace: *poolNamespace,
- ServiceName: *serviceName,
- Zone: *zone,
- RefreshPodsInterval: *refreshPodsInterval,
- RefreshMetricsInterval: *refreshMetricsInterval,
- Scheme: scheme,
- Config: ctrl.GetConfigOrDie(),
- Datastore: datastore,
+ GrpcPort: *grpcPort,
+ TargetEndpointKey: *targetEndpointKey,
+ PoolName: *poolName,
+ PoolNamespace: *poolNamespace,
+ ServiceName: *serviceName,
+ Zone: *zone,
+ RefreshPodsInterval: *refreshPodsInterval,
+ RefreshMetricsInterval: *refreshMetricsInterval,
+ RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
+ Scheme: scheme,
+ Config: ctrl.GetConfigOrDie(),
+ Datastore: datastore,
}
serverRunner.Setup()
diff --git a/pkg/ext-proc/metrics/README.md b/pkg/ext-proc/metrics/README.md
index 1094bc23d..8adfd94e9 100644
--- a/pkg/ext-proc/metrics/README.md
+++ b/pkg/ext-proc/metrics/README.md
@@ -46,6 +46,8 @@ spec:
| inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA |
| inference_model_input_tokens | Distribution | Distribution of input token count. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA |
| inference_model_output_tokens | Distribution | Distribution of output token count. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA |
+| inference_pool_average_kv_cache_utilization | Gauge | The average kv cache utilization for an inference server pool. | `name`=<inference-pool-name> | ALPHA |
+| inference_pool_average_queue_size | Gauge | The average number of requests pending in the model server queue. | `name`=<inference-pool-name> | ALPHA |
## Scrape Metrics
diff --git a/pkg/ext-proc/metrics/metrics.go b/pkg/ext-proc/metrics/metrics.go
index 8cb7bd274..7bdc8436e 100644
--- a/pkg/ext-proc/metrics/metrics.go
+++ b/pkg/ext-proc/metrics/metrics.go
@@ -11,9 +11,11 @@ import (
const (
InferenceModelComponent = "inference_model"
+ InferencePoolComponent = "inference_pool"
)
var (
+ // Inference Model Metrics
requestCounter = compbasemetrics.NewCounterVec(
&compbasemetrics.CounterOpts{
Subsystem: InferenceModelComponent,
@@ -88,6 +90,27 @@ var (
},
[]string{"model_name", "target_model_name"},
)
+
+ // Inference Pool Metrics
+ inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec(
+ &compbasemetrics.GaugeOpts{
+ Subsystem: InferencePoolComponent,
+ Name: "average_kv_cache_utilization",
+ Help: "The average kv cache utilization for an inference server pool.",
+ StabilityLevel: compbasemetrics.ALPHA,
+ },
+ []string{"name"},
+ )
+
+ inferencePoolAvgQueueSize = compbasemetrics.NewGaugeVec(
+ &compbasemetrics.GaugeOpts{
+ Subsystem: InferencePoolComponent,
+ Name: "average_queue_size",
+ Help: "The average number of requests pending in the model server queue.",
+ StabilityLevel: compbasemetrics.ALPHA,
+ },
+ []string{"name"},
+ )
)
var registerMetrics sync.Once
@@ -101,6 +124,9 @@ func Register() {
legacyregistry.MustRegister(responseSizes)
legacyregistry.MustRegister(inputTokens)
legacyregistry.MustRegister(outputTokens)
+
+ legacyregistry.MustRegister(inferencePoolAvgKVCache)
+ legacyregistry.MustRegister(inferencePoolAvgQueueSize)
})
}
@@ -143,3 +169,11 @@ func RecordOutputTokens(modelName, targetModelName string, size int) {
outputTokens.WithLabelValues(modelName, targetModelName).Observe(float64(size))
}
}
+
+func RecordInferencePoolAvgKVCache(poolName string, kvCacheAvg float64) {
+	inferencePoolAvgKVCache.WithLabelValues(poolName).Set(kvCacheAvg)
+}
+
+func RecordInferencePoolAvgQueueSize(poolName string, queueAvg float64) {
+	inferencePoolAvgQueueSize.WithLabelValues(poolName).Set(queueAvg)
+}
diff --git a/pkg/ext-proc/metrics/metrics_test.go b/pkg/ext-proc/metrics/metrics_test.go
index 57774b11a..348f707e9 100644
--- a/pkg/ext-proc/metrics/metrics_test.go
+++ b/pkg/ext-proc/metrics/metrics_test.go
@@ -15,6 +15,8 @@ const RequestSizesMetric = InferenceModelComponent + "_request_sizes"
const ResponseSizesMetric = InferenceModelComponent + "_response_sizes"
const InputTokensMetric = InferenceModelComponent + "_input_tokens"
const OutputTokensMetric = InferenceModelComponent + "_output_tokens"
+const KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization"
+const QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size"
func TestRecordRequestCounterandSizes(t *testing.T) {
type requests struct {
@@ -257,3 +259,53 @@ func TestRecordResponseMetrics(t *testing.T) {
})
}
}
+
+func TestInferencePoolMetrics(t *testing.T) {
+	scenarios := []struct {
+		name         string
+		poolName     string
+		kvCacheAvg   float64
+		queueSizeAvg float64
+	}{
+		{
+			name:         "basic test",
+			poolName:     "p1",
+			kvCacheAvg:   0.3,
+			queueSizeAvg: 0.4,
+		},
+	}
+	Register()
+	for _, scenario := range scenarios {
+		t.Run(scenario.name, func(t *testing.T) {
+
+			RecordInferencePoolAvgKVCache(scenario.poolName, scenario.kvCacheAvg)
+			RecordInferencePoolAvgQueueSize(scenario.poolName, scenario.queueSizeAvg)
+
+			wantKVCache, err := os.Open("testdata/kv_cache_avg_metrics")
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer func() {
+				if err := wantKVCache.Close(); err != nil {
+					t.Error(err)
+				}
+			}()
+			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantKVCache, KVCacheAvgUsageMetric); err != nil {
+				t.Error(err)
+			}
+
+			wantQueueSize, err := os.Open("testdata/queue_avg_size_metrics")
+			if err != nil {
+				t.Fatal(err)
+			}
+			defer func() {
+				if err := wantQueueSize.Close(); err != nil {
+					t.Error(err)
+				}
+			}()
+			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantQueueSize, QueueAvgSizeMetric); err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
diff --git a/pkg/ext-proc/metrics/testdata/kv_cache_avg_metrics b/pkg/ext-proc/metrics/testdata/kv_cache_avg_metrics
new file mode 100644
index 000000000..99d1a93af
--- /dev/null
+++ b/pkg/ext-proc/metrics/testdata/kv_cache_avg_metrics
@@ -0,0 +1,3 @@
+# HELP inference_pool_average_kv_cache_utilization [ALPHA] The average kv cache utilization for an inference server pool.
+# TYPE inference_pool_average_kv_cache_utilization gauge
+inference_pool_average_kv_cache_utilization{name="p1"} 0.3
diff --git a/pkg/ext-proc/metrics/testdata/queue_avg_size_metrics b/pkg/ext-proc/metrics/testdata/queue_avg_size_metrics
new file mode 100644
index 000000000..3605740c7
--- /dev/null
+++ b/pkg/ext-proc/metrics/testdata/queue_avg_size_metrics
@@ -0,0 +1,3 @@
+# HELP inference_pool_average_queue_size [ALPHA] The average number of requests pending in the model server queue.
+# TYPE inference_pool_average_queue_size gauge
+inference_pool_average_queue_size{name="p1"} 0.4
diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go
index 1c9c1b2e2..bf666f1f4 100644
--- a/pkg/ext-proc/server/runserver.go
+++ b/pkg/ext-proc/server/runserver.go
@@ -19,42 +19,45 @@ import (
// ExtProcServerRunner provides methods to manage an external process server.
type ExtProcServerRunner struct {
- GrpcPort int
- TargetEndpointKey string
- PoolName string
- PoolNamespace string
- ServiceName string
- Zone string
- RefreshPodsInterval time.Duration
- RefreshMetricsInterval time.Duration
- Scheme *runtime.Scheme
- Config *rest.Config
- Datastore *backend.K8sDatastore
- manager ctrl.Manager
+ GrpcPort int
+ TargetEndpointKey string
+ PoolName string
+ PoolNamespace string
+ ServiceName string
+ Zone string
+ RefreshPodsInterval time.Duration
+ RefreshMetricsInterval time.Duration
+ RefreshPrometheusMetricsInterval time.Duration
+ Scheme *runtime.Scheme
+ Config *rest.Config
+ Datastore *backend.K8sDatastore
+ manager ctrl.Manager
}
// Default values for CLI flags in main
const (
- DefaultGrpcPort = 9002 // default for --grpcPort
- DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
- DefaultPoolName = "" // required but no default
- DefaultPoolNamespace = "default" // default for --poolNamespace
- DefaultServiceName = "" // required but no default
- DefaultZone = "" // default for --zone
- DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
- DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
+ DefaultGrpcPort = 9002 // default for --grpcPort
+ DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
+ DefaultPoolName = "" // required but no default
+ DefaultPoolNamespace = "default" // default for --poolNamespace
+ DefaultServiceName = "" // required but no default
+ DefaultZone = "" // default for --zone
+ DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
+ DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
+ DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
)
func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
return &ExtProcServerRunner{
- GrpcPort: DefaultGrpcPort,
- TargetEndpointKey: DefaultTargetEndpointKey,
- PoolName: DefaultPoolName,
- PoolNamespace: DefaultPoolNamespace,
- ServiceName: DefaultServiceName,
- Zone: DefaultZone,
- RefreshPodsInterval: DefaultRefreshPodsInterval,
- RefreshMetricsInterval: DefaultRefreshMetricsInterval,
+ GrpcPort: DefaultGrpcPort,
+ TargetEndpointKey: DefaultTargetEndpointKey,
+ PoolName: DefaultPoolName,
+ PoolNamespace: DefaultPoolNamespace,
+ ServiceName: DefaultServiceName,
+ Zone: DefaultZone,
+ RefreshPodsInterval: DefaultRefreshPodsInterval,
+ RefreshMetricsInterval: DefaultRefreshMetricsInterval,
+ RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval,
// Scheme, Config, and Datastore can be assigned later.
}
}
@@ -123,7 +126,7 @@ func (r *ExtProcServerRunner) Start(
// Initialize backend provider
pp := backend.NewProvider(podMetricsClient, podDatastore)
- if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
+ if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshPrometheusMetricsInterval); err != nil {
klog.Fatalf("Failed to initialize backend provider: %v", err)
}
diff --git a/pkg/ext-proc/test/benchmark/benchmark.go b/pkg/ext-proc/test/benchmark/benchmark.go
index 9ff61d8b6..abaeedbbb 100644
--- a/pkg/ext-proc/test/benchmark/benchmark.go
+++ b/pkg/ext-proc/test/benchmark/benchmark.go
@@ -21,11 +21,12 @@ var (
svrAddr = flag.String("server_address", fmt.Sprintf("localhost:%d", runserver.DefaultGrpcPort), "Address of the ext proc server")
totalRequests = flag.Int("total_requests", 100000, "number of requests to be sent for load test")
// Flags when running a local ext proc server.
- numFakePods = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server")
- numModelsPerPod = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server")
- localServer = flag.Bool("local_server", true, "whether to start a local ext proc server")
- refreshPodsInterval = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods")
- refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics")
+ numFakePods = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server")
+ numModelsPerPod = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server")
+ localServer = flag.Bool("local_server", true, "whether to start a local ext proc server")
+ refreshPodsInterval = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods")
+ refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics via polling pods")
+ refreshPrometheusMetricsInterval = flag.Duration("refreshPrometheusMetricsInterval", 5*time.Second, "interval to flush prometheus metrics")
)
const (
@@ -37,7 +38,7 @@ func main() {
flag.Parse()
if *localServer {
- test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, fakePods(), fakeModels())
+ test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, *refreshPrometheusMetricsInterval, fakePods(), fakeModels())
time.Sleep(time.Second) // wait until server is up
klog.Info("Server started")
}
diff --git a/pkg/ext-proc/test/utils.go b/pkg/ext-proc/test/utils.go
index 63972849e..98793b955 100644
--- a/pkg/ext-proc/test/utils.go
+++ b/pkg/ext-proc/test/utils.go
@@ -16,7 +16,7 @@ import (
klog "k8s.io/klog/v2"
)
-func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server {
+func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server {
ps := make(backend.PodSet)
pms := make(map[backend.Pod]*backend.PodMetrics)
for _, pod := range pods {
@@ -25,7 +25,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Dur
}
pmc := &backend.FakePodMetricsClient{Res: pms}
pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods)))
- if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
+ if err := pp.Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval); err != nil {
klog.Fatalf("failed to initialize: %v", err)
}
return startExtProc(port, pp, models)