Skip to content

Commit 3fa65ae

Browse files
committed
Refactor: Define PodMetricsClient interface and hide implementation details of vllm metrics processing
1 parent fcad109 commit 3fa65ae

File tree

6 files changed

+78
-149
lines changed

6 files changed

+78
-149
lines changed

pkg/ext-proc/backend/fake.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
11
package backend
22

3-
import (
4-
dto "github.com/prometheus/client_model/go"
5-
)
6-
73
type FakePodLister struct {
84
Err error
95
Pods PodSet
106
}
117

128
type FakePodMetricsClient struct {
139
Err map[Pod]error
14-
Res map[Pod]map[string]*dto.MetricFamily
10+
Res map[Pod]*PodMetrics
1511
}
1612

17-
func (f *FakePodMetricsClient) FetchMetrics(pod Pod) (map[string]*dto.MetricFamily, error) {
13+
func (f *FakePodMetricsClient) FetchMetrics(pod Pod, existing *PodMetrics) (*PodMetrics, error) {
1814
if err, ok := f.Err[pod]; ok {
1915
return nil, err
2016
}

pkg/ext-proc/backend/pod_client.go

-34
This file was deleted.

pkg/ext-proc/backend/provider.go

+36-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"sync"
66
"time"
77

8-
dto "github.com/prometheus/client_model/go"
8+
"go.uber.org/multierr"
99
klog "k8s.io/klog/v2"
1010
)
1111

@@ -27,7 +27,7 @@ type Provider struct {
2727
}
2828

2929
type PodMetricsClient interface {
30-
FetchMetrics(pod Pod) (map[string]*dto.MetricFamily, error)
30+
FetchMetrics(pod Pod, existing *PodMetrics) (*PodMetrics, error)
3131
}
3232

3333
type PodLister interface {
@@ -130,3 +130,37 @@ func (p *Provider) refreshPodsOnce() error {
130130
p.podMetrics.Range(mergeFn)
131131
return nil
132132
}
133+
134+
func (p *Provider) refreshMetricsOnce() error {
135+
start := time.Now()
136+
defer func() {
137+
d := time.Since(start)
138+
// TODO: add a metric instead of logging
139+
klog.V(4).Infof("Refreshed metrics in %v", d)
140+
}()
141+
var wg sync.WaitGroup
142+
var errs error
143+
processOnePod := func(key, value any) bool {
144+
klog.V(4).Infof("Processing pod %v and metric %v", key, value)
145+
pod := key.(Pod)
146+
existing := value.(*PodMetrics)
147+
wg.Add(1)
148+
go func() {
149+
defer wg.Done()
150+
updated, err := p.pmc.FetchMetrics(pod, existing)
151+
if err != nil {
152+
multierr.Append(errs, fmt.Errorf("failed to parse metrics from %s: %v", pod, err))
153+
return
154+
}
155+
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
156+
if err != nil {
157+
multierr.Append(errs, fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err))
158+
}
159+
p.UpdatePodMetrics(pod, updated)
160+
}()
161+
return true
162+
}
163+
p.podMetrics.Range(processOnePod)
164+
wg.Wait()
165+
return errs
166+
}

pkg/ext-proc/backend/metrics.go renamed to pkg/ext-proc/backend/vllm/metrics.go

+31-35
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
1-
package backend
1+
// Package vllm provides vllm specific pod metrics implementation.
2+
package vllm
23

34
import (
5+
"ext-proc/backend"
46
"fmt"
7+
"net/http"
58
"strings"
6-
"sync"
79
"time"
810

911
dto "github.com/prometheus/client_model/go"
12+
"github.com/prometheus/common/expfmt"
1013
"go.uber.org/multierr"
1114
klog "k8s.io/klog/v2"
1215
)
@@ -25,45 +28,38 @@ const (
2528
KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"
2629
)
2730

28-
func (p *Provider) refreshMetricsOnce() error {
29-
start := time.Now()
30-
defer func() {
31-
d := time.Now().Sub(start)
32-
// TODO: add a metric instead of logging
33-
klog.V(4).Infof("Refreshed metrics in %v", d)
34-
}()
35-
var wg sync.WaitGroup
36-
var errs error
37-
processOnePod := func(key, value any) bool {
38-
klog.V(4).Infof("Processing pod %v and metric %v", key, value)
39-
pod := key.(Pod)
40-
metrics := value.(*PodMetrics)
41-
wg.Add(1)
42-
go func() {
43-
defer wg.Done()
44-
metricFamilies, err := p.pmc.FetchMetrics(pod)
45-
if err != nil {
46-
multierr.Append(errs, fmt.Errorf("failed to parse metrics from %s: %v", pod, err))
47-
return
48-
}
49-
updated, err := promToPodMetrics(metricFamilies, metrics)
50-
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
51-
if err != nil {
52-
multierr.Append(errs, fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err))
53-
}
54-
p.UpdatePodMetrics(pod, updated)
55-
}()
56-
return true
31+
type PodMetricsClientImpl struct {
32+
}
33+
34+
// FetchMetrics fetches metrics from a given pod.
35+
func (p *PodMetricsClientImpl) FetchMetrics(pod backend.Pod, existing *backend.PodMetrics) (*backend.PodMetrics, error) {
36+
// Currently the metrics endpoint is hard-coded, which works with vLLM.
37+
// TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/16): Consume this from LLMServerPool config.
38+
url := fmt.Sprintf("http://%s/metrics", pod.Address)
39+
resp, err := http.Get(url)
40+
if err != nil {
41+
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
42+
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
43+
}
44+
defer resp.Body.Close()
45+
46+
if resp.StatusCode != http.StatusOK {
47+
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
48+
return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
49+
}
50+
51+
parser := expfmt.TextParser{}
52+
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
53+
if err != nil {
54+
return nil, err
5755
}
58-
p.podMetrics.Range(processOnePod)
59-
wg.Wait()
60-
return errs
56+
return promToPodMetrics(metricFamilies, existing)
6157
}
6258

6359
// promToPodMetrics updates internal pod metrics with scraped prometheus metrics.
6460
// A combined error is returned if processing one or more metrics fails.
6561
// It returns a new PodMetrics pointer, which can be used to atomically update the pod metrics map.
66-
func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *PodMetrics) (*PodMetrics, error) {
62+
func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *backend.PodMetrics) (*backend.PodMetrics, error) {
6763
var errs error
6864
updated := existing.Clone()
6965
runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)

pkg/ext-proc/benchmark/benchmark.go

+7-68
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/bojand/ghz/runner"
1313
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1414
"github.com/jhump/protoreflect/desc"
15-
dto "github.com/prometheus/client_model/go"
1615
"google.golang.org/grpc"
1716
"google.golang.org/grpc/reflection"
1817
"google.golang.org/protobuf/proto"
@@ -117,9 +116,9 @@ func startExtProc() {
117116
s.Serve(lis)
118117
}
119118

120-
func fakePods() (backend.PodSet, map[backend.Pod]map[string]*dto.MetricFamily) {
119+
func fakePods() (backend.PodSet, map[backend.Pod]*backend.PodMetrics) {
121120
pods := make(backend.PodSet)
122-
metrics := make(map[backend.Pod]map[string]*dto.MetricFamily, *numFakePods)
121+
metrics := make(map[backend.Pod]*backend.PodMetrics, *numFakePods)
123122
for i := 0; i < *numFakePods; i++ {
124123
address := fmt.Sprintf("address-%v", i)
125124
pod := backend.Pod{
@@ -135,78 +134,18 @@ func fakePods() (backend.PodSet, map[backend.Pod]map[string]*dto.MetricFamily) {
135134
}
136135

137136
// fakeMetrics adds numModelsPerPod number of adapters to the pod metrics.
138-
func fakeMetrics(podNumber int) map[string]*dto.MetricFamily {
139-
metrics := make(map[string]*dto.MetricFamily)
140-
metrics["vllm:active_lora_adapters"] = &dto.MetricFamily{
141-
Metric: []*dto.Metric{},
142-
}
143-
metrics["vllm:info_active_adapters_info"] = &dto.MetricFamily{
144-
Metric: []*dto.Metric{
145-
{
146-
Label: []*dto.LabelPair{
147-
{
148-
Name: ptrString("active_adapters"),
149-
Value: ptrString(""),
150-
},
151-
},
152-
},
137+
func fakeMetrics(podNumber int) *backend.PodMetrics {
138+
metrics := &backend.PodMetrics{
139+
Metrics: backend.Metrics{
140+
CachedModels: make(map[string]int),
153141
},
154142
}
155143
for i := 0; i < *numModelsPerPod; i++ {
156-
mn := modelName(podNumber*(*numModelsPerPod) + i)
157-
one := &dto.Metric{
158-
Label: []*dto.LabelPair{
159-
{
160-
Name: ptrString("active_lora_adapters"),
161-
Value: ptrString(mn),
162-
},
163-
},
164-
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
165-
}
166-
metrics["vllm:active_lora_adapters"].Metric = append(metrics["vllm:active_lora_adapters"].Metric, one)
167-
168-
original := metrics["vllm:info_active_adapters_info"].Metric[0].Label[0].Value
169-
metrics["vllm:info_active_adapters_info"].Metric[0].Label[0].Value = ptrString(*original + "," + mn)
170-
}
171-
metrics[backend.RunningQueueSizeMetricName] = &dto.MetricFamily{
172-
Metric: []*dto.Metric{
173-
{
174-
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
175-
},
176-
},
177-
}
178-
metrics[backend.WaitingQueueSizeMetricName] = &dto.MetricFamily{
179-
Metric: []*dto.Metric{
180-
{
181-
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
182-
},
183-
},
184-
}
185-
metrics[backend.KVCacheUsagePercentMetricName] = &dto.MetricFamily{
186-
Metric: []*dto.Metric{
187-
{
188-
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
189-
},
190-
},
191-
}
192-
metrics[backend.KvCacheMaxTokenCapacityMetricName] = &dto.MetricFamily{
193-
Metric: []*dto.Metric{
194-
{
195-
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
196-
},
197-
},
144+
metrics.CachedModels[modelName(podNumber*(*numModelsPerPod)+i)] = 0
198145
}
199146
return metrics
200147
}
201148

202149
func modelName(i int) string {
203150
return fmt.Sprintf("adapter-%v", i)
204151
}
205-
206-
func ptrString(s string) *string {
207-
return &s
208-
}
209-
210-
func ptrFloat64(f float64) *float64 {
211-
return &f
212-
}

pkg/ext-proc/main.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,11 @@ import (
1919
klog "k8s.io/klog/v2"
2020

2121
"ext-proc/backend"
22+
"ext-proc/backend/vllm"
2223
"ext-proc/handlers"
2324
"ext-proc/scheduling"
2425
)
2526

26-
type extProcServer struct{}
27-
type server struct{}
28-
2927
var (
3028
port = flag.Int("port", 9002, "gRPC port")
3129
targetPodHeader = flag.String("targetPodHeader", "target-pod", "the header key for the target pod address to instruct Envoy to send the request to. This must match Envoy configuration.")
@@ -75,7 +73,7 @@ func main() {
7573

7674
s := grpc.NewServer()
7775

78-
pp := backend.NewProvider(&backend.PodMetricsClientImpl{}, &backend.FakePodLister{Pods: pods})
76+
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, &backend.FakePodLister{Pods: pods})
7977
if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil {
8078
klog.Fatalf("failed to initialize: %v", err)
8179
}

0 commit comments

Comments
 (0)