diff --git a/pkg/epp/backend/fake.go b/pkg/epp/backend/fake.go index 06f14f696..584486c29 100644 --- a/pkg/epp/backend/fake.go +++ b/pkg/epp/backend/fake.go @@ -31,7 +31,7 @@ type FakePodMetricsClient struct { Res map[types.NamespacedName]*datastore.PodMetrics } -func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error) { +func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *datastore.PodMetrics, port int32) (*datastore.PodMetrics, error) { if err, ok := f.Err[existing.NamespacedName]; ok { return nil, err } diff --git a/pkg/epp/backend/provider.go b/pkg/epp/backend/provider.go index a12f84d5c..959f3e0c9 100644 --- a/pkg/epp/backend/provider.go +++ b/pkg/epp/backend/provider.go @@ -49,7 +49,7 @@ type Provider struct { } type PodMetricsClient interface { - FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error) + FetchMetrics(ctx context.Context, existing *datastore.PodMetrics, port int32) (*datastore.PodMetrics, error) } func (p *Provider) Init(ctx context.Context, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error { @@ -105,6 +105,11 @@ func (p *Provider) Init(ctx context.Context, refreshMetricsInterval, refreshProm func (p *Provider) refreshMetricsOnce(logger logr.Logger) error { loggerTrace := logger.V(logutil.TRACE) + pool, _ := p.datastore.PoolGet() + if pool == nil { + loggerTrace.Info("No inference pool or not initialized") + return nil + } ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout) defer cancel() start := time.Now() @@ -113,6 +118,7 @@ func (p *Provider) refreshMetricsOnce(logger logr.Logger) error { // TODO: add a metric instead of logging loggerTrace.Info("Metrics refreshed", "duration", d) }() + var wg sync.WaitGroup errCh := make(chan error) processOnePod := func(key, value any) bool { @@ -121,7 +127,7 @@ func (p *Provider) refreshMetricsOnce(logger logr.Logger) error { wg.Add(1) go func() { defer wg.Done() - updated, err := p.pmc.FetchMetrics(ctx, existing) + updated, err := p.pmc.FetchMetrics(ctx, existing, pool.Spec.TargetPortNumber) if err != nil { errCh <- fmt.Errorf("failed to parse metrics from %s: %v", existing.NamespacedName, err) return @@ -151,8 +157,6 @@ func (p *Provider) refreshMetricsOnce(logger logr.Logger) error { } func (p *Provider) flushPrometheusMetricsOnce(logger logr.Logger) { - logger.V(logutil.DEBUG).Info("Flushing Prometheus Metrics") - pool, _ := p.datastore.PoolGet() if pool == nil { // No inference pool or not initialize. @@ -163,6 +167,7 @@ func (p *Provider) flushPrometheusMetricsOnce(logger logr.Logger) { var queueTotal int podMetrics := p.datastore.PodGetAll() + logger.V(logutil.VERBOSE).Info("Flushing Prometheus Metrics", "ReadyPods", len(podMetrics)) if len(podMetrics) == 0 { return } diff --git a/pkg/epp/backend/provider_test.go b/pkg/epp/backend/provider_test.go index f2db09feb..129947238 100644 --- a/pkg/epp/backend/provider_test.go +++ b/pkg/epp/backend/provider_test.go @@ -26,6 +26,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" ) @@ -68,6 +69,12 @@ var ( }, }, } + + inferencePool = &v1alpha2.InferencePool{ + Spec: v1alpha2.InferencePoolSpec{ + TargetPortNumber: 8000, + }, + } ) func TestProvider(t *testing.T) { @@ -127,7 +134,7 @@ func TestProvider(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ds := datastore.NewFakeDatastore(test.storePods, nil, nil) + ds := datastore.NewFakeDatastore(test.storePods, nil, inferencePool) p := NewProvider(test.pmc, ds) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/epp/backend/vllm/metrics.go b/pkg/epp/backend/vllm/metrics.go index 8648e24ce..4973c93eb 100644 --- a/pkg/epp/backend/vllm/metrics.go +++ b/pkg/epp/backend/vllm/metrics.go @@ -55,13 +55,15 @@ type PodMetricsClientImpl struct{} func (p *PodMetricsClientImpl) FetchMetrics( ctx context.Context, existing *datastore.PodMetrics, + port int32, ) (*datastore.PodMetrics, error) { logger := log.FromContext(ctx) loggerDefault := logger.V(logutil.DEFAULT) // Currently the metrics endpoint is hard-coded, which works with vLLM. // TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/16): Consume this from InferencePool config. - url := existing.BuildScrapeEndpoint() + url := "http://" + existing.Address + ":" + strconv.Itoa(int(port)) + "/metrics" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { loggerDefault.Error(err, "Failed create HTTP request", "method", http.MethodGet, "url", url) diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index 575762130..7534ac0f9 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -35,10 +35,10 @@ import ( ) var ( - basePod1 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-1", ScrapePath: "/metrics", ScrapePort: 8000}} - basePod2 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}, Address: "address-2", ScrapePath: "/metrics", ScrapePort: 8000}} - basePod3 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}, Address: "address-3", ScrapePath: "/metrics", ScrapePort: 8000}} - basePod11 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-11", ScrapePath: "/metrics", ScrapePort: 8000}} + basePod1 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-1"}} + basePod2 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}, Address: "address-2"}} + basePod3 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}, Address: "address-3"}} + basePod11 = &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-11"}} ) func TestPodReconciler(t *testing.T) { diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index cd5d290f2..2bde571fc 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -265,16 +265,13 @@ func (ds *datastore) PodDelete(namespacedName types.NamespacedName) { } func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { - pool, _ := ds.PoolGet() new := &PodMetrics{ Pod: Pod{ NamespacedName: types.NamespacedName{ Name: pod.Name, Namespace: pod.Namespace, }, - Address: pod.Status.PodIP, - ScrapePath: "/metrics", - ScrapePort: pool.Spec.TargetPortNumber, + Address: pod.Status.PodIP, }, Metrics: Metrics{ ActiveModels: make(map[string]int), diff --git a/pkg/epp/datastore/types.go b/pkg/epp/datastore/types.go index 237e98ca5..8cfcf1d1f 100644 --- a/pkg/epp/datastore/types.go +++ b/pkg/epp/datastore/types.go @@ -26,10 +26,6 @@ import ( type Pod struct { NamespacedName types.NamespacedName Address string - - // metrics scrape options - ScrapePort int32 - ScrapePath string } type Metrics struct { @@ -61,11 +57,10 @@ func (pm *PodMetrics) Clone() *PodMetrics { Pod: Pod{ NamespacedName: pm.NamespacedName, Address: pm.Address, - ScrapePort: pm.ScrapePort, - ScrapePath: pm.ScrapePath, }, Metrics: Metrics{ ActiveModels: cm, + MaxActiveModels: pm.MaxActiveModels, RunningQueueSize: pm.RunningQueueSize, WaitingQueueSize: pm.WaitingQueueSize, KVCacheUsagePercent: pm.KVCacheUsagePercent, @@ -74,7 +69,3 @@ func (pm *PodMetrics) Clone() *PodMetrics { } return clone } - -func (pm *PodMetrics) BuildScrapeEndpoint() string { - return fmt.Sprintf("http://%s:%d%s", pm.Address, pm.ScrapePort, pm.ScrapePath) -}