Improve the filter to return multiple preferred pods instead of one; also fix metrics update bug #17

Merged: 1 commit merged on Oct 14, 2024

16 changes: 9 additions & 7 deletions pkg/ext-proc/backend/metrics.go
@@ -30,11 +30,12 @@ func (p *Provider) refreshMetricsOnce() error {
defer func() {
d := time.Now().Sub(start)
// TODO: add a metric instead of logging
klog.V(3).Infof("Refreshed metrics in %v", d)
klog.V(4).Infof("Refreshed metrics in %v", d)
}()
var wg sync.WaitGroup
var errs error
processOnePod := func(key, value any) bool {
klog.V(4).Infof("Processing pod %v and metric %v", key, value)
pod := key.(Pod)
metrics := value.(*PodMetrics)
wg.Add(1)
@@ -46,7 +47,7 @@ func (p *Provider) refreshMetricsOnce() error {
return
}
updated, err := promToPodMetrics(metricFamilies, metrics)
klog.V(3).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
if err != nil {
multierr.Append(errs, fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err))
}
@@ -67,17 +68,17 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *Pod
updated := existing.Clone()
runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)
multierr.Append(errs, err)
if err != nil {
updated.RunningQueueSize = int(runningQueueSize.GetCounter().GetValue())
if err == nil {
updated.RunningQueueSize = int(runningQueueSize.GetGauge().GetValue())
}
waitingQueueSize, _, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName)
multierr.Append(errs, err)
if err != nil {
if err == nil {
updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue())
}
cachePercent, _, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName)
multierr.Append(errs, err)
if err != nil {
if err == nil {
updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue()
}
/* TODO: uncomment once this is available in vllm.
@@ -126,10 +127,11 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
var latestTs int64
var latest *dto.Metric
for _, m := range mf.GetMetric() {
if m.GetTimestampMs() > latestTs {
if m.GetTimestampMs() >= latestTs {
latestTs = m.GetTimestampMs()
latest = m
}
}
klog.V(4).Infof("Got metric value %+v for metric %v", latest, metricName)
return latest, time.Unix(0, latestTs*1000), nil
}
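
For context on the fix above: the old code only wrote the queue-size and KV-cache values when getLatestMetric returned an error (err != nil), and read the running-queue size as a Counter instead of a Gauge. Below is a minimal, self-contained sketch of the corrected read path; it is not code from this PR, and the metric name and scrape payload are made up for illustration.

package main

import (
	"fmt"
	"strings"

	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/expfmt"
)

func main() {
	// Hypothetical scrape body; the real vLLM metric names may differ.
	payload := "# TYPE vllm_num_requests_waiting gauge\nvllm_num_requests_waiting 3\n"
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
	if err != nil {
		panic(err)
	}
	mf := families["vllm_num_requests_waiting"]

	// Pick the sample with the latest timestamp.
	var latest *dto.Metric
	var latestTs int64
	for _, m := range mf.GetMetric() {
		// ">=" keeps samples without a timestamp (0) from being skipped.
		if m.GetTimestampMs() >= latestTs {
			latestTs = m.GetTimestampMs()
			latest = m
		}
	}

	// Update only on success, and read the Gauge (not the Counter) value.
	if latest != nil {
		waiting := int(latest.GetGauge().GetValue())
		fmt.Println(waiting) // prints 3
	}
}
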
16 changes: 13 additions & 3 deletions pkg/ext-proc/backend/provider.go
@@ -64,14 +64,14 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
return fmt.Errorf("failed to init metrics: %v", err)
}

klog.V(2).Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())

// periodically refresh pods
go func() {
for {
time.Sleep(refreshPodsInterval)
if err := p.refreshPodsOnce(); err != nil {
klog.V(1).Infof("Failed to refresh podslist pods: %v", err)
klog.V(4).Infof("Failed to refresh podslist pods: %v", err)
}
}
}()
@@ -81,11 +81,21 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
for {
time.Sleep(refreshMetricsInterval)
if err := p.refreshMetricsOnce(); err != nil {
klog.V(1).Infof("Failed to refresh metrics: %v", err)
klog.V(4).Infof("Failed to refresh metrics: %v", err)
}
}
}()

// Periodically print out the pods and metrics for DEBUGGING.
if klog.V(2).Enabled() {
go func() {
for {
time.Sleep(5 * time.Second)
klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())

Contributor: Why not print this as part of the refresh loop above instead of spawning a separate go routine?

Contributor (Author): The refresh loop is run much more frequently (20ms in my benchmark).

}
}()
}

return nil
}

16 changes: 8 additions & 8 deletions pkg/ext-proc/handlers/request.go
@@ -15,7 +15,7 @@ import (
// parameter.
// Envoy sends the request body to ext proc before sending the request to the backend server.
func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
klog.V(2).Infof("Handling request body")
klog.V(3).Infof("Handling request body")

// Unmarshal request body (must be JSON).
v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
@@ -24,14 +24,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
klog.Errorf("Error unmarshaling request body: %v", err)
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
}
klog.V(2).Infof("Request body: %v", rb)
klog.V(3).Infof("Request body: %v", rb)

// Resolve target models.
model, ok := rb["model"].(string)
if !ok {
return nil, fmt.Errorf("model not found in request")
}
klog.V(2).Infof("Model requested: %v", model)
klog.V(3).Infof("Model requested: %v", model)
llmReq := &scheduling.LLMRequest{
Model: model,
// For now use the model as the target model.
@@ -47,13 +47,13 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
klog.Errorf("Error marshaling request body: %v", err)
return nil, fmt.Errorf("error marshaling request body: %v", err)
}
klog.V(2).Infof("Updated body: %v", updatedBody)
klog.V(3).Infof("Updated body: %v", updatedBody)

targetPod, err := s.scheduler.Schedule(llmReq)
if err != nil {
return nil, fmt.Errorf("failed to find target pod: %v", err)
}
klog.V(2).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)

reqCtx.Model = llmReq.Model
reqCtx.TargetPod = targetPod
@@ -69,7 +69,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
}
// Print headers for debugging
for _, header := range headers {
klog.V(2).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
klog.V(3).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
}

resp := &extProcPb.ProcessingResponse{
@@ -93,10 +93,10 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
}

func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
klog.V(2).Info("--- In RequestHeaders processing ...")
klog.V(3).Info("--- In RequestHeaders processing ...")
r := req.Request
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
klog.V(2).Infof("Headers: %+v\n", h)
klog.V(3).Infof("Headers: %+v\n", h)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_RequestHeaders{
4 changes: 2 additions & 2 deletions pkg/ext-proc/handlers/response.go
@@ -8,9 +8,9 @@ import (

// HandleResponseHeaders processes response headers from the backend model server.
func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
klog.V(2).Info("Processing ResponseHeaders")
klog.V(3).Info("Processing ResponseHeaders")
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
klog.V(2).Infof("Headers before: %+v\n", h)
klog.V(3).Infof("Headers before: %+v\n", h)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
17 changes: 10 additions & 7 deletions pkg/ext-proc/handlers/server.go
@@ -41,7 +41,7 @@ type PodProvider interface {
}

func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
klog.V(2).Info("Processing")
klog.V(3).Info("Processing")
ctx := srv.Context()
// Create request context to share states during life time of an HTTP request.
// See https://github.com/envoyproxy/envoy/issues/17540.
@@ -59,22 +59,25 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
return nil
}
if err != nil {
// This error occurs very frequently, though it doesn't seem to have any impact.
// TODO Figure out if we can remove this noise.
klog.V(3).Infof("cannot receive stream request: %v", err)
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
}

resp := &extProcPb.ProcessingResponse{}
switch v := req.Request.(type) {
case *extProcPb.ProcessingRequest_RequestHeaders:
resp = HandleRequestHeaders(reqCtx, req)
klog.V(2).Infof("Request context after HandleRequestHeaders: %v", reqCtx)
klog.V(3).Infof("Request context after HandleRequestHeaders: %v", reqCtx)
case *extProcPb.ProcessingRequest_RequestBody:
resp, err = s.HandleRequestBody(reqCtx, req)
klog.V(2).Infof("Request context after HandleRequestBody: %v", reqCtx)
klog.V(3).Infof("Request context after HandleRequestBody: %v", reqCtx)
case *extProcPb.ProcessingRequest_ResponseHeaders:
resp, err = s.HandleResponseHeaders(reqCtx, req)
klog.V(2).Infof("Request context after HandleResponseHeaders: %v", reqCtx)
klog.V(3).Infof("Request context after HandleResponseHeaders: %v", reqCtx)
default:
klog.Infof("Unknown Request type %+v", v)
klog.Errorf("Unknown Request type %+v", v)
return status.Error(codes.Unknown, "unknown request type")
}

@@ -83,9 +86,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
return status.Errorf(codes.Unknown, "failed to handle request: %v", err)
}

klog.V(2).Infof("response: %v", resp)
klog.V(3).Infof("response: %v", resp)
if err := srv.Send(resp); err != nil {
klog.Infof("send error %v", err)
klog.Errorf("send error %v", err)
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
}
}
53 changes: 39 additions & 14 deletions pkg/ext-proc/scheduling/filter.go
@@ -43,24 +43,24 @@ func (f *filter) Name() string {

func (f *filter) Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
if f == nil {
klog.V(2).Infof("Running nil filter, returning all input pods by default")
klog.V(3).Infof("Running nil filter, returning all input pods by default")
return pods, nil
}
klog.V(2).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods))
klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods))

filtered, err := f.filter(b, pods)

next := f.nextOnSuccessOrFailure
if err == nil {
klog.V(2).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered))
klog.V(3).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered))
if f.nextOnSuccess != nil {
next = f.nextOnSuccess
}
// On success, pass the filtered result to the next filter.
return next.Filter(b, filtered)
}

klog.V(2).Infof("onFailure %v -> %v", f.name, next.Name())
klog.V(3).Infof("onFailure %v -> %v", f.name, next.Name())
if f.nextOnFailure != nil {
next = f.nextOnFailure
}
@@ -88,32 +88,57 @@ func toFilterFunc(pp podPredicate) filterFunc {
}
}

// leastQueuingFilterFunc finds the max and min queue size of all pods, divides the whole range
// (max-min) by the number of pods, and finds the pods that fall into the first range.
// The intuition is that if there are multiple pods that share similar queue size in the low range,
// we should consider them all instead of the absolute minimum one. This worked better than picking
// the least one as it gives more choices for the next filter, which on aggregate gave better
// results.
// TODO: Compare this strategy with other strategies such as top K.
func leastQueuingFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
min := math.MaxInt
max := 0
filtered := []*backend.PodMetrics{}

for _, pod := range pods {
if pod.WaitingQueueSize < min {
if pod.WaitingQueueSize <= min {
min = pod.WaitingQueueSize
filtered = []*backend.PodMetrics{}
}
if pod.WaitingQueueSize == min {
if pod.WaitingQueueSize >= max {
max = pod.WaitingQueueSize
}
}

for _, pod := range pods {
if pod.WaitingQueueSize >= min && pod.WaitingQueueSize <= min+(max-min)/len(pods) {
filtered = append(filtered, pod)
}
}
return filtered, nil
}

// leastKVCacheFilterFunc finds the max and min KV cache of all pods, divides the whole range
// (max-min) by the number of pods, and finds the pods that fall into the first range.

Contributor: What is the intuition behind setting the range this way compared to finding the least-k elements for example?

Contributor (Author): Nothing in particular. I will try to compare a few different algos and tune them with different parameters to see the difference. But for now I don't have data to see which one is better.

Contributor (Author): Updated the comment with the reason behind this. And added a TODO to compare with other strategies.

// The intuition is that if there are multiple pods that share similar KV cache in the low range, we
// should consider them all instead of the absolute minimum one. This worked better than picking the
// least one as it gives more choices for the next filter, which on aggregate gave better results.
// TODO: Compare this strategy with other strategies such as top K.
func leastKVCacheFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
min := math.MaxInt
min := math.MaxFloat64
max := math.SmallestNonzeroFloat64
filtered := []*backend.PodMetrics{}
margin := 5

for _, pod := range pods {
cur := int(pod.KVCacheUsagePercent) / margin
if cur < min {
min = cur
filtered = []*backend.PodMetrics{}
if pod.KVCacheUsagePercent <= min {
min = pod.KVCacheUsagePercent
}
if pod.KVCacheUsagePercent >= max {
max = pod.KVCacheUsagePercent
}
if cur == min {
}

for _, pod := range pods {
if pod.KVCacheUsagePercent >= min && pod.KVCacheUsagePercent <= min+(max-min)/float64(len(pods)) {
filtered = append(filtered, pod)
}
}
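
To make the bucketing described in the filter comments concrete: both filters compute the min and max of a load signal across pods and keep every pod whose value falls in the lowest 1/len(pods) slice of that range. The following is a standalone sketch, not code from the PR, with invented queue sizes for illustration.

package main

import "fmt"

// firstRange keeps every candidate whose queue size falls within
// min + (max-min)/len(sizes), mirroring the selection used by
// leastQueuingFilterFunc.
func firstRange(sizes []int) []int {
	minQ, maxQ := sizes[0], sizes[0]
	for _, q := range sizes {
		if q < minQ {
			minQ = q
		}
		if q > maxQ {
			maxQ = q
		}
	}
	threshold := minQ + (maxQ-minQ)/len(sizes)
	kept := []int{}
	for _, q := range sizes {
		if q <= threshold {
			kept = append(kept, q)
		}
	}
	return kept
}

func main() {
	// With queue sizes 2, 3, 9 and 40 across 4 pods the window is
	// [2, 2+(40-2)/4] = [2, 11], so the pods with 2, 3 and 9 all survive
	// while the pod with 40 is filtered out.
	fmt.Println(firstRange([]int{2, 3, 9, 40})) // [2 3 9]
}

Keeping several near-minimum pods rather than only the single minimum gives the next filter in the chain more candidates to choose from, which is the rationale stated in the PR comments.
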
5 changes: 3 additions & 2 deletions pkg/ext-proc/scheduling/scheduler.go
@@ -2,6 +2,7 @@
package scheduling

import (
"fmt"
"math/rand"

klog "k8s.io/klog/v2"
@@ -44,10 +45,10 @@ type PodMetricsProvider interface {

// Schedule finds the target pod based on metrics and the requested lora adapter.
func (s *Scheduler) Schedule(b *LLMRequest) (targetPod *backend.Pod, err error) {
klog.V(2).Infof("request: %v; metrics: %+v", b, s.podMetricsProvider.AllPodMetrics())
klog.V(3).Infof("request: %v; metrics: %+v", b, s.podMetricsProvider.AllPodMetrics())
pods, err := s.filter.Filter(b, s.podMetricsProvider.AllPodMetrics())
if err != nil || len(pods) == 0 {
klog.Errorf("Failed to apply filter, this should never happen: %v", err)
return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %v", len(pods), err)
}
i := rand.Intn(len(pods))
return &pods[i].Pod, nil