Skip to content

Refactor: Define PodMetricsClient interface and hide implementation details of vllm metrics processing #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions pkg/ext-proc/backend/fake.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
package backend

import (
dto "github.com/prometheus/client_model/go"
)

type FakePodLister struct {
Err error
Pods PodSet
}

type FakePodMetricsClient struct {
Err map[Pod]error
Res map[Pod]map[string]*dto.MetricFamily
Res map[Pod]*PodMetrics
}

func (f *FakePodMetricsClient) FetchMetrics(pod Pod) (map[string]*dto.MetricFamily, error) {
func (f *FakePodMetricsClient) FetchMetrics(pod Pod, existing *PodMetrics) (*PodMetrics, error) {
if err, ok := f.Err[pod]; ok {
return nil, err
}
Expand Down
34 changes: 0 additions & 34 deletions pkg/ext-proc/backend/pod_client.go

This file was deleted.

38 changes: 36 additions & 2 deletions pkg/ext-proc/backend/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
"time"

dto "github.com/prometheus/client_model/go"
"go.uber.org/multierr"
klog "k8s.io/klog/v2"
)

Expand All @@ -27,7 +27,7 @@ type Provider struct {
}

type PodMetricsClient interface {
FetchMetrics(pod Pod) (map[string]*dto.MetricFamily, error)
FetchMetrics(pod Pod, existing *PodMetrics) (*PodMetrics, error)
}

type PodLister interface {
Expand Down Expand Up @@ -130,3 +130,37 @@ func (p *Provider) refreshPodsOnce() error {
p.podMetrics.Range(mergeFn)
return nil
}

func (p *Provider) refreshMetricsOnce() error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved from metrics.go, no new code

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of this file is slightly confusing. It reads as if it's intended to be a factory class, but then we have metrics related funcs, while there is a metrics.go file.

What is this file intended to do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah probably the name is too broad. It intends to "provide info of the backend", thus "provider", and metrics is part of the "info".

start := time.Now()
defer func() {
d := time.Since(start)
// TODO: add a metric instead of logging
klog.V(4).Infof("Refreshed metrics in %v", d)
}()
var wg sync.WaitGroup
var errs error
processOnePod := func(key, value any) bool {
klog.V(4).Infof("Processing pod %v and metric %v", key, value)
pod := key.(Pod)
existing := value.(*PodMetrics)
wg.Add(1)
go func() {
defer wg.Done()
updated, err := p.pmc.FetchMetrics(pod, existing)
if err != nil {
multierr.Append(errs, fmt.Errorf("failed to parse metrics from %s: %v", pod, err))
return
}
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
if err != nil {
multierr.Append(errs, fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err))
}
p.UpdatePodMetrics(pod, updated)
}()
return true
}
p.podMetrics.Range(processOnePod)
wg.Wait()
return errs
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package backend
// Package vllm provides vllm specific pod metrics implementation.
package vllm

import (
"ext-proc/backend"
"fmt"
"net/http"
"strings"
"sync"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"go.uber.org/multierr"
klog "k8s.io/klog/v2"
)
Expand All @@ -25,45 +28,38 @@ const (
KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity"
)

func (p *Provider) refreshMetricsOnce() error {
start := time.Now()
defer func() {
d := time.Now().Sub(start)
// TODO: add a metric instead of logging
klog.V(4).Infof("Refreshed metrics in %v", d)
}()
var wg sync.WaitGroup
var errs error
processOnePod := func(key, value any) bool {
klog.V(4).Infof("Processing pod %v and metric %v", key, value)
pod := key.(Pod)
metrics := value.(*PodMetrics)
wg.Add(1)
go func() {
defer wg.Done()
metricFamilies, err := p.pmc.FetchMetrics(pod)
if err != nil {
multierr.Append(errs, fmt.Errorf("failed to parse metrics from %s: %v", pod, err))
return
}
updated, err := promToPodMetrics(metricFamilies, metrics)
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
if err != nil {
multierr.Append(errs, fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err))
}
p.UpdatePodMetrics(pod, updated)
}()
return true
type PodMetricsClientImpl struct {
}

// FetchMetrics fetches metrics from a given pod.
func (p *PodMetricsClientImpl) FetchMetrics(pod backend.Pod, existing *backend.PodMetrics) (*backend.PodMetrics, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved from pod_client.go, no new code

// Currently the metrics endpoint is hard-coded, which works with vLLM.
// TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/16): Consume this from LLMServerPool config.
url := fmt.Sprintf("http://%s/metrics", pod.Address)
resp, err := http.Get(url)
if err != nil {
klog.Errorf("failed to fetch metrics from %s: %v", pod, err)
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode)
}

parser := expfmt.TextParser{}
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return nil, err
}
p.podMetrics.Range(processOnePod)
wg.Wait()
return errs
return promToPodMetrics(metricFamilies, existing)
}

