-
Notifications
You must be signed in to change notification settings - Fork 76
Refactor: Define PodMetricsClient interface and hide implementation details of vllm metrics processing #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,15 @@ | ||
package backend | ||
// Package vllm provides vllm specific pod metrics implementation. | ||
package vllm | ||
|
||
import ( | ||
"ext-proc/backend" | ||
"fmt" | ||
"net/http" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
dto "github.com/prometheus/client_model/go" | ||
"github.com/prometheus/common/expfmt" | ||
"go.uber.org/multierr" | ||
klog "k8s.io/klog/v2" | ||
) | ||
|
@@ -25,45 +28,38 @@ const ( | |
KvCacheMaxTokenCapacityMetricName = "vllm:gpu_cache_max_token_capacity" | ||
) | ||
|
||
func (p *Provider) refreshMetricsOnce() error { | ||
start := time.Now() | ||
defer func() { | ||
d := time.Now().Sub(start) | ||
// TODO: add a metric instead of logging | ||
klog.V(4).Infof("Refreshed metrics in %v", d) | ||
}() | ||
var wg sync.WaitGroup | ||
var errs error | ||
processOnePod := func(key, value any) bool { | ||
klog.V(4).Infof("Processing pod %v and metric %v", key, value) | ||
pod := key.(Pod) | ||
metrics := value.(*PodMetrics) | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
metricFamilies, err := p.pmc.FetchMetrics(pod) | ||
if err != nil { | ||
multierr.Append(errs, fmt.Errorf("failed to parse metrics from %s: %v", pod, err)) | ||
return | ||
} | ||
updated, err := promToPodMetrics(metricFamilies, metrics) | ||
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics) | ||
if err != nil { | ||
multierr.Append(errs, fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err)) | ||
} | ||
p.UpdatePodMetrics(pod, updated) | ||
}() | ||
return true | ||
type PodMetricsClientImpl struct { | ||
} | ||
|
||
// FetchMetrics fetches metrics from a given pod. | ||
func (p *PodMetricsClientImpl) FetchMetrics(pod backend.Pod, existing *backend.PodMetrics) (*backend.PodMetrics, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is moved from pod_client.go, no new code |
||
// Currently the metrics endpoint is hard-coded, which works with vLLM. | ||
// TODO(https://github.com/kubernetes-sigs/llm-instance-gateway/issues/16): Consume this from LLMServerPool config. | ||
url := fmt.Sprintf("http://%s/metrics", pod.Address) | ||
resp, err := http.Get(url) | ||
if err != nil { | ||
klog.Errorf("failed to fetch metrics from %s: %v", pod, err) | ||
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", pod, err) | ||
} | ||
defer resp.Body.Close() | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
klog.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode) | ||
return nil, fmt.Errorf("unexpected status code from %s: %v", pod, resp.StatusCode) | ||
} | ||
|
||
parser := expfmt.TextParser{} | ||
metricFamilies, err := parser.TextToMetricFamilies(resp.Body) | ||
if err != nil { | ||
return nil, err | ||
} | ||
p.podMetrics.Range(processOnePod) | ||
wg.Wait() | ||
return errs | ||
return promToPodMetrics(metricFamilies, existing) | ||
} | ||
|
||
// promToPodMetrics updates internal pod metrics with scraped prometheus metrics. | ||
// A combined error is returned if errors occur in one or more metric processing. | ||
// it returns a new PodMetrics pointer which can be used to atomically update the pod metrics map. | ||
func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *PodMetrics) (*PodMetrics, error) { | ||
func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *backend.PodMetrics) (*backend.PodMetrics, error) { | ||
var errs error | ||
updated := existing.Clone() | ||
runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,13 +19,11 @@ import ( | |
klog "k8s.io/klog/v2" | ||
|
||
"ext-proc/backend" | ||
"ext-proc/backend/vllm" | ||
"ext-proc/handlers" | ||
"ext-proc/scheduling" | ||
) | ||
|
||
type extProcServer struct{} | ||
type server struct{} | ||
|
||
var ( | ||
port = flag.Int("port", 9002, "gRPC port") | ||
targetPodHeader = flag.String("targetPodHeader", "target-pod", "the header key for the target pod address to instruct Envoy to send the request to. This must match Envoy configuration.") | ||
|
@@ -75,7 +73,7 @@ func main() { | |
|
||
s := grpc.NewServer() | ||
|
||
pp := backend.NewProvider(&backend.PodMetricsClientImpl{}, &backend.FakePodLister{Pods: pods}) | ||
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, &backend.FakePodLister{Pods: pods}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why did we move it to a vllm package? shouldn't this be agnostic the model server? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Model servers don't always share the same metric names, so I expect we will need some "adapter code" for each model server. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will likely share some helper functions across model servers, but until we integrate with the next model server, I put metrics scraping code in vllm. |
||
if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil { | ||
klog.Fatalf("failed to initialize: %v", err) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is moved from metrics.go, no new code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name of this file is slightly confusing. It reads as if it's intended to be a factory class, but then we have metrics related funcs, while there is a metrics.go file.
What is this file intended to do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah probably the name is too broad. It intends to "provide info of the backend", thus "provider", and metrics is part of the "info".