diff --git a/cmd/bbr/main.go b/cmd/bbr/main.go index 84b1fffac..0dffa74d5 100644 --- a/cmd/bbr/main.go +++ b/cmd/bbr/main.go @@ -18,26 +18,23 @@ package main import ( "flag" - "net" - "net/http" + "fmt" "os" - "strconv" "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus/promhttp" uberzap "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" healthPb "google.golang.org/grpc/health/grpc_health_v1" - "k8s.io/client-go/rest" - "k8s.io/component-base/metrics/legacyregistry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" + "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/server" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -85,7 +82,18 @@ func run() error { return err } - mgr, err := ctrl.NewManager(cfg, ctrl.Options{}) + metrics.Register() + + // Register metrics handler. + // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. + // More info: + // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server + // - https://book.kubebuilder.io/reference/metrics.html + metricsServerOptions := metricsserver.Options{ + BindAddress: fmt.Sprintf(":%d", *metricsPort), + FilterProvider: filters.WithAuthenticationAndAuthorization, + } + mgr, err := ctrl.NewManager(cfg, ctrl.Options{Metrics: metricsServerOptions}) if err != nil { setupLog.Error(err, "Failed to create manager", "config", cfg) return err @@ -107,11 +115,6 @@ func run() error { return err } - // Register metrics handler. 
- if err := registerMetricsHandler(mgr, *metricsPort, cfg); err != nil { - return err - } - // Start the manager. This blocks until a signal is received. setupLog.Info("Manager starting") if err := mgr.Start(ctx); err != nil { @@ -152,58 +155,3 @@ func initLogging(opts *zap.Options) { logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller())) ctrl.SetLogger(logger) } - -const metricsEndpoint = "/metrics" - -// registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager. -func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config) error { - metrics.Register() - - // Init HTTP server. - h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg) - if err != nil { - return err - } - - mux := http.NewServeMux() - mux.Handle(metricsEndpoint, h) - - srv := &http.Server{ - Addr: net.JoinHostPort("", strconv.Itoa(port)), - Handler: mux, - } - - if err := mgr.Add(&manager.Server{ - Name: "metrics", - Server: srv, - }); err != nil { - setupLog.Error(err, "Failed to register metrics HTTP handler") - return err - } - return nil -} - -func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) { - h := promhttp.HandlerFor( - legacyregistry.DefaultGatherer, - promhttp.HandlerOpts{}, - ) - httpClient, err := rest.HTTPClientFor(cfg) - if err != nil { - setupLog.Error(err, "Failed to create http client for metrics auth") - return nil, err - } - - filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient) - if err != nil { - setupLog.Error(err, "Failed to create metrics filter for auth") - return nil, err - } - metricsLogger := ctrl.Log.WithName("metrics").WithValues("path", metricsEndpoint) - metricsAuthHandler, err := filter(metricsLogger, h) - if err != nil { - setupLog.Error(err, "Failed to create metrics auth handler") - return nil, err - } - return metricsAuthHandler, nil -} diff --git a/cmd/epp/main.go b/cmd/epp/main.go index 9c023f26d..ddd3d6256 100644 --- 
a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -19,25 +19,22 @@ package main import ( "flag" "fmt" - "net" - "net/http" "os" - "strconv" "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/prometheus/client_golang/prometheus" uberzap "go.uber.org/zap" "go.uber.org/zap/zapcore" "google.golang.org/grpc" healthPb "google.golang.org/grpc/health/grpc_health_v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/rest" - "k8s.io/component-base/metrics/legacyregistry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" @@ -54,10 +51,6 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -const ( - defaultMetricsEndpoint = "/metrics" -) - var ( grpcPort = flag.Int( "grpcPort", @@ -164,16 +157,6 @@ func run() error { return err } - poolNamespacedName := types.NamespacedName{ - Name: *poolName, - Namespace: *poolNamespace, - } - mgr, err := runserver.NewDefaultManager(poolNamespacedName, cfg) - if err != nil { - setupLog.Error(err, "Failed to create controller manager") - return err - } - // Set up mapper for metric scraping. mapping, err := backendmetrics.NewMetricMapping( *totalQueuedRequestsMetric, @@ -192,6 +175,29 @@ func run() error { datastore := datastore.NewDatastore(ctx, pmf) + customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)} + metrics.Register(customCollectors...) + metrics.RecordInferenceExtensionInfo() + // Register metrics handler. 
+ // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. + // More info: + // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server + // - https://book.kubebuilder.io/reference/metrics.html + metricsServerOptions := metricsserver.Options{ + BindAddress: fmt.Sprintf(":%d", *metricsPort), + FilterProvider: filters.WithAuthenticationAndAuthorization, + } + + poolNamespacedName := types.NamespacedName{ + Name: *poolName, + Namespace: *poolNamespace, + } + mgr, err := runserver.NewDefaultManager(poolNamespacedName, cfg, metricsServerOptions) + if err != nil { + setupLog.Error(err, "Failed to create controller manager") + return err + } + scheduler := scheduling.NewScheduler(datastore) if schedulerV2 == "true" { queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog) @@ -242,11 +248,6 @@ func run() error { return err } - // Register metrics handler. - if err := registerMetricsHandler(mgr, *metricsPort, cfg, datastore); err != nil { - return err - } - // Start the manager. This blocks until a signal is received. setupLog.Info("Controller manager starting") if err := mgr.Start(ctx); err != nil { @@ -290,62 +291,6 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore. return nil } -// registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager. -func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config, ds datastore.Datastore) error { - metrics.Register() - legacyregistry.CustomMustRegister(collectors.NewInferencePoolMetricsCollector(ds)) - - metrics.RecordInferenceExtensionInfo() - - // Init HTTP server. 
- h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg) - if err != nil { - return err - } - - mux := http.NewServeMux() - mux.Handle(defaultMetricsEndpoint, h) - - srv := &http.Server{ - Addr: net.JoinHostPort("", strconv.Itoa(port)), - Handler: mux, - } - - if err := mgr.Add(&manager.Server{ - Name: "metrics", - Server: srv, - }); err != nil { - setupLog.Error(err, "Failed to register metrics HTTP handler") - return err - } - return nil -} - -func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) { - h := promhttp.HandlerFor( - legacyregistry.DefaultGatherer, - promhttp.HandlerOpts{}, - ) - httpClient, err := rest.HTTPClientFor(cfg) - if err != nil { - setupLog.Error(err, "Failed to create http client for metrics auth") - return nil, err - } - - filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient) - if err != nil { - setupLog.Error(err, "Failed to create metrics filter for auth") - return nil, err - } - metricsLogger := ctrl.Log.WithName("metrics").WithValues("path", defaultMetricsEndpoint) - metricsAuthHandler, err := filter(metricsLogger, h) - if err != nil { - setupLog.Error(err, "Failed to create metrics auth handler") - return nil, err - } - return metricsAuthHandler, nil -} - func validateFlags() error { if *poolName == "" { return fmt.Errorf("required %q flag not set", "poolName") diff --git a/pkg/bbr/handlers/request_test.go b/pkg/bbr/handlers/request_test.go index 55c42a218..3bc0d6fe4 100644 --- a/pkg/bbr/handlers/request_test.go +++ b/pkg/bbr/handlers/request_test.go @@ -26,8 +26,8 @@ import ( extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/google/go-cmp/cmp" "google.golang.org/protobuf/testing/protocmp" - "k8s.io/component-base/metrics/legacyregistry" metricsutils "k8s.io/component-base/metrics/testutil" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/bbr/metrics" 
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -204,7 +204,7 @@ func TestHandleRequestBody(t *testing.T) { bbr_success_total{} 1 ` - if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(wantMetrics), "inference_model_request_total"); err != nil { + if err := metricsutils.GatherAndCompare(crmetrics.Registry, strings.NewReader(wantMetrics), "bbr_success_total"); err != nil { t.Error(err) } } diff --git a/pkg/bbr/metrics/metrics.go b/pkg/bbr/metrics/metrics.go index fc3538fba..4aec0e16d 100644 --- a/pkg/bbr/metrics/metrics.go +++ b/pkg/bbr/metrics/metrics.go @@ -19,49 +19,48 @@ package metrics import ( "sync" + "github.com/prometheus/client_golang/prometheus" compbasemetrics "k8s.io/component-base/metrics" - "k8s.io/component-base/metrics/legacyregistry" + "sigs.k8s.io/controller-runtime/pkg/metrics" + + metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/metrics" ) const component = "bbr" var ( - successCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: component, - Name: "success_total", - Help: "Count of successes pulling model name from body and injecting it in the request headers.", - StabilityLevel: compbasemetrics.ALPHA, + successCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: component, + Name: "success_total", + Help: metricsutil.HelpMsgWithStability("Count of successes pulling model name from body and injecting it in the request headers.", compbasemetrics.ALPHA), }, []string{}, ) - modelNotInBodyCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: component, - Name: "model_not_in_body_total", - Help: "Count of times the model was not present in the request body.", - StabilityLevel: compbasemetrics.ALPHA, + modelNotInBodyCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: component, + Name: "model_not_in_body_total", + Help: 
metricsutil.HelpMsgWithStability("Count of times the model was not present in the request body.", compbasemetrics.ALPHA), }, []string{}, ) - modelNotParsedCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: component, - Name: "model_not_parsed_total", - Help: "Count of times the model was in the request body but we could not parse it.", - StabilityLevel: compbasemetrics.ALPHA, + modelNotParsedCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: component, + Name: "model_not_parsed_total", + Help: metricsutil.HelpMsgWithStability("Count of times the model was in the request body but we could not parse it.", compbasemetrics.ALPHA), }, []string{}, ) // TODO: Uncomment and use this metrics once the core server implementation has handling to skip body parsing if header exists. /* - modelAlreadyPresentInHeaderCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ + modelAlreadyPresentInHeaderCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ Subsystem: component, Name: "model_already_present_in_header_total", Help: "Count of times the model was already present in request headers.", - StabilityLevel: compbasemetrics.ALPHA, }, []string{}, ) @@ -73,10 +72,10 @@ var registerMetrics sync.Once // Register all metrics. 
func Register() { registerMetrics.Do(func() { - legacyregistry.MustRegister(successCounter) - legacyregistry.MustRegister(modelNotInBodyCounter) - legacyregistry.MustRegister(modelNotParsedCounter) - // legacyregistry.MustRegister(modelAlreadyPresentInHeaderCounter) + metrics.Registry.MustRegister(successCounter) + metrics.Registry.MustRegister(modelNotInBodyCounter) + metrics.Registry.MustRegister(modelNotParsedCounter) + // metrics.Registry.MustRegister(modelAlreadyPresentInHeaderCounter) }) } diff --git a/pkg/epp/metrics/collectors/inference_pool.go b/pkg/epp/metrics/collectors/inference_pool.go index f916be514..2be3c1957 100644 --- a/pkg/epp/metrics/collectors/inference_pool.go +++ b/pkg/epp/metrics/collectors/inference_pool.go @@ -17,47 +17,46 @@ limitations under the License. package collectors import ( - "k8s.io/component-base/metrics" + "github.com/prometheus/client_golang/prometheus" + compbasemetrics "k8s.io/component-base/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/metrics" ) var ( - descInferencePoolPerPodQueueSize = metrics.NewDesc( + descInferencePoolPerPodQueueSize = prometheus.NewDesc( "inference_pool_per_pod_queue_size", - "The total number of requests pending in the model server queue for each underlying pod.", + metricsutil.HelpMsgWithStability("The total number of requests pending in the model server queue for each underlying pod.", compbasemetrics.ALPHA), []string{ "name", "model_server_pod", }, nil, - metrics.ALPHA, - "", ) ) type inferencePoolMetricsCollector struct { - metrics.BaseStableCollector - ds datastore.Datastore } // Check if inferencePoolMetricsCollector implements necessary interface -var _ metrics.StableCollector = &inferencePoolMetricsCollector{} +var _ prometheus.Collector = &inferencePoolMetricsCollector{} -// NewInferencePoolMetricsCollector implements the metrics.StableCollector interface and +// 
NewInferencePoolMetricsCollector implements the prometheus.Collector interface and // exposes metrics about inference pool. -func NewInferencePoolMetricsCollector(ds datastore.Datastore) metrics.StableCollector { +func NewInferencePoolMetricsCollector(ds datastore.Datastore) prometheus.Collector { return &inferencePoolMetricsCollector{ ds: ds, } } -// DescribeWithStability implements the metrics.StableCollector interface. -func (c *inferencePoolMetricsCollector) DescribeWithStability(ch chan<- *metrics.Desc) { +// Describe implements the prometheus.Collector interface. +func (c *inferencePoolMetricsCollector) Describe(ch chan<- *prometheus.Desc) { ch <- descInferencePoolPerPodQueueSize } -// CollectWithStability implements the metrics.StableCollector interface. -func (c *inferencePoolMetricsCollector) CollectWithStability(ch chan<- metrics.Metric) { +// Collect implements the prometheus.Collector interface. +func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) { pool, err := c.ds.PoolGet() if err != nil { return @@ -69,9 +68,9 @@ func (c *inferencePoolMetricsCollector) CollectWithStability(ch chan<- metrics.M } for _, pod := range podMetrics { - ch <- metrics.NewLazyConstMetric( + ch <- prometheus.MustNewConstMetric( descInferencePoolPerPodQueueSize, - metrics.GaugeValue, + prometheus.GaugeValue, float64(pod.GetMetrics().WaitingQueueSize), pool.Name, pod.GetPod().NamespacedName.Name, diff --git a/pkg/epp/metrics/collectors/inference_pool_test.go b/pkg/epp/metrics/collectors/inference_pool_test.go index b7ddf019d..d97377ee7 100644 --- a/pkg/epp/metrics/collectors/inference_pool_test.go +++ b/pkg/epp/metrics/collectors/inference_pool_test.go @@ -55,7 +55,7 @@ func TestNoMetricsCollected(t *testing.T) { ds: datastore, } - if err := testutil.CustomCollectAndCompare(collector, strings.NewReader(""), ""); err != nil { + if err := testutil.CollectAndCompare(collector, strings.NewReader(""), ""); err != nil { t.Fatal(err) }
} @@ -90,7 +90,7 @@ func TestMetricsCollected(t *testing.T) { collector := &inferencePoolMetricsCollector{ ds: ds, } - err := testutil.CustomCollectAndCompare(collector, strings.NewReader(` + err := testutil.CollectAndCompare(collector, strings.NewReader(` # HELP inference_pool_per_pod_queue_size [ALPHA] The total number of requests pending in the model server queue for each underlying pod. # TYPE inference_pool_per_pod_queue_size gauge inference_pool_per_pod_queue_size{model_server_pod="pod1",name="test-pool"} 100 diff --git a/pkg/epp/metrics/metrics.go b/pkg/epp/metrics/metrics.go index a0d521400..c77e0f05c 100644 --- a/pkg/epp/metrics/metrics.go +++ b/pkg/epp/metrics/metrics.go @@ -21,10 +21,13 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" compbasemetrics "k8s.io/component-base/metrics" - "k8s.io/component-base/metrics/legacyregistry" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/metrics" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + metricsutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/metrics" ) const ( @@ -43,216 +46,199 @@ var ( var ( // Inference Model Metrics - requestCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: InferenceModelComponent, - Name: "request_total", - Help: "Counter of inference model requests broken out for each model and target model.", - StabilityLevel: compbasemetrics.ALPHA, + requestCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: InferenceModelComponent, + Name: "request_total", + Help: metricsutil.HelpMsgWithStability("Counter of inference model requests broken out for each model and target model.", compbasemetrics.ALPHA), }, []string{"model_name", "target_model_name"}, ) - requestErrCounter = compbasemetrics.NewCounterVec( - &compbasemetrics.CounterOpts{ - Subsystem: InferenceModelComponent, - Name: "request_error_total", - Help: "Counter of 
inference model requests errors broken out for each model and target model.", - StabilityLevel: compbasemetrics.ALPHA, + requestErrCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: InferenceModelComponent, + Name: "request_error_total", + Help: metricsutil.HelpMsgWithStability("Counter of inference model requests errors broken out for each model and target model.", compbasemetrics.ALPHA), }, []string{"model_name", "target_model_name", "error_code"}, ) - requestLatencies = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + requestLatencies = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "request_duration_seconds", - Help: "Inference model response latency distribution in seconds for each model and target model.", + Help: metricsutil.HelpMsgWithStability("Inference model response latency distribution in seconds for each model and target model.", compbasemetrics.ALPHA), Buckets: []float64{ 0.005, 0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.25, 1.5, 2, 3, 4, 5, 6, 8, 10, 15, 20, 30, 45, 60, 120, 180, 240, 300, 360, 480, 600, 900, 1200, 1800, 2700, 3600, }, - StabilityLevel: compbasemetrics.ALPHA, }, []string{"model_name", "target_model_name"}, ) - requestSizes = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + requestSizes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "request_sizes", - Help: "Inference model requests size distribution in bytes for each model and target model.", + Help: metricsutil.HelpMsgWithStability("Inference model requests size distribution in bytes for each model and target model.", compbasemetrics.ALPHA), // Use buckets ranging from 1000 bytes (1KB) to 10^9 bytes (1GB). 
Buckets: []float64{ 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, // More fine-grained up to 64KB 131072, 262144, 524288, 1048576, 2097152, 4194304, 8388608, // Exponential up to 8MB 16777216, 33554432, 67108864, 134217728, 268435456, 536870912, 1073741824, // Exponential up to 1GB }, - StabilityLevel: compbasemetrics.ALPHA, }, []string{"model_name", "target_model_name"}, ) - responseSizes = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + responseSizes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "response_sizes", - Help: "Inference model responses size distribution in bytes for each model and target model.", + Help: metricsutil.HelpMsgWithStability("Inference model responses size distribution in bytes for each model and target model.", compbasemetrics.ALPHA), // Most models have a response token < 8192 tokens. Each token, in average, has 4 characters. // 8192 * 4 = 32768. - Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536}, - StabilityLevel: compbasemetrics.ALPHA, + Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536}, }, []string{"model_name", "target_model_name"}, ) - inputTokens = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + inputTokens = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "input_tokens", - Help: "Inference model input token count distribution for requests in each model.", + Help: metricsutil.HelpMsgWithStability("Inference model input token count distribution for requests in each model.", compbasemetrics.ALPHA), // Most models have a input context window less than 1 million tokens. 
- Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536, 131072, 262144, 524288, 1048576}, - StabilityLevel: compbasemetrics.ALPHA, + Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32778, 65536, 131072, 262144, 524288, 1048576}, }, []string{"model_name", "target_model_name"}, ) - outputTokens = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + outputTokens = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "output_tokens", - Help: "Inference model output token count distribution for requests in each model.", + Help: metricsutil.HelpMsgWithStability("Inference model output token count distribution for requests in each model.", compbasemetrics.ALPHA), // Most models generates output less than 8192 tokens. - Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192}, - StabilityLevel: compbasemetrics.ALPHA, + Buckets: []float64{1, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192}, }, []string{"model_name", "target_model_name"}, ) - runningRequests = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferenceModelComponent, - Name: "running_requests", - Help: "Inference model number of running requests in each model.", - StabilityLevel: compbasemetrics.ALPHA, + runningRequests = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferenceModelComponent, + Name: "running_requests", + Help: metricsutil.HelpMsgWithStability("Inference model number of running requests in each model.", compbasemetrics.ALPHA), }, []string{"model_name"}, ) // NTPOT - Normalized Time Per Output Token - NormalizedTimePerOutputToken = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + NormalizedTimePerOutputToken = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceModelComponent, Name: "normalized_time_per_output_token_seconds", - Help: "Inference model 
latency divided by number of output tokens in seconds for each model and target model.", + Help: metricsutil.HelpMsgWithStability("Inference model latency divided by number of output tokens in seconds for each model and target model.", compbasemetrics.ALPHA), // From few milliseconds per token to multiple seconds per token Buckets: []float64{ 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, }, - StabilityLevel: compbasemetrics.ALPHA, }, []string{"model_name", "target_model_name"}, ) // Inference Pool Metrics - inferencePoolAvgKVCache = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferencePoolComponent, - Name: "average_kv_cache_utilization", - Help: "The average kv cache utilization for an inference server pool.", - StabilityLevel: compbasemetrics.ALPHA, + inferencePoolAvgKVCache = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferencePoolComponent, + Name: "average_kv_cache_utilization", + Help: metricsutil.HelpMsgWithStability("The average kv cache utilization for an inference server pool.", compbasemetrics.ALPHA), }, []string{"name"}, ) - inferencePoolAvgQueueSize = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferencePoolComponent, - Name: "average_queue_size", - Help: "The average number of requests pending in the model server queue.", - StabilityLevel: compbasemetrics.ALPHA, + inferencePoolAvgQueueSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferencePoolComponent, + Name: "average_queue_size", + Help: metricsutil.HelpMsgWithStability("The average number of requests pending in the model server queue.", compbasemetrics.ALPHA), }, []string{"name"}, ) - inferencePoolReadyPods = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferencePoolComponent, - Name: "ready_pods", - Help: "The number of ready pods in the inference server pool.", - StabilityLevel: compbasemetrics.ALPHA, + inferencePoolReadyPods = 
prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferencePoolComponent, + Name: "ready_pods", + Help: metricsutil.HelpMsgWithStability("The number of ready pods in the inference server pool.", compbasemetrics.ALPHA), }, []string{"name"}, ) // Scheduler Metrics - SchedulerE2ELatency = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + SchedulerE2ELatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceExtension, Name: "scheduler_e2e_duration_seconds", - Help: "End-to-end scheduling latency distribution in seconds.", + Help: metricsutil.HelpMsgWithStability("End-to-end scheduling latency distribution in seconds.", compbasemetrics.ALPHA), Buckets: []float64{ 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, }, - StabilityLevel: compbasemetrics.ALPHA, + // StabilityLevel: prometheus.ALPHA, }, []string{}, ) - SchedulerPluginProcessingLatencies = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + SchedulerPluginProcessingLatencies = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceExtension, Name: "scheduler_plugin_duration_seconds", - Help: "Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name.", + Help: metricsutil.HelpMsgWithStability("Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name.", compbasemetrics.ALPHA), Buckets: []float64{ 0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, }, - StabilityLevel: compbasemetrics.ALPHA, }, []string{"plugin_type", "plugin_name"}, ) // Prefix indexer Metrics - PrefixCacheSize = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferenceExtension, - Name: "prefix_indexer_size", - Help: "Size of the prefix indexer.", - StabilityLevel: compbasemetrics.ALPHA, + PrefixCacheSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: InferenceExtension, + Name: 
"prefix_indexer_size", + Help: metricsutil.HelpMsgWithStability("Size of the prefix indexer.", compbasemetrics.ALPHA), }, []string{}, ) - PrefixCacheHitRatio = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ + PrefixCacheHitRatio = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ Subsystem: InferenceExtension, Name: "prefix_indexer_hit_ratio", - Help: "Ratio of prefix length matched to total prefix length in the cache lookup.", + Help: metricsutil.HelpMsgWithStability("Ratio of prefix length matched to total prefix length in the cache lookup.", compbasemetrics.ALPHA), // Buckets from 0.0 to 1.0 in increments - Buckets: []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, - StabilityLevel: compbasemetrics.ALPHA, + Buckets: []float64{0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0}, }, []string{}, ) - PrefixCacheHitLength = compbasemetrics.NewHistogramVec( - &compbasemetrics.HistogramOpts{ - Subsystem: InferenceExtension, - Name: "prefix_indexer_hit_bytes", - Help: "Length of the prefix match in number of bytes in the cache lookup.", - Buckets: []float64{0, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536}, - StabilityLevel: compbasemetrics.ALPHA, + PrefixCacheHitLength = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: InferenceExtension, + Name: "prefix_indexer_hit_bytes", + Help: metricsutil.HelpMsgWithStability("Length of the prefix match in number of bytes in the cache lookup.", compbasemetrics.ALPHA), + Buckets: []float64{0, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536}, }, []string{}, ) // Info Metrics - InferenceExtensionInfo = compbasemetrics.NewGaugeVec( - &compbasemetrics.GaugeOpts{ - Subsystem: InferenceExtension, - Name: "info", - Help: "General information of the current build of Inference Extension.", - StabilityLevel: compbasemetrics.ALPHA, + InferenceExtensionInfo = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: 
InferenceExtension, + Name: "info", + Help: metricsutil.HelpMsgWithStability("General information of the current build of Inference Extension.", compbasemetrics.ALPHA), }, []string{"commit", "build_ref"}, ) @@ -261,33 +247,54 @@ var ( var registerMetrics sync.Once // Register all metrics. -func Register() { +func Register(customCollectors ...prometheus.Collector) { registerMetrics.Do(func() { - legacyregistry.MustRegister(requestCounter) - legacyregistry.MustRegister(requestErrCounter) - legacyregistry.MustRegister(requestLatencies) - legacyregistry.MustRegister(requestSizes) - legacyregistry.MustRegister(responseSizes) - legacyregistry.MustRegister(inputTokens) - legacyregistry.MustRegister(outputTokens) - legacyregistry.MustRegister(runningRequests) - legacyregistry.MustRegister(NormalizedTimePerOutputToken) - - legacyregistry.MustRegister(inferencePoolAvgKVCache) - legacyregistry.MustRegister(inferencePoolAvgQueueSize) - legacyregistry.MustRegister(inferencePoolReadyPods) - - legacyregistry.MustRegister(SchedulerPluginProcessingLatencies) - legacyregistry.MustRegister(SchedulerE2ELatency) - - legacyregistry.MustRegister(InferenceExtensionInfo) - - legacyregistry.MustRegister(PrefixCacheSize) - legacyregistry.MustRegister(PrefixCacheHitRatio) - legacyregistry.MustRegister(PrefixCacheHitLength) + metrics.Registry.MustRegister(requestCounter) + metrics.Registry.MustRegister(requestErrCounter) + metrics.Registry.MustRegister(requestLatencies) + metrics.Registry.MustRegister(requestSizes) + metrics.Registry.MustRegister(responseSizes) + metrics.Registry.MustRegister(inputTokens) + metrics.Registry.MustRegister(outputTokens) + metrics.Registry.MustRegister(runningRequests) + metrics.Registry.MustRegister(NormalizedTimePerOutputToken) + metrics.Registry.MustRegister(inferencePoolAvgKVCache) + metrics.Registry.MustRegister(inferencePoolAvgQueueSize) + metrics.Registry.MustRegister(inferencePoolReadyPods) + 
metrics.Registry.MustRegister(SchedulerPluginProcessingLatencies) + metrics.Registry.MustRegister(SchedulerE2ELatency) + metrics.Registry.MustRegister(InferenceExtensionInfo) + metrics.Registry.MustRegister(PrefixCacheSize) + metrics.Registry.MustRegister(PrefixCacheHitRatio) + metrics.Registry.MustRegister(PrefixCacheHitLength) + for _, collector := range customCollectors { + metrics.Registry.MustRegister(collector) + } }) } +// Reset resets all registered metrics; it is intended for integration tests only. +func Reset() { + requestCounter.Reset() + requestErrCounter.Reset() + requestLatencies.Reset() + requestSizes.Reset() + responseSizes.Reset() + inputTokens.Reset() + outputTokens.Reset() + runningRequests.Reset() + NormalizedTimePerOutputToken.Reset() + inferencePoolAvgKVCache.Reset() + inferencePoolAvgQueueSize.Reset() + inferencePoolReadyPods.Reset() + SchedulerPluginProcessingLatencies.Reset() + SchedulerE2ELatency.Reset() + InferenceExtensionInfo.Reset() + PrefixCacheSize.Reset() + PrefixCacheHitRatio.Reset() + PrefixCacheHitLength.Reset() +} + // RecordRequstCounter records the number of requests. 
func RecordRequestCounter(modelName, targetModelName string) { requestCounter.WithLabelValues(modelName, targetModelName).Inc() diff --git a/pkg/epp/metrics/metrics_test.go b/pkg/epp/metrics/metrics_test.go index 4ad6f96e1..8cee042eb 100644 --- a/pkg/epp/metrics/metrics_test.go +++ b/pkg/epp/metrics/metrics_test.go @@ -22,8 +22,8 @@ import ( "testing" "time" - "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" + "sigs.k8s.io/controller-runtime/pkg/metrics" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) @@ -93,7 +93,7 @@ func TestRecordRequestCounterandSizes(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestTotal, RequestTotalMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantRequestTotal, RequestTotalMetric); err != nil { t.Error(err) } wantRequestSizes, err := os.Open("testdata/request_sizes_metric") @@ -105,7 +105,7 @@ func TestRecordRequestCounterandSizes(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestSizes, RequestSizesMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantRequestSizes, RequestSizesMetric); err != nil { t.Error(err) } }) @@ -165,7 +165,7 @@ func TestRecordRequestErrorCounter(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestErrorCounter, RequestErrorTotalMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantRequestErrorCounter, RequestErrorTotalMetric); err != nil { t.Error(err) } }) @@ -247,7 +247,7 @@ func TestRecordRequestLatencies(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRequestLatencies, 
RequestLatenciesMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantRequestLatencies, RequestLatenciesMetric); err != nil { t.Error(err) } }) @@ -348,7 +348,7 @@ func TestRecordNormalizedTimePerOutputToken(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantLatencyPerToken, NormalizedTimePerOutputTokenMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantLatencyPerToken, NormalizedTimePerOutputTokenMetric); err != nil { t.Error(err) } }) @@ -416,7 +416,7 @@ func TestRecordResponseMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantResponseSize, ResponseSizesMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantResponseSize, ResponseSizesMetric); err != nil { t.Error(err) } @@ -429,7 +429,7 @@ func TestRecordResponseMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantInputToken, InputTokensMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantInputToken, InputTokensMetric); err != nil { t.Error(err) } @@ -442,7 +442,7 @@ func TestRecordResponseMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantOutputToken, OutputTokensMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantOutputToken, OutputTokensMetric); err != nil { t.Error(err) } }) @@ -502,7 +502,7 @@ func TestRunningRequestsMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantRunningRequests, RunningRequestsMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantRunningRequests, RunningRequestsMetric); err != nil { t.Error(err) } }) @@ -538,7 +538,7 @@ func 
TestInferencePoolMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantKVCache, KVCacheAvgUsageMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantKVCache, KVCacheAvgUsageMetric); err != nil { t.Error(err) } @@ -551,7 +551,7 @@ func TestInferencePoolMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantQueueSize, QueueAvgSizeMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantQueueSize, QueueAvgSizeMetric); err != nil { t.Error(err) } }) @@ -615,7 +615,7 @@ func TestSchedulerPluginProcessingLatencies(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantPluginLatencies, "inference_extension_scheduler_plugin_duration_seconds"); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantPluginLatencies, "inference_extension_scheduler_plugin_duration_seconds"); err != nil { t.Error(err) } }) @@ -658,7 +658,7 @@ func TestSchedulerE2ELatency(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantE2ELatency, "inference_extension_scheduler_e2e_duration_seconds"); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantE2ELatency, "inference_extension_scheduler_e2e_duration_seconds"); err != nil { t.Error(err) } }) @@ -734,7 +734,7 @@ func TestPrefixCacheMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantCacheSizeMetrics, PrefixCacheSizeMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantCacheSizeMetrics, PrefixCacheSizeMetric); err != nil { t.Error(err) } @@ -748,7 +748,7 @@ func TestPrefixCacheMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := 
testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitRatioMetrics, PrefixCacheHitRatioMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantHitRatioMetrics, PrefixCacheHitRatioMetric); err != nil { t.Error(err) } @@ -762,7 +762,7 @@ func TestPrefixCacheMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantHitLengthMetrics, PrefixCacheHitLengthMetric); err != nil { + if err := testutil.GatherAndCompare(metrics.Registry, wantHitLengthMetrics, PrefixCacheHitLengthMetric); err != nil { t.Error(err) } }) diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index e56682104..89e509696 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -30,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" ) @@ -41,7 +42,7 @@ func init() { } // defaultManagerOptions returns the default options used to create the manager. -func defaultManagerOptions(namespacedName types.NamespacedName) ctrl.Options { +func defaultManagerOptions(namespacedName types.NamespacedName, metricsServerOptions metricsserver.Options) ctrl.Options { return ctrl.Options{ Scheme: scheme, Cache: cache.Options{ @@ -67,12 +68,13 @@ func defaultManagerOptions(namespacedName types.NamespacedName) ctrl.Options { }, }, }, + Metrics: metricsServerOptions, } } // NewDefaultManager creates a new controller manager with default configuration. 
-func NewDefaultManager(namespacedName types.NamespacedName, restConfig *rest.Config) (ctrl.Manager, error) { - manager, err := ctrl.NewManager(restConfig, defaultManagerOptions(namespacedName)) +func NewDefaultManager(namespacedName types.NamespacedName, restConfig *rest.Config, metricsServerOptions metricsserver.Options) (ctrl.Manager, error) { + manager, err := ctrl.NewManager(restConfig, defaultManagerOptions(namespacedName, metricsServerOptions)) if err != nil { return nil, fmt.Errorf("failed to create controller manager: %v", err) } diff --git a/pkg/epp/util/metrics/metrics.go b/pkg/epp/util/metrics/metrics.go new file mode 100644 index 000000000..167669435 --- /dev/null +++ b/pkg/epp/util/metrics/metrics.go @@ -0,0 +1,28 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "fmt" + + compbasemetrics "k8s.io/component-base/metrics" +) + +// HelpMsgWithStability is a helper function to create a help message with stability level. 
+func HelpMsgWithStability(msg string, stability compbasemetrics.StabilityLevel) string { + return fmt.Sprintf("[%v] %v", stability, msg) +} diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 3035a9b57..3f94f9d33 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -24,8 +24,6 @@ import ( "errors" "fmt" "io" - "net" - "net/http" "os" "path/filepath" "strconv" @@ -37,7 +35,6 @@ import ( extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3" "github.com/google/go-cmp/cmp" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -51,20 +48,22 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" k8syaml "k8s.io/apimachinery/pkg/util/yaml" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/component-base/metrics/legacyregistry" metricsutils "k8s.io/component-base/metrics/testutil" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" k8sclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/envtest" - "sigs.k8s.io/controller-runtime/pkg/manager" + crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" + "sigs.k8s.io/controller-runtime/pkg/metrics/filters" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling" 
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" epptestutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" @@ -1279,13 +1278,12 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) { if len(test.wantMetrics) != 0 { for metricName, value := range test.wantMetrics { - if err := metricsutils.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(value), metricName); err != nil { + if err := metricsutils.GatherAndCompare(crmetrics.Registry, strings.NewReader(value), metricName); err != nil { t.Error(err) } } } - - legacyregistry.Reset() + metrics.Reset() }) } } @@ -1401,15 +1399,21 @@ func BeforeSuite() func() { // Init runtime. ctrl.SetLogger(logger) - mgr, err := runserver.NewManagerWithOptions(cfg, managerTestOptions("default", "vllm-llama3-8b-instruct-pool")) + metrics.Register() + // Register metrics handler. + // Metrics endpoint is enabled in 'config/default/kustomization.yaml'. The Metrics options configure the server. 
+ // More info: + // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/metrics/server + // - https://book.kubebuilder.io/reference/metrics.html + metricsServerOptions := metricsserver.Options{ + BindAddress: fmt.Sprintf(":%d", metricsPort), + FilterProvider: filters.WithAuthenticationAndAuthorization, + } + mgr, err := server.NewManagerWithOptions(cfg, managerTestOptions("default", "vllm-llama3-8b-instruct-pool", metricsServerOptions)) if err != nil { logutil.Fatal(logger, err, "Failed to create controller manager") } - if err := registerMetricsHandler(mgr, metricsPort); err != nil { - logutil.Fatal(logger, err, "Failed to register metrics handler") - } - serverRunner = runserver.NewDefaultExtProcServerRunner() serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond) @@ -1506,37 +1510,9 @@ func makeMetadata(endpoint string) *structpb.Struct { } } -// registerMetricsHandler is a simplified version of metrics endpoint handler -// without Authentication for integration tests. -func registerMetricsHandler(mgr manager.Manager, port int) error { - metrics.Register() - - // Init HTTP server. 
- h := promhttp.HandlerFor( - legacyregistry.DefaultGatherer, - promhttp.HandlerOpts{}, - ) - - mux := http.NewServeMux() - mux.Handle("/metrics", h) - - srv := &http.Server{ - Addr: net.JoinHostPort("", strconv.Itoa(port)), - Handler: mux, - } - - if err := mgr.Add(&manager.Server{ - Name: "metrics", - Server: srv, - }); err != nil { - return err - } - return nil -} - // inject options that allow multiple test runs to run // https://github.com/kubernetes-sigs/controller-runtime/issues/2937 -func managerTestOptions(namespace, name string) ctrl.Options { +func managerTestOptions(namespace, name string, metricsServerOptions metricsserver.Options) ctrl.Options { return ctrl.Options{ Scheme: scheme, Cache: cache.Options{ @@ -1565,6 +1541,7 @@ func managerTestOptions(namespace, name string) ctrl.Options { Controller: config.Controller{ SkipNameValidation: boolPointer(true), }, + Metrics: metricsServerOptions, } }