diff --git a/pkg/ext-proc/backend/metrics.go b/pkg/ext-proc/backend/metrics.go
index 7d7b5470..d0e9fab5 100644
--- a/pkg/ext-proc/backend/metrics.go
+++ b/pkg/ext-proc/backend/metrics.go
@@ -30,11 +30,12 @@ func (p *Provider) refreshMetricsOnce() error {
 	defer func() {
 		d := time.Now().Sub(start)
 		// TODO: add a metric instead of logging
-		klog.V(3).Infof("Refreshed metrics in %v", d)
+		klog.V(4).Infof("Refreshed metrics in %v", d)
 	}()
 	var wg sync.WaitGroup
 	var errs error
 	processOnePod := func(key, value any) bool {
+		klog.V(4).Infof("Processing pod %v and metric %v", key, value)
 		pod := key.(Pod)
 		metrics := value.(*PodMetrics)
 		wg.Add(1)
@@ -46,7 +47,7 @@ func (p *Provider) refreshMetricsOnce() error {
 				return
 			}
 			updated, err := promToPodMetrics(metricFamilies, metrics)
-			klog.V(3).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
+			klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
 			if err != nil {
 				multierr.Append(errs, fmt.Errorf("failed to get all pod metrics updated from prometheus: %v", err))
 			}
@@ -67,17 +68,17 @@ func promToPodMetrics(metricFamilies map[string]*dto.MetricFamily, existing *Pod
 	updated := existing.Clone()
 	runningQueueSize, _, err := getLatestMetric(metricFamilies, RunningQueueSizeMetricName)
 	multierr.Append(errs, err)
-	if err != nil {
-		updated.RunningQueueSize = int(runningQueueSize.GetCounter().GetValue())
+	if err == nil {
+		updated.RunningQueueSize = int(runningQueueSize.GetGauge().GetValue())
 	}
 	waitingQueueSize, _, err := getLatestMetric(metricFamilies, WaitingQueueSizeMetricName)
 	multierr.Append(errs, err)
-	if err != nil {
+	if err == nil {
 		updated.WaitingQueueSize = int(waitingQueueSize.GetGauge().GetValue())
 	}
 	cachePercent, _, err := getLatestMetric(metricFamilies, KVCacheUsagePercentMetricName)
 	multierr.Append(errs, err)
-	if err != nil {
+	if err == nil {
 		updated.KVCacheUsagePercent = cachePercent.GetGauge().GetValue()
 	}
 	/* TODO: uncomment once this is available in vllm.
@@ -126,10 +127,11 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
 	var latestTs int64
 	var latest *dto.Metric
 	for _, m := range mf.GetMetric() {
-		if m.GetTimestampMs() > latestTs {
+		if m.GetTimestampMs() >= latestTs {
 			latestTs = m.GetTimestampMs()
 			latest = m
 		}
 	}
+	klog.V(4).Infof("Got metric value %+v for metric %v", latest, metricName)
 	return latest, time.Unix(0, latestTs*1000), nil
 }
diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index 60fb627c..143daa9d 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -64,14 +64,14 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 		return fmt.Errorf("failed to init metrics: %v", err)
 	}

-	klog.V(2).Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())
+	klog.Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())

 	// periodically refresh pods
 	go func() {
 		for {
 			time.Sleep(refreshPodsInterval)
 			if err := p.refreshPodsOnce(); err != nil {
-				klog.V(1).Infof("Failed to refresh podslist pods: %v", err)
+				klog.V(4).Infof("Failed to refresh pods: %v", err)
 			}
 		}
 	}()
@@ -81,11 +81,21 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 		for {
 			time.Sleep(refreshMetricsInterval)
 			if err := p.refreshMetricsOnce(); err != nil {
-				klog.V(1).Infof("Failed to refresh metrics: %v", err)
+				klog.V(4).Infof("Failed to refresh metrics: %v", err)
 			}
 		}
 	}()

+	// Periodically print out the pods and metrics for debugging.
+	if klog.V(2).Enabled() {
+		go func() {
+			for {
+				time.Sleep(5 * time.Second)
+				klog.Infof("===DEBUG: Current Pods and metrics: %+v", p.AllPodMetrics())
+			}
+		}()
+	}
+
 	return nil
 }
diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go
index 6fe06303..150f5cf4 100644
--- a/pkg/ext-proc/handlers/request.go
+++ b/pkg/ext-proc/handlers/request.go
@@ -15,7 +15,7 @@ import (
 // parameter.
 // Envoy sends the request body to ext proc before sending the request to the backend server.
 func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
-	klog.V(2).Infof("Handling request body")
+	klog.V(3).Infof("Handling request body")

 	// Unmarshal request body (must be JSON).
 	v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
@@ -24,14 +24,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 		klog.Errorf("Error unmarshaling request body: %v", err)
 		return nil, fmt.Errorf("error unmarshaling request body: %v", err)
 	}
-	klog.V(2).Infof("Request body: %v", rb)
+	klog.V(3).Infof("Request body: %v", rb)

 	// Resolve target models.
 	model, ok := rb["model"].(string)
 	if !ok {
 		return nil, fmt.Errorf("model not found in request")
 	}
-	klog.V(2).Infof("Model requested: %v", model)
+	klog.V(3).Infof("Model requested: %v", model)
 	llmReq := &scheduling.LLMRequest{
 		Model: model,
 		// For now use the model as the target model.
@@ -47,13 +47,13 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 		klog.Errorf("Error marshaling request body: %v", err)
 		return nil, fmt.Errorf("error marshaling request body: %v", err)
 	}
-	klog.V(2).Infof("Updated body: %v", updatedBody)
+	klog.V(3).Infof("Updated body: %v", updatedBody)

 	targetPod, err := s.scheduler.Schedule(llmReq)
 	if err != nil {
 		return nil, fmt.Errorf("failed to find target pod: %v", err)
 	}
-	klog.V(2).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
+	klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)

 	reqCtx.Model = llmReq.Model
 	reqCtx.TargetPod = targetPod
@@ -69,7 +69,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 	}
 	// Print headers for debugging
 	for _, header := range headers {
-		klog.V(2).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
+		klog.V(3).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
 	}

 	resp := &extProcPb.ProcessingResponse{
@@ -93,10 +93,10 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 }

 func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
-	klog.V(2).Info("--- In RequestHeaders processing ...")
+	klog.V(3).Info("--- In RequestHeaders processing ...")
 	r := req.Request
 	h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
-	klog.V(2).Infof("Headers: %+v\n", h)
+	klog.V(3).Infof("Headers: %+v\n", h)

 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_RequestHeaders{
diff --git a/pkg/ext-proc/handlers/response.go b/pkg/ext-proc/handlers/response.go
index 1719b45a..7378b800 100644
--- a/pkg/ext-proc/handlers/response.go
+++ b/pkg/ext-proc/handlers/response.go
@@ -8,9 +8,9 @@ import (

 // HandleResponseHeaders processes response headers from the backend model server.
 func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
-	klog.V(2).Info("Processing ResponseHeaders")
+	klog.V(3).Info("Processing ResponseHeaders")
 	h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
-	klog.V(2).Infof("Headers before: %+v\n", h)
+	klog.V(3).Infof("Headers before: %+v\n", h)

 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseHeaders{
diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go
index e2aee316..82d9bb69 100644
--- a/pkg/ext-proc/handlers/server.go
+++ b/pkg/ext-proc/handlers/server.go
@@ -41,7 +41,7 @@ type PodProvider interface {
 }

 func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
-	klog.V(2).Info("Processing")
+	klog.V(3).Info("Processing")
 	ctx := srv.Context()
 	// Create request context to share states during life time of an HTTP request.
 	// See https://github.com/envoyproxy/envoy/issues/17540.
@@ -59,6 +59,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			return nil
 		}
 		if err != nil {
+			// This error occurs very frequently, though it doesn't seem to have any impact.
+			// TODO: Figure out if we can remove this noise.
+			klog.V(3).Infof("cannot receive stream request: %v", err)
 			return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
 		}

@@ -66,15 +69,15 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 		switch v := req.Request.(type) {
 		case *extProcPb.ProcessingRequest_RequestHeaders:
 			resp = HandleRequestHeaders(reqCtx, req)
-			klog.V(2).Infof("Request context after HandleRequestHeaders: %v", reqCtx)
+			klog.V(3).Infof("Request context after HandleRequestHeaders: %v", reqCtx)
 		case *extProcPb.ProcessingRequest_RequestBody:
 			resp, err = s.HandleRequestBody(reqCtx, req)
-			klog.V(2).Infof("Request context after HandleRequestBody: %v", reqCtx)
+			klog.V(3).Infof("Request context after HandleRequestBody: %v", reqCtx)
 		case *extProcPb.ProcessingRequest_ResponseHeaders:
 			resp, err = s.HandleResponseHeaders(reqCtx, req)
-			klog.V(2).Infof("Request context after HandleResponseHeaders: %v", reqCtx)
+			klog.V(3).Infof("Request context after HandleResponseHeaders: %v", reqCtx)
 		default:
-			klog.Infof("Unknown Request type %+v", v)
+			klog.Errorf("Unknown request type %+v", v)
 			return status.Error(codes.Unknown, "unknown request type")
 		}

@@ -83,9 +86,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			return status.Errorf(codes.Unknown, "failed to handle request: %v", err)
 		}

-		klog.V(2).Infof("response: %v", resp)
+		klog.V(3).Infof("response: %v", resp)
 		if err := srv.Send(resp); err != nil {
-			klog.Infof("send error %v", err)
+			klog.Errorf("send error %v", err)
 			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
 		}
 	}
diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go
index 11e223a8..c8097a06 100644
--- a/pkg/ext-proc/scheduling/filter.go
+++ b/pkg/ext-proc/scheduling/filter.go
@@ -43,16 +43,16 @@ func (f *filter) Name() string {

 func (f *filter) Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
 	if f == nil {
-		klog.V(2).Infof("Running nil filter, returning all input pods by default")
+		klog.V(3).Infof("Running nil filter, returning all input pods by default")
 		return pods, nil
 	}
-	klog.V(2).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods))
klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, b, len(pods)) filtered, err := f.filter(b, pods) next := f.nextOnSuccessOrFailure if err == nil { - klog.V(2).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered)) + klog.V(3).Infof("onSuccess %v -> %v, filtered: %v", f.name, next.Name(), len(filtered)) if f.nextOnSuccess != nil { next = f.nextOnSuccess } @@ -60,7 +60,7 @@ func (f *filter) Filter(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.P return next.Filter(b, filtered) } - klog.V(2).Infof("onFailure %v -> %v", f.name, next.Name()) + klog.V(3).Infof("onFailure %v -> %v", f.name, next.Name()) if f.nextOnFailure != nil { next = f.nextOnFailure } @@ -88,32 +88,57 @@ func toFilterFunc(pp podPredicate) filterFunc { } } +// leastQueuingFilterFunc finds the max and min queue size of all pods, divides the whole range +// (max-min) by the number of pods, and finds the pods that fall into the first range. +// The intuition is that if there are multiple pods that share similar queue size in the low range, +// we should consider them all instead of the absolute minimum one. This worked better than picking +// the least one as it gives more choices for the next filter, which on aggregate gave better +// results. +// TODO: Compare this strategy with other strategies such as top K. func leastQueuingFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) { min := math.MaxInt + max := 0 filtered := []*backend.PodMetrics{} + for _, pod := range pods { - if pod.WaitingQueueSize < min { + if pod.WaitingQueueSize <= min { min = pod.WaitingQueueSize - filtered = []*backend.PodMetrics{} } - if pod.WaitingQueueSize == min { + if pod.WaitingQueueSize >= max { + max = pod.WaitingQueueSize + } + } + + for _, pod := range pods { + if pod.WaitingQueueSize >= min && pod.WaitingQueueSize <= min+(max-min)/len(pods) { filtered = append(filtered, pod) } } return filtered, nil } +// leastKVCacheFilterFunc finds the max and min KV cache of all pods, divides the whole range +// (max-min) by the number of pods, and finds the pods that fall into the first range. +// The intuition is that if there are multiple pods that share similar KV cache in the low range, we +// should consider them all instead of the absolute minimum one. This worked better than picking the +// least one as it gives more choices for the next filter, which on aggregate gave better results. +// TODO: Compare this strategy with other strategies such as top K. 
 func leastKVCacheFilterFunc(b *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
-	min := math.MaxInt
+	min := math.MaxFloat64
+	max := math.SmallestNonzeroFloat64
 	filtered := []*backend.PodMetrics{}
-	margin := 5
+
 	for _, pod := range pods {
-		cur := int(pod.KVCacheUsagePercent) / margin
-		if cur < min {
-			min = cur
-			filtered = []*backend.PodMetrics{}
+		if pod.KVCacheUsagePercent <= min {
+			min = pod.KVCacheUsagePercent
+		}
+		if pod.KVCacheUsagePercent >= max {
+			max = pod.KVCacheUsagePercent
 		}
-		if cur == min {
+	}
+
+	for _, pod := range pods {
+		if pod.KVCacheUsagePercent >= min && pod.KVCacheUsagePercent <= min+(max-min)/float64(len(pods)) {
 			filtered = append(filtered, pod)
 		}
 	}
diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go
index be7501ae..9bcd25bb 100644
--- a/pkg/ext-proc/scheduling/scheduler.go
+++ b/pkg/ext-proc/scheduling/scheduler.go
@@ -2,6 +2,7 @@ package scheduling

 import (
+	"fmt"
 	"math/rand"

 	klog "k8s.io/klog/v2"
@@ -44,10 +45,10 @@ type PodMetricsProvider interface {

 // Schedule finds the target pod based on metrics and the requested lora adapter.
 func (s *Scheduler) Schedule(b *LLMRequest) (targetPod *backend.Pod, err error) {
-	klog.V(2).Infof("request: %v; metrics: %+v", b, s.podMetricsProvider.AllPodMetrics())
+	klog.V(3).Infof("request: %v; metrics: %+v", b, s.podMetricsProvider.AllPodMetrics())
 	pods, err := s.filter.Filter(b, s.podMetricsProvider.AllPodMetrics())
 	if err != nil || len(pods) == 0 {
-		klog.Errorf("Failed to apply filter, this should never happen: %v", err)
+		return nil, fmt.Errorf("failed to apply filter, resulted in %v pods, this should never happen: %v", len(pods), err)
 	}
 	i := rand.Intn(len(pods))
 	return &pods[i].Pod, nil
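
A note on the error accumulation in `metrics.go`: `go.uber.org/multierr` is value-oriented, so `multierr.Append(left, right)` returns the combined error rather than mutating its first argument; errors accumulate only when the result is assigned back. A minimal sketch of the intended pattern (the error messages are made up for illustration):

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/multierr"
)

func main() {
	var errs error
	// Append returns the merged error; the result must be kept.
	errs = multierr.Append(errs, errors.New("failed to scrape pod a"))
	errs = multierr.Append(errs, errors.New("failed to scrape pod b"))

	fmt.Println(multierr.Errors(errs)) // two distinct underlying errors
	fmt.Println(errs)                  // "failed to scrape pod a; failed to scrape pod b"
}
```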
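Most of this diff moves per-request logging from `V(2)` to `V(3)` and per-scrape logging to `V(4)`, and gates the periodic debug dump in `provider.go` behind `klog.V(2).Enabled()`. A `klog.V(n)` line is emitted only when the process runs with `-v=n` or higher, and `Enabled()` makes that check explicit so debug-only work is skipped entirely at lower verbosity. A small self-contained sketch of the pattern (the flag is set programmatically here only for illustration):

```go
package main

import (
	"flag"

	klog "k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)
	_ = flag.Set("v", "3") // equivalent to starting the binary with -v=3
	flag.Parse()
	defer klog.Flush()

	klog.V(3).Info("emitted: per-request detail, visible at -v=3")
	klog.V(4).Info("suppressed: per-scrape detail, needs -v=4")

	// Checking Enabled() up front avoids even starting debug-only work,
	// like the periodic pod/metrics dump goroutine in provider.go.
	if klog.V(2).Enabled() {
		klog.Info("debug dump path active")
	}
}
```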
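The comments on `leastQueuingFilterFunc` and `leastKVCacheFilterFunc` describe the same idea: keep every pod whose load lands in the first of `len(pods)` equal-width buckets over `[min, max]`, instead of only the absolute minimum, so the next filter in the chain has more candidates to work with. A standalone sketch of that bucketing, using a hypothetical `leastLoaded` helper over plain float64 loads rather than the PR's actual types:

```go
package main

import "fmt"

// leastLoaded returns the indices of items whose load falls into the first of
// n equal-width buckets over [min, max], where n is the number of items.
// Loads close to the minimum all survive, not just the arg-min.
func leastLoaded(loads []float64) []int {
	if len(loads) == 0 {
		return nil
	}
	min, max := loads[0], loads[0]
	for _, l := range loads {
		if l < min {
			min = l
		}
		if l > max {
			max = l
		}
	}
	// The first bucket is [min, min+(max-min)/n].
	cutoff := min + (max-min)/float64(len(loads))
	var picked []int
	for i, l := range loads {
		if l <= cutoff {
			picked = append(picked, i)
		}
	}
	return picked
}

func main() {
	// Queue sizes 3 and 4 are both "low" relative to 30 and 80, so both
	// survive; a strict arg-min would have kept only index 0.
	fmt.Println(leastLoaded([]float64{3, 4, 30, 80})) // [0 1]
}
```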