Skip to content

Commit d3f5765

Browse files
committed
ensure metrics refresh time <= refreshMetricsInterval
Signed-off-by: spacewander <[email protected]>
1 parent fa40dc5 commit d3f5765

File tree

4 files changed

+82
-25
lines changed

4 files changed

+82
-25
lines changed

pkg/ext-proc/backend/provider.go

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package backend
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"sync"
78
"time"
@@ -10,10 +11,6 @@ import (
1011
klog "k8s.io/klog/v2"
1112
)
1213

13-
const (
14-
fetchMetricsTimeout = 5 * time.Second
15-
)
16-
1714
func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
1815
p := &Provider{
1916
podMetrics: sync.Map{},
@@ -35,23 +32,46 @@ type PodMetricsClient interface {
3532
FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
3633
}
3734

35+
func isPodMetricsStale(pm *PodMetrics) bool {
36+
// TODO: make it configurable
37+
return time.Since(pm.UpdatedTime) > 5*time.Second
38+
}
39+
3840
func (p *Provider) AllPodMetrics() []*PodMetrics {
41+
return p.allPodMetrics(false)
42+
}
43+
44+
func (p *Provider) AllPodMetricsIncludingStale() []*PodMetrics {
45+
return p.allPodMetrics(true)
46+
}
47+
48+
func (p *Provider) allPodMetrics(staleIncluded bool) []*PodMetrics {
3949
res := []*PodMetrics{}
4050
fn := func(k, v any) bool {
41-
res = append(res, v.(*PodMetrics))
51+
m := v.(*PodMetrics)
52+
53+
if !staleIncluded && isPodMetricsStale(m) {
54+
// exclude stale metrics for scheduler
55+
klog.V(4).Infof("Pod metrics for %s is stale, skipping", m.Pod)
56+
return true
57+
}
58+
59+
res = append(res, m)
4260
return true
4361
}
4462
p.podMetrics.Range(fn)
4563
return res
4664
}
4765

4866
func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
67+
pm.UpdatedTime = time.Now()
4968
p.podMetrics.Store(pod, pm)
5069
}
5170

5271
func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
5372
val, ok := p.podMetrics.Load(pod)
5473
if ok {
74+
// For now, we don't exclude stale metrics with GET operation.
5575
return val.(*PodMetrics), true
5676
}
5777
return nil, false
@@ -60,11 +80,11 @@ func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
6080
func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
6181
p.refreshPodsOnce()
6282

63-
if err := p.refreshMetricsOnce(); err != nil {
83+
if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil {
6484
klog.Errorf("Failed to init metrics: %v", err)
6585
}
6686

67-
klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
87+
klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetricsIncludingStale())
6888

6989
// periodically refresh pods
7090
go func() {
@@ -76,10 +96,18 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
7696

7797
// periodically refresh metrics
7898
go func() {
99+
time.Sleep(refreshMetricsInterval)
79100
for {
80-
time.Sleep(refreshMetricsInterval)
81-
if err := p.refreshMetricsOnce(); err != nil {
82-
klog.V(4).Infof("Failed to refresh metrics: %v", err)
101+
start := time.Now()
102+
103+
if err := p.refreshMetricsOnce(refreshMetricsInterval); err != nil {
104+
klog.Errorf("Failed to refresh metrics: %v", err)
105+
}
106+
107+
now := time.Now()
108+
used := now.Sub(start)
109+
if used < refreshMetricsInterval {
110+
time.Sleep(refreshMetricsInterval - used)
83111
}
84112
}
85113
}()
@@ -89,7 +117,7 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
89117
go func() {
90118
for {
91119
time.Sleep(5 * time.Second)
92-
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
120+
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetricsIncludingStale())
93121
}
94122
}()
95123
}
@@ -127,8 +155,8 @@ func (p *Provider) refreshPodsOnce() {
127155
p.datastore.pods.Range(addNewPods)
128156
}
129157

