diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go index 8bf672579..a9165e8fd 100644 --- a/pkg/ext-proc/backend/provider.go +++ b/pkg/ext-proc/backend/provider.go @@ -7,6 +7,7 @@ import ( "time" "go.uber.org/multierr" + "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics" logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" klog "k8s.io/klog/v2" ) @@ -58,7 +59,7 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) { return nil, false } -func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error { +func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error { p.refreshPodsOnce() if err := p.refreshMetricsOnce(); err != nil { @@ -85,6 +86,14 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio } }() + // Periodically flush prometheus metrics for inference pool + go func() { + for { + time.Sleep(refreshPrometheusMetricsInterval) + p.flushPrometheusMetricsOnce() + } + }() + // Periodically print out the pods and metrics for DEBUGGING. if klog.V(logutil.DEBUG).Enabled() { go func() { @@ -174,3 +183,36 @@ func (p *Provider) refreshMetricsOnce() error { } return errs } + +// flushPrometheusMetricsOnce records the pool-wide average KV cache utilization +// and average waiting queue size, computed over all current pod metrics, as +// Prometheus gauges labeled with the inference pool name. It returns without +// recording anything when no pool is available or there are no pod metrics. +func (p *Provider) flushPrometheusMetricsOnce() { + klog.V(logutil.DEBUG).Infof("Flushing Prometheus Metrics") + + pool, _ := p.datastore.getInferencePool() + if pool == nil { + // No inference pool, or it is not initialized yet. 
+ return + } + + var kvCacheTotal float64 + var queueTotal int + + podMetrics := p.AllPodMetrics() + if len(podMetrics) == 0 { + return + } + + for _, pod := range podMetrics { + kvCacheTotal += pod.KVCacheUsagePercent + queueTotal += pod.WaitingQueueSize + } + + podTotalCount := len(podMetrics) + metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount)) + // Convert both operands to float64 before dividing; integer division would + // truncate the average (e.g. 3 waiting requests over 2 pods must be 1.5, not 1). + metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal)/float64(podTotalCount)) +} diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go index ad231f575..ddd7f0d66 100644 --- a/pkg/ext-proc/backend/provider_test.go +++ b/pkg/ext-proc/backend/provider_test.go @@ -90,7 +90,7 @@ func TestProvider(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { p := NewProvider(test.pmc, test.datastore) - err := p.Init(time.Millisecond, time.Millisecond) + err := p.Init(time.Millisecond, time.Millisecond, time.Millisecond) if test.initErr != (err != nil) { t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr) } diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index a783aa2c5..e126b6dd8 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -69,6 +69,10 @@ var ( "refreshMetricsInterval", runserver.DefaultRefreshMetricsInterval, "interval to refresh metrics") + refreshPrometheusMetricsInterval = flag.Duration( + "refreshPrometheusMetricsInterval", + runserver.DefaultRefreshPrometheusMetricsInterval, + "interval to flush prometheus metrics") scheme = runtime.NewScheme() ) @@ -102,17 +106,18 @@ func main() { datastore := backend.NewK8sDataStore() serverRunner := &runserver.ExtProcServerRunner{ - GrpcPort: *grpcPort, - TargetEndpointKey: *targetEndpointKey, - PoolName: *poolName, - PoolNamespace: *poolNamespace, - ServiceName: *serviceName, - Zone: *zone, - RefreshPodsInterval: *refreshPodsInterval, - RefreshMetricsInterval: *refreshMetricsInterval, - Scheme: scheme, - Config: ctrl.GetConfigOrDie(), - 
Datastore: datastore, + GrpcPort: *grpcPort, + TargetEndpointKey: *targetEndpointKey, + PoolName: *poolName, + PoolNamespace: *poolNamespace, + ServiceName: *serviceName, + Zone: *zone, + RefreshPodsInterval: *refreshPodsInterval, + RefreshMetricsInterval: *refreshMetricsInterval, + RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval, + Scheme: scheme, + Config: ctrl.GetConfigOrDie(), + Datastore: datastore, } serverRunner.Setup() diff --git a/pkg/ext-proc/metrics/README.md b/pkg/ext-proc/metrics/README.md index 1094bc23d..8adfd94e9 100644 --- a/pkg/ext-proc/metrics/README.md +++ b/pkg/ext-proc/metrics/README.md @@ -46,6 +46,8 @@ spec: | inference_model_response_sizes | Distribution | Distribution of response size in bytes. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | | inference_model_input_tokens | Distribution | Distribution of input token count. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | | inference_model_output_tokens | Distribution | Distribution of output token count. | `model_name`=<model-name>
`target_model_name`=<target-model-name> | ALPHA | +| inference_pool_average_kv_cache_utilization | Gauge | The average kv cache utilization for an inference server pool. | `name`=<inference-pool-name> | ALPHA | +| inference_pool_average_queue_size | Gauge | The average number of requests pending in the model server queue. | `name`=<inference-pool-name> | ALPHA | ## Scrape Metrics diff --git a/pkg/ext-proc/metrics/metrics.go b/pkg/ext-proc/metrics/metrics.go index 8cb7bd274..7bdc8436e 100644 --- a/pkg/ext-proc/metrics/metrics.go +++ b/pkg/ext-proc/metrics/metrics.go @@ -11,9 +11,11 @@ import ( const ( InferenceModelComponent = "inference_model" + InferencePoolComponent = "inference_pool" ) var ( + // Inference Model Metrics requestCounter = compbasemetrics.NewCounterVec( &compbasemetrics.CounterOpts{ Subsystem: InferenceModelComponent, @@ -88,6 +90,27 @@ var ( }, []string{"model_name", "target_model_name"}, ) + + // Inference Pool Metrics + inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Subsystem: InferencePoolComponent, + Name: "average_kv_cache_utilization", + Help: "The average kv cache utilization for an inference server pool.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"name"}, + ) + + inferencePoolAvgQueueSize = compbasemetrics.NewGaugeVec( + &compbasemetrics.GaugeOpts{ + Subsystem: InferencePoolComponent, + Name: "average_queue_size", + Help: "The average number of requests pending in the model server queue.", + StabilityLevel: compbasemetrics.ALPHA, + }, + []string{"name"}, + ) ) var registerMetrics sync.Once @@ -101,6 +124,9 @@ func Register() { legacyregistry.MustRegister(responseSizes) legacyregistry.MustRegister(inputTokens) legacyregistry.MustRegister(outputTokens) + + legacyregistry.MustRegister(inferencePoolAvgKVCache) + legacyregistry.MustRegister(inferencePoolAvgQueueSize) }) } @@ -143,3 +169,11 @@ func RecordOutputTokens(modelName, targetModelName string, size int) { 
outputTokens.WithLabelValues(modelName, targetModelName).Observe(float64(size)) } } + +func RecordInferencePoolAvgKVCache(name string, utilization float64) { + inferencePoolAvgKVCache.WithLabelValues(name).Set(utilization) +} + +func RecordInferencePoolAvgQueueSize(name string, queueSize float64) { + inferencePoolAvgQueueSize.WithLabelValues(name).Set(queueSize) +} diff --git a/pkg/ext-proc/metrics/metrics_test.go b/pkg/ext-proc/metrics/metrics_test.go index 57774b11a..348f707e9 100644 --- a/pkg/ext-proc/metrics/metrics_test.go +++ b/pkg/ext-proc/metrics/metrics_test.go @@ -15,6 +15,8 @@ const RequestSizesMetric = InferenceModelComponent + "_request_sizes" const ResponseSizesMetric = InferenceModelComponent + "_response_sizes" const InputTokensMetric = InferenceModelComponent + "_input_tokens" const OutputTokensMetric = InferenceModelComponent + "_output_tokens" +const KVCacheAvgUsageMetric = InferencePoolComponent + "_average_kv_cache_utilization" +const QueueAvgSizeMetric = InferencePoolComponent + "_average_queue_size" func TestRecordRequestCounterandSizes(t *testing.T) { type requests struct { @@ -257,3 +259,55 @@ func TestRecordResponseMetrics(t *testing.T) { }) } } + +// TestInferencePoolMetrics records both pool gauges and compares the gathered output with the golden files in testdata. +func TestInferencePoolMetrics(t *testing.T) { + scenarios := []struct { + name string + poolName string + kvCacheAvg float64 + queueSizeAvg float64 + }{ + { + name: "basic test", + poolName: "p1", + kvCacheAvg: 0.3, + queueSizeAvg: 0.4, + }, + } + Register() + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + + RecordInferencePoolAvgKVCache(scenario.poolName, scenario.kvCacheAvg) + RecordInferencePoolAvgQueueSize(scenario.poolName, scenario.queueSizeAvg) + + wantKVCache, err := os.Open("testdata/kv_cache_avg_metrics") + if err != nil { + t.Fatal(err) + } + // Defer Close only once Open has succeeded, so a failed Open is not followed by a Close on a nil file. + defer func() { + if err := wantKVCache.Close(); err != nil { + t.Error(err) + } + }() + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantKVCache, KVCacheAvgUsageMetric); err != nil { 
+ t.Error(err) + } + + wantQueueSize, err := os.Open("testdata/queue_avg_size_metrics") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := wantQueueSize.Close(); err != nil { + t.Error(err) + } + }() + if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantQueueSize, QueueAvgSizeMetric); err != nil { + t.Error(err) + } + }) + } +} diff --git a/pkg/ext-proc/metrics/testdata/kv_cache_avg_metrics b/pkg/ext-proc/metrics/testdata/kv_cache_avg_metrics new file mode 100644 index 000000000..99d1a93af --- /dev/null +++ b/pkg/ext-proc/metrics/testdata/kv_cache_avg_metrics @@ -0,0 +1,3 @@ +# HELP inference_pool_average_kv_cache_utilization [ALPHA] The average kv cache utilization for an inference server pool. +# TYPE inference_pool_average_kv_cache_utilization gauge +inference_pool_average_kv_cache_utilization{name="p1"} 0.3 diff --git a/pkg/ext-proc/metrics/testdata/queue_avg_size_metrics b/pkg/ext-proc/metrics/testdata/queue_avg_size_metrics new file mode 100644 index 000000000..3605740c7 --- /dev/null +++ b/pkg/ext-proc/metrics/testdata/queue_avg_size_metrics @@ -0,0 +1,3 @@ +# HELP inference_pool_average_queue_size [ALPHA] The average number of requests pending in the model server queue. +# TYPE inference_pool_average_queue_size gauge +inference_pool_average_queue_size{name="p1"} 0.4 diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go index 1c9c1b2e2..bf666f1f4 100644 --- a/pkg/ext-proc/server/runserver.go +++ b/pkg/ext-proc/server/runserver.go @@ -19,42 +19,45 @@ import ( // ExtProcServerRunner provides methods to manage an external process server. 
type ExtProcServerRunner struct { - GrpcPort int - TargetEndpointKey string - PoolName string - PoolNamespace string - ServiceName string - Zone string - RefreshPodsInterval time.Duration - RefreshMetricsInterval time.Duration - Scheme *runtime.Scheme - Config *rest.Config - Datastore *backend.K8sDatastore - manager ctrl.Manager + GrpcPort int + TargetEndpointKey string + PoolName string + PoolNamespace string + ServiceName string + Zone string + RefreshPodsInterval time.Duration + RefreshMetricsInterval time.Duration + RefreshPrometheusMetricsInterval time.Duration + Scheme *runtime.Scheme + Config *rest.Config + Datastore *backend.K8sDatastore + manager ctrl.Manager } // Default values for CLI flags in main const ( - DefaultGrpcPort = 9002 // default for --grpcPort - DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey - DefaultPoolName = "" // required but no default - DefaultPoolNamespace = "default" // default for --poolNamespace - DefaultServiceName = "" // required but no default - DefaultZone = "" // default for --zone - DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval - DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval + DefaultGrpcPort = 9002 // default for --grpcPort + DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey + DefaultPoolName = "" // required but no default + DefaultPoolNamespace = "default" // default for --poolNamespace + DefaultServiceName = "" // required but no default + DefaultZone = "" // default for --zone + DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval + DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval + DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval ) func NewDefaultExtProcServerRunner() *ExtProcServerRunner { return 
&ExtProcServerRunner{ - GrpcPort: DefaultGrpcPort, - TargetEndpointKey: DefaultTargetEndpointKey, - PoolName: DefaultPoolName, - PoolNamespace: DefaultPoolNamespace, - ServiceName: DefaultServiceName, - Zone: DefaultZone, - RefreshPodsInterval: DefaultRefreshPodsInterval, - RefreshMetricsInterval: DefaultRefreshMetricsInterval, + GrpcPort: DefaultGrpcPort, + TargetEndpointKey: DefaultTargetEndpointKey, + PoolName: DefaultPoolName, + PoolNamespace: DefaultPoolNamespace, + ServiceName: DefaultServiceName, + Zone: DefaultZone, + RefreshPodsInterval: DefaultRefreshPodsInterval, + RefreshMetricsInterval: DefaultRefreshMetricsInterval, + RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, // Scheme, Config, and Datastore can be assigned later. } } @@ -123,7 +126,7 @@ func (r *ExtProcServerRunner) Start( // Initialize backend provider pp := backend.NewProvider(podMetricsClient, podDatastore) - if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil { + if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshPrometheusMetricsInterval); err != nil { klog.Fatalf("Failed to initialize backend provider: %v", err) } diff --git a/pkg/ext-proc/test/benchmark/benchmark.go b/pkg/ext-proc/test/benchmark/benchmark.go index 9ff61d8b6..abaeedbbb 100644 --- a/pkg/ext-proc/test/benchmark/benchmark.go +++ b/pkg/ext-proc/test/benchmark/benchmark.go @@ -21,11 +21,12 @@ var ( svrAddr = flag.String("server_address", fmt.Sprintf("localhost:%d", runserver.DefaultGrpcPort), "Address of the ext proc server") totalRequests = flag.Int("total_requests", 100000, "number of requests to be sent for load test") // Flags when running a local ext proc server. 
- numFakePods = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server") - numModelsPerPod = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server") - localServer = flag.Bool("local_server", true, "whether to start a local ext proc server") - refreshPodsInterval = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods") - refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics") + numFakePods = flag.Int("num_fake_pods", 200, "number of fake pods when running a local ext proc server") + numModelsPerPod = flag.Int("num_models_per_pod", 5, "number of fake models per pod when running a local ext proc server") + localServer = flag.Bool("local_server", true, "whether to start a local ext proc server") + refreshPodsInterval = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods") + refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics via polling pods") + refreshPrometheusMetricsInterval = flag.Duration("refreshPrometheusMetricsInterval", 5*time.Second, "interval to flush prometheus metrics") ) const ( @@ -37,7 +38,7 @@ func main() { flag.Parse() if *localServer { - test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, fakePods(), fakeModels()) + test.StartExtProc(port, *refreshPodsInterval, *refreshMetricsInterval, *refreshPrometheusMetricsInterval, fakePods(), fakeModels()) time.Sleep(time.Second) // wait until server is up klog.Info("Server started") } diff --git a/pkg/ext-proc/test/utils.go b/pkg/ext-proc/test/utils.go index 63972849e..98793b955 100644 --- a/pkg/ext-proc/test/utils.go +++ b/pkg/ext-proc/test/utils.go @@ -16,7 +16,7 @@ import ( klog "k8s.io/klog/v2" ) -func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Duration, pods []*backend.PodMetrics, models 
map[string]*v1alpha1.InferenceModel) *grpc.Server { +func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server { ps := make(backend.PodSet) pms := make(map[backend.Pod]*backend.PodMetrics) for _, pod := range pods { @@ -25,7 +25,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Dur } pmc := &backend.FakePodMetricsClient{Res: pms} pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods))) - if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil { + if err := pp.Init(refreshPodsInterval, refreshMetricsInterval, refreshPrometheusMetricsInterval); err != nil { klog.Fatalf("failed to initialize: %v", err) } return startExtProc(port, pp, models)