Commit 5d48994

Merge branch 'kubernetes-sigs:main' into main
2 parents 58d2c77 + 9270ff6

21 files changed: +1031 -229 lines

examples/poc/manifests/vllm/vllm-lora-deployment.yaml (+2 -2)

```diff
@@ -78,9 +78,9 @@ spec:
         timeoutSeconds: 1
       resources:
         limits:
-          nvidia.com/gpu: 2
+          nvidia.com/gpu: 1
         requests:
-          nvidia.com/gpu: 2
+          nvidia.com/gpu: 1
       volumeMounts:
       - mountPath: /data
         name: data
```

pkg/README.md (+4 -4)

````diff
@@ -10,7 +10,7 @@
 
 Our custom LLM Gateway ext-proc is patched into the existing envoy gateway via `EnvoyPatchPolicy`. To enable this feature, we must extend the Envoy Gateway config map. To do this, simply run:
 ```bash
-kubectl apply -f ./manifests/gateway/enable_patch_policy.yaml
+kubectl apply -f ./manifests/enable_patch_policy.yaml
 kubectl rollout restart deployment envoy-gateway -n envoy-gateway-system
 
 ```
@@ -20,14 +20,14 @@
 1. **Deploy Gateway**
 
    ```bash
-   kubectl apply -f ./manifests/gateway/gateway.yaml
+   kubectl apply -f ./manifests/gateway.yaml
    ```
 
 1. **Deploy Ext-Proc**
 
    ```bash
-   kubectl apply -f ./manifests/gateway/ext_proc.yaml
-   kubectl apply -f ./manifests/gateway/patch_policy.yaml
+   kubectl apply -f ./manifests/ext_proc.yaml
+   kubectl apply -f ./manifests/patch_policy.yaml
    ```
 **NOTE**: Ensure the `instance-gateway-ext-proc` deployment is updated with the pod names and internal IP addresses of the vLLM replicas. This step is crucial for the correct routing of requests based on headers. This won't be needed once we make ext proc dynamically read the pods.
 
````

pkg/ext-proc/backend/fake.go (+3 -1)

```diff
@@ -1,5 +1,7 @@
 package backend
 
+import "context"
+
 type FakePodLister struct {
 	Err  error
 	Pods PodSet
@@ -10,7 +12,7 @@ type FakePodMetricsClient struct {
 	Res map[Pod]*PodMetrics
 }
 
-func (f *FakePodMetricsClient) FetchMetrics(pod Pod, existing *PodMetrics) (*PodMetrics, error) {
+func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error) {
 	if err, ok := f.Err[pod]; ok {
 		return nil, err
 	}
```
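
The fake above simply threads the new `ctx` parameter through. As a hedged illustration of why the signature changed (this client, its `/metrics` URL, and the `Address` field are assumptions, not part of this commit), a real `PodMetricsClient` could bind its scrape request to the context so the provider's timeout actually cancels slow fetches:

```go
package backend

import (
	"context"
	"fmt"
	"io"
	"net/http"
)

// httpPodMetricsClient is a hypothetical context-aware PodMetricsClient
// sketch; it is NOT part of this commit.
type httpPodMetricsClient struct{}

func (c *httpPodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error) {
	// pod.Address is an assumed field holding the pod's scrape endpoint.
	url := fmt.Sprintf("http://%s/metrics", pod.Address)
	// Binding the request to ctx aborts the scrape once the caller's
	// deadline (fetchMetricsTimeout in provider.go) elapses.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err // Wraps context.DeadlineExceeded on timeout.
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	// Parsing the Prometheus text format in body into a *PodMetrics is omitted.
	_ = body
	return existing.Clone(), nil
}
```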

pkg/ext-proc/backend/provider.go (+30 -11)

```diff
@@ -1,6 +1,7 @@
 package backend
 
 import (
+	"context"
 	"fmt"
 	"sync"
 	"time"
@@ -9,6 +10,10 @@ import (
 	klog "k8s.io/klog/v2"
 )
 
+const (
+	fetchMetricsTimeout = 5 * time.Second
+)
+
 func NewProvider(pmc PodMetricsClient, pl PodLister) *Provider {
 	p := &Provider{
 		podMetrics: sync.Map{},
@@ -27,7 +32,7 @@ type Provider struct {
 }
 
 type PodMetricsClient interface {
-	FetchMetrics(pod Pod, existing *PodMetrics) (*PodMetrics, error)
+	FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
 }
 
 type PodLister interface {
@@ -60,7 +65,8 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 	if err := p.refreshPodsOnce(); err != nil {
 		return fmt.Errorf("failed to init pods: %v", err)
 	}
-	if err := p.refreshMetricsOnce(); err != nil {
+	err := p.refreshMetricsOnce()
+	if err != nil {
 		return fmt.Errorf("failed to init metrics: %v", err)
 	}
 
@@ -113,7 +119,7 @@ func (p *Provider) refreshPodsOnce() error {
 		new := &PodMetrics{
 			Pod: pod,
 			Metrics: Metrics{
-				CachedModels: make(map[string]int),
+				ActiveModels: make(map[string]int),
 			},
 		}
 		p.podMetrics.Store(pod, new)
@@ -132,35 +138,48 @@ func (p *Provider) refreshPodsOnce() error {
 }
 
 func (p *Provider) refreshMetricsOnce() error {
+	ctx, cancel := context.WithTimeout(context.Background(), fetchMetricsTimeout)
+	defer cancel()
 	start := time.Now()
 	defer func() {
 		d := time.Since(start)
 		// TODO: add a metric instead of logging
 		klog.V(4).Infof("Refreshed metrics in %v", d)
 	}()
 	var wg sync.WaitGroup
-	var errs error
+	errCh := make(chan error)
 	processOnePod := func(key, value any) bool {
 		klog.V(4).Infof("Processing pod %v and metric %v", key, value)
 		pod := key.(Pod)
 		existing := value.(*PodMetrics)
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
-			updated, err := p.pmc.FetchMetrics(pod, existing)
+			updated, err := p.pmc.FetchMetrics(ctx, pod, existing)
 			if err != nil {
-				multierr.Append(errs, fmt.Errorf("failed to parse metrics from %s: %v", pod, err))
+				errCh <- fmt.Errorf("failed to parse metrics from %s: %v", pod, err)
 				return
 			}
-			klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
-			if err != nil {
-				multierr.Append(errs, fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err))
-			}
 			p.UpdatePodMetrics(pod, updated)
+			klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
 		}()
 		return true
 	}
 	p.podMetrics.Range(processOnePod)
-	wg.Wait()
+
+	// Wait for metric collection for all pods to complete and close the error channel in a
+	// goroutine so this is unblocking, allowing the code to proceed to the error collection code
+	// below.
+	// Note we couldn't use a buffered error channel with a size because the size of the podMetrics
+	// sync.Map is unknown beforehand.
+	go func() {
+		wg.Wait()
+		close(errCh)
+	}()
+
+	var errs error
+	for err := range errCh {
+		errs = multierr.Append(errs, err)
+	}
	return errs
 }
```
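
The heart of this change is a standard Go fan-out pattern: workers send into an unbuffered error channel, a helper goroutine closes the channel after `wg.Wait()`, and the caller drains it with a plain `range` loop. (It also fixes a bug in the old code, which discarded the result of `multierr.Append` and raced on `errs` across goroutines.) A self-contained sketch of the pattern, with illustrative names, follows:

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/multierr"
)

// collect fans tasks out to goroutines and aggregates their errors,
// mirroring refreshMetricsOnce above. All names here are illustrative.
func collect(tasks []func() error) error {
	var wg sync.WaitGroup
	// Unbuffered on purpose: the number of senders is unknown up front,
	// just as the size of the podMetrics sync.Map is.
	errCh := make(chan error)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func() error) {
			defer wg.Done()
			if err := task(); err != nil {
				errCh <- err // Blocks until the drain loop below receives it.
			}
		}(task)
	}
	// Close errCh only after every sender has finished, in a separate
	// goroutine so the drain loop can start receiving immediately.
	go func() {
		wg.Wait()
		close(errCh)
	}()
	var errs error
	for err := range errCh { // Terminates once errCh is closed.
		errs = multierr.Append(errs, err)
	}
	return errs
}

func main() {
	err := collect([]func() error{
		func() error { return nil },
		func() error { return fmt.Errorf("injected error") },
	})
	fmt.Println(err) // Prints: injected error
}
```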

pkg/ext-proc/backend/provider_test.go (+111, new file)

```go
package backend

import (
	"errors"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
)

var (
	pod1 = &PodMetrics{
		Pod: Pod{Name: "pod1"},
		Metrics: Metrics{
			WaitingQueueSize:    0,
			KVCacheUsagePercent: 0.2,
			MaxActiveModels:     2,
			ActiveModels: map[string]int{
				"foo": 1,
				"bar": 1,
			},
		},
	}
	pod2 = &PodMetrics{
		Pod: Pod{Name: "pod2"},
		Metrics: Metrics{
			WaitingQueueSize:    1,
			KVCacheUsagePercent: 0.2,
			MaxActiveModels:     2,
			ActiveModels: map[string]int{
				"foo1": 1,
				"bar1": 1,
			},
		},
	}
)

func TestProvider(t *testing.T) {
	tests := []struct {
		name    string
		pmc     PodMetricsClient
		pl      PodLister
		initErr bool
		want    []*PodMetrics
	}{
		{
			name: "Init success",
			pl: &FakePodLister{
				Pods: map[Pod]bool{
					pod1.Pod: true,
					pod2.Pod: true,
				},
			},
			pmc: &FakePodMetricsClient{
				Res: map[Pod]*PodMetrics{
					pod1.Pod: pod1,
					pod2.Pod: pod2,
				},
			},
			want: []*PodMetrics{pod1, pod2},
		},
		{
			name: "Fetch metrics error",
			pl: &FakePodLister{
				Pods: map[Pod]bool{
					pod1.Pod: true,
					pod2.Pod: true,
				},
			},
			pmc: &FakePodMetricsClient{
				Err: map[Pod]error{
					pod2.Pod: errors.New("injected error"),
				},
				Res: map[Pod]*PodMetrics{
					pod1.Pod: pod1,
				},
			},
			initErr: true,
			want: []*PodMetrics{
				pod1,
				// Failed to fetch pod2 metrics so it remains the default values.
				&PodMetrics{
					Pod: Pod{Name: "pod2"},
					Metrics: Metrics{
						WaitingQueueSize:    0,
						KVCacheUsagePercent: 0,
						MaxActiveModels:     0,
						ActiveModels:        map[string]int{},
					},
				},
			},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			p := NewProvider(test.pmc, test.pl)
			err := p.Init(time.Millisecond, time.Millisecond)
			if test.initErr != (err != nil) {
				t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr)
			}
			metrics := p.AllPodMetrics()
			lessFunc := func(a, b *PodMetrics) bool {
				return a.String() < b.String()
			}
			if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc)); diff != "" {
				t.Errorf("Unexpected output (-want +got): %v", diff)
			}
		})
	}
}
```

pkg/ext-proc/backend/types.go (+8 -6)

```diff
@@ -12,12 +12,14 @@ type Pod struct {
 }
 
 func (p Pod) String() string {
-	return p.Namespace + "." + p.Name
+	return p.Namespace + "/" + p.Name
 }
 
 type Metrics struct {
-	// CachedModels is a set of models(including LoRA adapters) that are currently cached to GPU.
-	CachedModels map[string]int
+	// ActiveModels is a set of models(including LoRA adapters) that are currently cached to GPU.
+	ActiveModels map[string]int
+	// MaxActiveModels is the maximum number of models that can be loaded to GPU.
+	MaxActiveModels int
 	RunningQueueSize    int
 	WaitingQueueSize    int
 	KVCacheUsagePercent float64
@@ -34,14 +36,14 @@ func (pm *PodMetrics) String() string {
 }
 
 func (pm *PodMetrics) Clone() *PodMetrics {
-	cm := make(map[string]int, len(pm.CachedModels))
-	for k, v := range pm.CachedModels {
+	cm := make(map[string]int, len(pm.ActiveModels))
+	for k, v := range pm.ActiveModels {
 		cm[k] = v
 	}
 	clone := &PodMetrics{
 		Pod: pm.Pod,
 		Metrics: Metrics{
-			CachedModels: cm,
+			ActiveModels: cm,
 			RunningQueueSize:    pm.RunningQueueSize,
 			WaitingQueueSize:    pm.WaitingQueueSize,
 			KVCacheUsagePercent: pm.KVCacheUsagePercent,
```
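
The renamed `Clone` method keeps its element-by-element copy of the map because Go maps are reference types: copying the struct alone would leave two `PodMetrics` values sharing one `ActiveModels` map. A minimal standalone sketch, using trimmed-down stand-ins for the real types, shows the difference:

```go
package main

import "fmt"

// Trimmed-down stand-ins for the backend types, for illustration only.
type Metrics struct {
	ActiveModels map[string]int
}

type PodMetrics struct {
	Metrics
}

// Clone deep-copies ActiveModels, as in the diff above.
func (pm *PodMetrics) Clone() *PodMetrics {
	cm := make(map[string]int, len(pm.ActiveModels))
	for k, v := range pm.ActiveModels {
		cm[k] = v
	}
	return &PodMetrics{Metrics{ActiveModels: cm}}
}

func main() {
	pm := &PodMetrics{Metrics{ActiveModels: map[string]int{"foo": 1}}}

	shallow := *pm                  // copies the struct, but not the map it points to
	shallow.ActiveModels["bar"] = 1 // mutates pm's map too
	fmt.Println(pm.ActiveModels["bar"]) // 1: shallow copies share the map

	deep := pm.Clone()
	deep.ActiveModels["baz"] = 1 // isolated: Clone rebuilt the map
	_, shared := pm.ActiveModels["baz"]
	fmt.Println(shared) // false
}
```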