130-
func (p *Provider) refreshMetricsOnce() error {
131-
ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
158+
func (p *Provider) refreshMetricsOnce(interval time.Duration) error {
159+
ctx, cancel := context.WithTimeout(context.Background(), interval)
132160
defer cancel()
133161
start := time.Now()
134162
defer func() {
@@ -147,7 +175,12 @@ func (p *Provider) refreshMetricsOnce() error {
147175
defer wg.Done()
148176
updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
149177
if err != nil {
150-
errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
178+
// handle timeout error as less severe error
179+
if errors.Is(err, context.Canceled) {
180+
klog.V(4).Infof("Timeout fetching metrics for pod %s", pod)
181+
} else {
182+
errCh <- fmt.Errorf("failed to fetch metrics from %s: %v", pod, err)
183+
}
151184
return
152185
}
153186
p.UpdatePodMetrics(pod, updated)

pkg/ext-proc/backend/provider_test.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,12 @@ var (
3939

4040
func TestProvider(t *testing.T) {
4141
tests := []struct {
42-
name string
43-
pmc PodMetricsClient
44-
datastore *K8sDatastore
45-
initErr bool
46-
want []*PodMetrics
42+
name string
43+
pmc PodMetricsClient
44+
datastore *K8sDatastore
45+
initErr bool
46+
want []*PodMetrics
47+
wantIncludingStale []*PodMetrics
4748
}{
4849
{
4950
name: "Init success",
@@ -56,7 +57,8 @@ func TestProvider(t *testing.T) {
5657
pod2.Pod: pod2,
5758
},
5859
},
59-
want: []*PodMetrics{pod1, pod2},
60+
want: []*PodMetrics{pod1, pod2},
61+
wantIncludingStale: []*PodMetrics{pod1, pod2},
6062
},
6163
{
6264
name: "Fetch metrics error",
@@ -72,6 +74,11 @@ func TestProvider(t *testing.T) {
7274
pods: populateMap(pod1.Pod, pod2.Pod),
7375
},
7476
want: []*PodMetrics{
77+
pod1,
78+
// Failed to fetch pod2 metrics so it remains the default values,
79+
// which is stale.
80+
},
81+
wantIncludingStale: []*PodMetrics{
7582
pod1,
7683
// Failed to fetch pod2 metrics so it remains the default values.
7784
{
@@ -101,6 +108,15 @@ func TestProvider(t *testing.T) {
101108
if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc)); diff != "" {
102109
t.Errorf("Unexpected output (-want +got): %v", diff)
103110
}
111+
112+
// Then check for AllPodMetricsIncludingStale
113+
if len(test.wantIncludingStale) > 0 {
114+
metricsIncludingStale := p.AllPodMetricsIncludingStale()
115+
if diff := cmp.Diff(test.wantIncludingStale, metricsIncludingStale, cmpopts.SortSlices(lessFunc)); diff != "" {
116+
t.Errorf("Unexpected output (-want +got): %v", diff)
117+
}
118+
119+
}
104120
})
105121
}
106122
}

pkg/ext-proc/backend/types.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
// Package backend is a library to interact with backend model servers such as probing metrics.
22
package backend
33

4-
import "fmt"
4+
import (
5+
"fmt"
6+
"time"
7+
)
58

69
type PodSet map[Pod]bool
710

@@ -28,6 +31,9 @@ type Metrics struct {
2831
type PodMetrics struct {
2932
Pod
3033
Metrics
34+
35+
// UpdatedTime record the time when the metrics are updated.
36+
UpdatedTime time.Time
3137
}
3238

3339
func (pm *PodMetrics) String() string {
@@ -48,6 +54,7 @@ func (pm *PodMetrics) Clone() *PodMetrics {
4854
KVCacheUsagePercent: pm.KVCacheUsagePercent,
4955
KvCacheMaxTokenCapacity: pm.KvCacheMaxTokenCapacity,
5056
},
57+
UpdatedTime: pm.UpdatedTime,
5158
}
5259
return clone
5360
}

pkg/ext-proc/backend/vllm/metrics.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,22 +49,23 @@ func (p *PodMetricsClientImpl) FetchMetrics(
4949
}
5050
resp, err := http.DefaultClient.Do(req)
5151
if err != nil {
52-
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
53-
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
52+
// As we use a short fetch timeout to ensure the metrics are up-to-date, there will be a lot
53+
// of timeout of error even only 0.1% requests are timed out.
54+
// Return the raw error so that the caller can filter out it via errors.Is(err, context.Canceled)
55+
return nil, err
5456
}
5557
defer func() {
5658
_ = resp.Body.Close()
5759
}()
5860

5961
if resp.StatusCode != http.StatusOK {
60-
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
6162
return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
6263
}
6364

6465
parser := expfmt.TextParser{}
6566
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
6667
if err != nil {
67-
return nil, err
68+
return nil, fmt.Errorf("failed to parse metrics from %s: %w", pod, err)
6869
}
6970
return promToPodMetrics(metricFamilies, existing)
7071
}

0 commit comments

Comments
 (0)