// promToPodMetrics updates internal pod metrics with scraped prometheus metrics.
// A combined error is returned if errors occur in one or more metric processing.
// it returns a new PodMetrics pointer which can be used to atomically update the pod metrics map.
func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *PodMetrics) (*PodMetrics, error) {
func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *backend.PodMetrics) (*backend.PodMetrics, error) {
var errs error
updated := existing.Clone()
runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)
Expand Down
75 changes: 7 additions & 68 deletions pkg/ext-proc/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/bojand/ghz/runner"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/jhump/protoreflect/desc"
dto "github.com/prometheus/client_model/go"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -117,9 +116,9 @@ func startExtProc() {
s.Serve(lis)
}

func fakePods() (backend.PodSet, map[backend.Pod]map[string]*dto.MetricFamily) {
func fakePods() (backend.PodSet, map[backend.Pod]*backend.PodMetrics) {
pods := make(backend.PodSet)
metrics := make(map[backend.Pod]map[string]*dto.MetricFamily, *numFakePods)
metrics := make(map[backend.Pod]*backend.PodMetrics, *numFakePods)
for i := 0; i < *numFakePods; i++ {
address := fmt.Sprintf("address-%v", i)
pod := backend.Pod{
Expand All @@ -135,78 +134,18 @@ func fakePods() (backend.PodSet, map[backend.Pod]map[string]*dto.MetricFamily) {
}

// fakeMetrics adds numModelsPerPod number of adapters to the pod metrics.
func fakeMetrics(podNumber int) map[string]*dto.MetricFamily {
metrics := make(map[string]*dto.MetricFamily)
metrics["vllm:active_lora_adapters"] = &dto.MetricFamily{
Metric: []*dto.Metric{},
}
metrics["vllm:info_active_adapters_info"] = &dto.MetricFamily{
Metric: []*dto.Metric{
{
Label: []*dto.LabelPair{
{
Name: ptrString("active_adapters"),
Value: ptrString(""),
},
},
},
func fakeMetrics(podNumber int) *backend.PodMetrics {
metrics := &backend.PodMetrics{
Metrics: backend.Metrics{
CachedModels: make(map[string]int),
},
}
for i := 0; i < *numModelsPerPod; i++ {
mn := modelName(podNumber*(*numModelsPerPod) + i)
one := &dto.Metric{
Label: []*dto.LabelPair{
{
Name: ptrString("active_lora_adapters"),
Value: ptrString(mn),
},
},
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
}
metrics["vllm:active_lora_adapters"].Metric = append(metrics["vllm:active_lora_adapters"].Metric, one)

original := metrics["vllm:info_active_adapters_info"].Metric[0].Label[0].Value
metrics["vllm:info_active_adapters_info"].Metric[0].Label[0].Value = ptrString(*original + "," + mn)
}
metrics[backend.RunningQueueSizeMetricName] = &dto.MetricFamily{
Metric: []*dto.Metric{
{
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
},
},
}
metrics[backend.WaitingQueueSizeMetricName] = &dto.MetricFamily{
Metric: []*dto.Metric{
{
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
},
},
}
metrics[backend.KVCacheUsagePercentMetricName] = &dto.MetricFamily{
Metric: []*dto.Metric{
{
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
},
},
}
metrics[backend.KvCacheMaxTokenCapacityMetricName] = &dto.MetricFamily{
Metric: []*dto.Metric{
{
Gauge: &dto.Gauge{Value: ptrFloat64(0)},
},
},
metrics.CachedModels[modelName(podNumber*(*numModelsPerPod)+i)] = 0
}
return metrics
}

func modelName(i int) string {
return fmt.Sprintf("adapter-%v", i)
}

func ptrString(s string) *string {
return &s
}

func ptrFloat64(f float64) *float64 {
return &f
}
6 changes: 2 additions & 4 deletions pkg/ext-proc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ import (
klog "k8s.io/klog/v2"

"ext-proc/backend"
"ext-proc/backend/vllm"
"ext-proc/handlers"
"ext-proc/scheduling"
)

type extProcServer struct{}
type server struct{}

var (
port = flag.Int("port", 9002, "gRPC port")
targetPodHeader = flag.String("targetPodHeader", "target-pod", "the header key for the target pod address to instruct Envoy to send the request to. This must match Envoy configuration.")
Expand Down Expand Up @@ -75,7 +73,7 @@ func main() {

s := grpc.NewServer()

pp := backend.NewProvider(&backend.PodMetricsClientImpl{}, &backend.FakePodLister{Pods: pods})
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, &backend.FakePodLister{Pods: pods})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did we move it to a vllm package? shouldn't this be agnostic the model server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Model servers don't always share the same metric names, so I expect we will need some "adapter code" for each model server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will likely share some helper functions across model servers, but until we integrate with the next model server, I put metrics scraping code in vllm.

if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil {
klog.Fatalf("failed to initialize: %v", err)
}
Expand Down