From e907f6076bc3089a8a75227b80ff1293af7d00dc Mon Sep 17 00:00:00 2001 From: Cong Liu Date: Fri, 6 Dec 2024 16:16:01 -0800 Subject: [PATCH] Add response body handler --- pkg/ext-proc/handlers/request.go | 2 +- pkg/ext-proc/handlers/response.go | 58 +++++++++++++++++ pkg/ext-proc/handlers/response_test.go | 87 ++++++++++++++++++++++++++ pkg/ext-proc/handlers/server.go | 14 +++-- pkg/ext-proc/scheduling/filter.go | 6 +- pkg/ext-proc/scheduling/scheduler.go | 6 +- pkg/manifests/ext_proc.yaml | 3 +- 7 files changed, 163 insertions(+), 13 deletions(-) create mode 100644 pkg/ext-proc/handlers/response_test.go diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go index d7ec5e9db..482c2aec4 100644 --- a/pkg/ext-proc/handlers/request.go +++ b/pkg/ext-proc/handlers/request.go @@ -120,7 +120,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces } func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse { - klog.V(3).Info("--- In RequestHeaders processing ...") + klog.V(3).Info("Handling request headers ...") r := req.Request h := r.(*extProcPb.ProcessingRequest_RequestHeaders) klog.V(3).Infof("Headers: %+v\n", h) diff --git a/pkg/ext-proc/handlers/response.go b/pkg/ext-proc/handlers/response.go index 7378b8007..7041f8b8b 100644 --- a/pkg/ext-proc/handlers/response.go +++ b/pkg/ext-proc/handlers/response.go @@ -1,6 +1,9 @@ package handlers import ( + "encoding/json" + "fmt" + configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" klog "k8s.io/klog/v2" @@ -33,3 +36,58 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr } return resp, nil } + +// HandleResponseBody parses response body to update information such as number of completion tokens. +// Example response +/* +{ + "id": "cmpl-573498d260f2423f9e42817bbba3743a", + "object": "text_completion", + "created": 1732563765, + "model": "meta-llama/Llama-2-7b-hf", + "choices": [ + { + "index": 0, + "text": " Chronicle\nThe San Francisco Chronicle has a new book review section, and it's a good one. The reviews are short, but they're well-written and well-informed. The Chronicle's book review section is a good place to start if you're looking for a good book review.\nThe Chronicle's book review section is a good place to start if you're looking for a good book review. The Chronicle's book review section", + "logprobs": null, + "finish_reason": "length", + "stop_reason": null, + "prompt_logprobs": null + } + ], + "usage": { + "prompt_tokens": 11, + "total_tokens": 111, + "completion_tokens": 100 + } +}*/ +func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) { + klog.V(3).Info("Processing HandleResponseBody") + body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody) + + res := Response{} + if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil { + return nil, fmt.Errorf("unmarshaling response body: %v", err) + } + reqCtx.Response = res + klog.V(3).Infof("Response: %+v", res) + + resp := &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ResponseBody{ + ResponseBody: &extProcPb.BodyResponse{ + Response: &extProcPb.CommonResponse{}, + }, + }, + } + return resp, nil +} + +type Response struct { + Usage Usage `json:"usage"` +} + +type Usage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` +} diff --git a/pkg/ext-proc/handlers/response_test.go b/pkg/ext-proc/handlers/response_test.go new file mode 100644 index 000000000..df3380662 --- /dev/null +++ b/pkg/ext-proc/handlers/response_test.go @@ -0,0 +1,87 @@ +package handlers + +import ( + "testing" + + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/google/go-cmp/cmp" +) + +const ( + body = ` + { + "id": "cmpl-573498d260f2423f9e42817bbba3743a", + "object": "text_completion", + "created": 1732563765, + "model": "meta-llama/Llama-2-7b-hf", + "choices": [ + { + "index": 0, + "text": " Chronicle\nThe San Francisco Chronicle has a new book review section, and it's a good one. The reviews are short, but they're well-written and well-informed. The Chronicle's book review section is a good place to start if you're looking for a good book review.\nThe Chronicle's book review section is a good place to start if you're looking for a good book review. The Chronicle's book review section", + "logprobs": null, + "finish_reason": "length", + "stop_reason": null, + "prompt_logprobs": null + } + ], + "usage": { + "prompt_tokens": 11, + "total_tokens": 111, + "completion_tokens": 100 + } + } + ` +) + +func TestHandleResponseBody(t *testing.T) { + tests := []struct { + name string + req *extProcPb.ProcessingRequest_ResponseBody + want Response + wantErr bool + }{ + { + name: "success", + req: &extProcPb.ProcessingRequest_ResponseBody{ + ResponseBody: &extProcPb.HttpBody{ + Body: []byte(body), + }, + }, + want: Response{ + Usage: Usage{ + PromptTokens: 11, + TotalTokens: 111, + CompletionTokens: 100, + }, + }, + }, + { + name: "malformed response", + req: &extProcPb.ProcessingRequest_ResponseBody{ + ResponseBody: &extProcPb.HttpBody{ + Body: []byte("malformed json"), + }, + }, + wantErr: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + server := &Server{} + reqCtx := &RequestContext{} + _, err := server.HandleResponseBody(reqCtx, &extProcPb.ProcessingRequest{Request: test.req}) + + if err != nil { + if !test.wantErr { + t.Fatalf("HandleResponseBody returned unexpected error: %v, want %v", err, test.wantErr) + } + return + } + + if diff := cmp.Diff(test.want, reqCtx.Response); diff != "" { + t.Errorf("HandleResponseBody returned unexpected response, diff(-want, +got): %v", diff) + } + }) + } +} diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go index e59ab12de..e7fc19c9c 100644 --- a/pkg/ext-proc/handlers/server.go +++ b/pkg/ext-proc/handlers/server.go @@ -35,7 +35,7 @@ type Server struct { } type Scheduler interface { - Schedule(b *scheduling.LLMRequest) (targetPod *backend.Pod, err error) + Schedule(b *scheduling.LLMRequest) (targetPod backend.Pod, err error) } // PodProvider is an interface to provide set of pods in the backend and information such as metrics. @@ -77,13 +77,16 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { switch v := req.Request.(type) { case *extProcPb.ProcessingRequest_RequestHeaders: resp = HandleRequestHeaders(reqCtx, req) - klog.V(3).Infof("Request context after HandleRequestHeaders: %v", reqCtx) + klog.V(3).Infof("Request context after HandleRequestHeaders: %+v", reqCtx) case *extProcPb.ProcessingRequest_RequestBody: resp, err = s.HandleRequestBody(reqCtx, req) - klog.V(3).Infof("Request context after HandleRequestBody: %v", reqCtx) + klog.V(3).Infof("Request context after HandleRequestBody: %+v", reqCtx) case *extProcPb.ProcessingRequest_ResponseHeaders: resp, err = s.HandleResponseHeaders(reqCtx, req) - klog.V(3).Infof("Request context after HandleResponseHeaders: %v", reqCtx) + klog.V(3).Infof("Request context after HandleResponseHeaders: %+v", reqCtx) + case *extProcPb.ProcessingRequest_ResponseBody: + resp, err = s.HandleResponseBody(reqCtx, req) + klog.V(3).Infof("Request context after HandleResponseBody: %+v", reqCtx) default: klog.Errorf("Unknown Request type %+v", v) return status.Error(codes.Unknown, "unknown request type") @@ -119,6 +122,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { // RequestContext stores context information during the life time of an HTTP request. type RequestContext struct { - TargetPod *backend.Pod + TargetPod backend.Pod Model string + Response Response } diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go index 553308846..bcc4432b2 100644 --- a/pkg/ext-proc/scheduling/filter.go +++ b/pkg/ext-proc/scheduling/filter.go @@ -157,9 +157,9 @@ func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*bac type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool // We consider serving an adapter low cost it the adapter is active in the model server, or the -// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by spreading the -// load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to a single pod. -// This gave good performance in our initial benchmarking results in the scenario where # of lora slots > # of lora adapters. +// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by spreading the +// load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to a single pod. +// This gave good performance in our initial benchmarking results in the scenario where # of lora slots > # of lora adapters. func lowLoRACostPredicate(req *LLMRequest, pod *backend.PodMetrics) bool { _, ok := pod.ActiveModels[req.ResolvedTargetModel] return ok || len(pod.ActiveModels) < pod.MaxActiveModels diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go index b45c3737f..6078fecc2 100644 --- a/pkg/ext-proc/scheduling/scheduler.go +++ b/pkg/ext-proc/scheduling/scheduler.go @@ -110,13 +110,13 @@ type PodMetricsProvider interface { } // Schedule finds the target pod based on metrics and the requested lora adapter. -func (s *Scheduler) Schedule(req *LLMRequest) (targetPod *backend.Pod, err error) { +func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) { klog.V(3).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics()) pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics()) if err != nil || len(pods) == 0 { - return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) + return backend.Pod{}, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) } klog.V(3).Infof("Going to randomly select a pod from the candidates: %+v", pods) i := rand.Intn(len(pods)) - return &pods[i].Pod, nil + return pods[i].Pod, nil } diff --git a/pkg/manifests/ext_proc.yaml b/pkg/manifests/ext_proc.yaml index ffaeadd1e..97a21933c 100644 --- a/pkg/manifests/ext_proc.yaml +++ b/pkg/manifests/ext_proc.yaml @@ -93,7 +93,8 @@ spec: processingMode: request: body: Buffered - response: {} + response: + body: Buffered # The timeouts are likely not needed here. We can experiment with removing/tuning them slowly. # The connection limits are more important and will cause the opaque: ext_proc_gRPC_error_14 error in Envoy GW if not configured correctly. messageTimeout: 1000s