diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go index 0cc21a4f9..a10df4e94 100644 --- a/pkg/ext-proc/backend/provider.go +++ b/pkg/ext-proc/backend/provider.go @@ -2,6 +2,7 @@ package backend import ( "context" + "errors" "fmt" "sync" "time" @@ -10,10 +11,6 @@ import ( klog "k8s.io/klog/v2" ) -const ( - fetchMetricsTimeout = 5 * time.Second -) - func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider { p := &Provider{ podMetrics: sync.Map{}, @@ -35,10 +32,31 @@ type PodMetricsClient interface { FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error) } +func isPodMetricsStale(pm *PodMetrics) bool { + // TODO: make it configurable + return time.Since(pm.UpdatedTime) > 5*time.Second +} + func (p *Provider) AllPodMetrics() []*PodMetrics { + return p.allPodMetrics(false) +} + +func (p *Provider) AllPodMetricsIncludingStale() []*PodMetrics { + return p.allPodMetrics(true) +} + +func (p *Provider) allPodMetrics(staleIncluded bool) []*PodMetrics { res := []*PodMetrics{} fn := func(k, v any) bool { - res = append(res, v.(*PodMetrics)) + m := v.(*PodMetrics) + + if !staleIncluded && isPodMetricsStale(m) { + // exclude stale metrics for scheduler + klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod) + return true + } + + res = append(res, m) return true } p.podMetrics.Range(fn) @@ -46,12 +64,14 @@ func (p *Provider) AllPodMetrics() []*PodMetrics { } func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) { + pm.UpdatedTime = time.Now() p.podMetrics.Store(pod, pm) } func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) { val, ok := p.podMetrics.Load(pod) if ok { + // For now, we don't exclude stale metrics with GET operation. 
return val.(*PodMetrics), true } return nil, false @@ -60,11 +80,11 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) { func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error { p.refreshPodsOnce() - if err := p.refreshMetricsOnce(); err != nil { + if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil { klog.Errorf("Failed to init metrics: %v", err) } - klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics()) + klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetricsIncludingStale()) // periodically refresh pods go func() { @@ -76,10 +96,18 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio // periodically refresh metrics go func() { + time.Sleep(refreshMetricsInterval) for { - time.Sleep(refreshMetricsInterval) - if err := p.refreshMetricsOnce(); err != nil { - klog.V(4).Infof("Failed to refresh metrics: %v", err) + start := time.Now() + + if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil { + klog.Errorf("Failed to refresh metrics: %v", err) + } + + now := time.Now() + used := now.Sub(start) + if used < refreshMetricsInterval { + time.Sleep(refreshMetricsInterval - used) } } }() @@ -89,7 +117,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio go func() { for { time.Sleep(5 * time.Second) - klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics()) + klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetricsIncludingStale()) } }() } @@ -127,8 +155,8 @@ func (p *Provider) refreshPodsOnce() { p.datastore.pods.Range(addNewPods) } -func (p *Provider) refreshMetricsOnce() error { - ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout) +func (p *Provider) refreshMetricsOnce(interval time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), interval) defer cancel() start := time.Now() defer func() { @@ -147,7 +175,12 @@ func (p *Provider) 
refreshMetricsOnce() error { defer wg.Done() updated, err := p.pmc.FetchMetrics(ctx, pod, existing) if err != nil { - errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err) + // handle timeout error as less severe error + if errors.Is(err, context.Canceled) { + klog.V(4).Infof("Timeout fetching metrics for pod %s", pod) + } else { + errCh <- fmt.Errorf("failed to fetch metrics from %s: %v", pod, err) + } return } p.UpdatePodMetrics(pod, updated) diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go index ad231f575..d178f3bda 100644 --- a/pkg/ext-proc/backend/provider_test.go +++ b/pkg/ext-proc/backend/provider_test.go @@ -39,11 +39,12 @@ var ( func TestProvider(t *testing.T) { tests := []struct { - name string - pmc PodMetricsClient - datastore *K8sDatastore - initErr bool - want []*PodMetrics + name string + pmc PodMetricsClient + datastore *K8sDatastore + initErr bool + want []*PodMetrics + wantIncludingStale []*PodMetrics }{ { name: "Init success", @@ -56,7 +57,8 @@ func TestProvider(t *testing.T) { pod2.Pod: pod2, }, }, - want: []*PodMetrics{pod1, pod2}, + want: []*PodMetrics{pod1, pod2}, + wantIncludingStale: []*PodMetrics{pod1, pod2}, }, { name: "Fetch metrics error", @@ -72,6 +74,11 @@ func TestProvider(t *testing.T) { pods: populateMap(pod1.Pod, pod2.Pod), }, want: []*PodMetrics{ + pod1, + // Failed to fetch pod2 metrics so it remains the default values, + // which is stale. + }, + wantIncludingStale: []*PodMetrics{ pod1, // Failed to fetch pod2 metrics so it remains the default values. 
{ @@ -101,6 +108,15 @@ func TestProvider(t *testing.T) { if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc)); diff != "" { t.Errorf("Unexpected output (-want +got): %v", diff) } + + // Then check for AllPodMetricsIncludingStale + if len(test.wantIncludingStale) > 0 { + metricsIncludingStale := p.AllPodMetricsIncludingStale() + if diff := cmp.Diff(test.wantIncludingStale, metricsIncludingStale, cmpopts.SortSlices(lessFunc)); diff != "" { + t.Errorf("Unexpected output (-want +got): %v", diff) + } + + } }) } } diff --git a/pkg/ext-proc/backend/types.go b/pkg/ext-proc/backend/types.go index 7e399fedc..18ef0c15b 100644 --- a/pkg/ext-proc/backend/types.go +++ b/pkg/ext-proc/backend/types.go @@ -1,7 +1,10 @@ // Package backend is a library to interact with backend model servers such as probing metrics. package backend -import "fmt" +import ( + "fmt" + "time" +) type PodSet map[Pod]bool @@ -28,6 +31,9 @@ type Metrics struct { type PodMetrics struct { Pod Metrics + + // UpdatedTime records the time when the metrics were last updated. + UpdatedTime time.Time } func (pm *PodMetrics) String() string { @@ -48,6 +54,7 @@ func (pm *PodMetrics) Clone() *PodMetrics { KVCacheUsagePercent: pm.KVCacheUsagePercent, KvCacheMaxTokenCapacity: pm.KvCacheMaxTokenCapacity, }, + UpdatedTime: pm.UpdatedTime, } return clone } diff --git a/pkg/ext-proc/backend/vllm/metrics.go b/pkg/ext-proc/backend/vllm/metrics.go index 5fff4d8e6..77ad1f11a 100644 --- a/pkg/ext-proc/backend/vllm/metrics.go +++ b/pkg/ext-proc/backend/vllm/metrics.go @@ -49,22 +49,23 @@ func (p *PodMetricsClientImpl) FetchMetrics( } resp, err := http.DefaultClient.Do(req) if err != nil { - klog.Errorf("failed to fetch metrics from %s: %v", pod, err) - return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err) + // As we use a short fetch timeout to ensure the metrics are up-to-date, there will be a lot + // of timeout errors even if only 0.1% of requests time out. 
+ // Return the raw error so that the caller can filter it out via errors.Is(err, context.Canceled) + return nil, err } defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode) return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode) } parser := expfmt.TextParser{} metricFamilies, err := parser.TextToMetricFamilies(resp.Body) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to parse metrics from %s: %w", pod, err) } return promToPodMetrics(metricFamilies, existing) }