Skip to content

Commit 769466d

Browse files
liu-congkfswain
authored andcommitted
Add response body handler
1 parent a4fc06d commit 769466d

File tree

7 files changed

+163
-13
lines changed

7 files changed

+163
-13
lines changed

pkg/ext-proc/handlers/request.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
120120
}
121121

122122
func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
123-
klog.V(3).Info("--- In RequestHeaders processing ...")
123+
klog.V(3).Info("Handling request headers ...")
124124
r := req.Request
125125
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
126126
klog.V(3).Infof("Headers: %+v\n", h)

pkg/ext-proc/handlers/response.go

+58
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package handlers
22

33
import (
4+
"encoding/json"
5+
"fmt"
6+
47
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
58
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
69
klog "k8s.io/klog/v2"
@@ -33,3 +36,58 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
3336
}
3437
return resp, nil
3538
}
39+
40+
// HandleResponseBody parses response body to update information such as number of completion tokens.
41+
// Example response
42+
/*
43+
{
44+
"id": "cmpl-573498d260f2423f9e42817bbba3743a",
45+
"object": "text_completion",
46+
"created": 1732563765,
47+
"model": "meta-llama/Llama-2-7b-hf",
48+
"choices": [
49+
{
50+
"index": 0,
51+
"text": " Chronicle\nThe San Francisco Chronicle has a new book review section, and it's a good one. The reviews are short, but they're well-written and well-informed. The Chronicle's book review section is a good place to start if you're looking for a good book review.\nThe Chronicle's book review section is a good place to start if you're looking for a good book review. The Chronicle's book review section",
52+
"logprobs": null,
53+
"finish_reason": "length",
54+
"stop_reason": null,
55+
"prompt_logprobs": null
56+
}
57+
],
58+
"usage": {
59+
"prompt_tokens": 11,
60+
"total_tokens": 111,
61+
"completion_tokens": 100
62+
}
63+
}*/
64+
func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
65+
klog.V(3).Info("Processing HandleResponseBody")
66+
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
67+
68+
res := Response{}
69+
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
70+
return nil, fmt.Errorf("unmarshaling response body: %v", err)
71+
}
72+
reqCtx.Response = res
73+
klog.V(3).Infof("Response: %+v", res)
74+
75+
resp := &extProcPb.ProcessingResponse{
76+
Response: &extProcPb.ProcessingResponse_ResponseBody{
77+
ResponseBody: &extProcPb.BodyResponse{
78+
Response: &extProcPb.CommonResponse{},
79+
},
80+
},
81+
}
82+
return resp, nil
83+
}
84+
85+
type Response struct {
86+
Usage Usage `json:"usage"`
87+
}
88+
89+
// Usage mirrors the "usage" object of a completion response and carries the
// token counts reported for the request.
type Usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
	TotalTokens      int `json:"total_tokens"`
}
+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package handlers
2+
3+
import (
4+
"testing"
5+
6+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
7+
"github.com/google/go-cmp/cmp"
8+
)
9+
10+
const (
11+
body = `
12+
{
13+
"id": "cmpl-573498d260f2423f9e42817bbba3743a",
14+
"object": "text_completion",
15+
"created": 1732563765,
16+
"model": "meta-llama/Llama-2-7b-hf",
17+
"choices": [
18+
{
19+
"index": 0,
20+
"text": " Chronicle\nThe San Francisco Chronicle has a new book review section, and it's a good one. The reviews are short, but they're well-written and well-informed. The Chronicle's book review section is a good place to start if you're looking for a good book review.\nThe Chronicle's book review section is a good place to start if you're looking for a good book review. The Chronicle's book review section",
21+
"logprobs": null,
22+
"finish_reason": "length",
23+
"stop_reason": null,
24+
"prompt_logprobs": null
25+
}
26+
],
27+
"usage": {
28+
"prompt_tokens": 11,
29+
"total_tokens": 111,
30+
"completion_tokens": 100
31+
}
32+
}
33+
`
34+
)
35+
36+
func TestHandleResponseBody(t *testing.T) {
37+
tests := []struct {
38+
name string
39+
req *extProcPb.ProcessingRequest_ResponseBody
40+
want Response
41+
wantErr bool
42+
}{
43+
{
44+
name: "success",
45+
req: &extProcPb.ProcessingRequest_ResponseBody{
46+
ResponseBody: &extProcPb.HttpBody{
47+
Body: []byte(body),
48+
},
49+
},
50+
want: Response{
51+
Usage: Usage{
52+
PromptTokens: 11,
53+
TotalTokens: 111,
54+
CompletionTokens: 100,
55+
},
56+
},
57+
},
58+
{
59+
name: "malformed response",
60+
req: &extProcPb.ProcessingRequest_ResponseBody{
61+
ResponseBody: &extProcPb.HttpBody{
62+
Body: []byte("malformed json"),
63+
},
64+
},
65+
wantErr: true,
66+
},
67+
}
68+
69+
for _, test := range tests {
70+
t.Run(test.name, func(t *testing.T) {
71+
server := &Server{}
72+
reqCtx := &RequestContext{}
73+
_, err := server.HandleResponseBody(reqCtx, &extProcPb.ProcessingRequest{Request: test.req})
74+
75+
if err != nil {
76+
if !test.wantErr {
77+
t.Fatalf("HandleResponseBody returned unexpected error: %v, want %v", err, test.wantErr)
78+
}
79+
return
80+
}
81+
82+
if diff := cmp.Diff(test.want, reqCtx.Response); diff != "" {
83+
t.Errorf("HandleResponseBody returned unexpected response, diff(-want, +got): %v", diff)
84+
}
85+
})
86+
}
87+
}

pkg/ext-proc/handlers/server.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Server struct {
3535
}
3636

3737
type Scheduler interface {
38-
Schedule(b *scheduling.LLMRequest) (targetPod *backend.Pod, err error)
38+
Schedule(b *scheduling.LLMRequest) (targetPod backend.Pod, err error)
3939
}
4040

4141
// PodProvider is an interface to provide set of pods in the backend and information such as metrics.
@@ -77,13 +77,16 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
7777
switch v := req.Request.(type) {
7878
case *extProcPb.ProcessingRequest_RequestHeaders:
7979
resp = HandleRequestHeaders(reqCtx, req)
80-
klog.V(3).Infof("Request context after HandleRequestHeaders: %v", reqCtx)
80+
klog.V(3).Infof("Request context after HandleRequestHeaders: %+v", reqCtx)
8181
case *extProcPb.ProcessingRequest_RequestBody:
8282
resp, err = s.HandleRequestBody(reqCtx, req)
83-
klog.V(3).Infof("Request context after HandleRequestBody: %v", reqCtx)
83+
klog.V(3).Infof("Request context after HandleRequestBody: %+v", reqCtx)
8484
case *extProcPb.ProcessingRequest_ResponseHeaders:
8585
resp, err = s.HandleResponseHeaders(reqCtx, req)
86-
klog.V(3).Infof("Request context after HandleResponseHeaders: %v", reqCtx)
86+
klog.V(3).Infof("Request context after HandleResponseHeaders: %+v", reqCtx)
87+
case *extProcPb.ProcessingRequest_ResponseBody:
88+
resp, err = s.HandleResponseBody(reqCtx, req)
89+
klog.V(3).Infof("Request context after HandleResponseBody: %+v", reqCtx)
8790
default:
8891
klog.Errorf("Unknown Request type %+v", v)
8992
return status.Error(codes.Unknown, "unknown request type")
@@ -119,6 +122,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
119122

120123
// RequestContext stores context information during the life time of an HTTP request.
121124
type RequestContext struct {
122-
TargetPod *backend.Pod
125+
TargetPod backend.Pod
123126
Model string
127+
Response Response
124128
}

pkg/ext-proc/scheduling/filter.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,9 @@ func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*bac
157157
type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool
158158

159159
// We consider serving an adapter low cost if the adapter is active in the model server, or the
160-
// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by spreading the
161-
// load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to a single pod.
162-
// This gave good performance in our initial benchmarking results in the scenario where # of lora slots > # of lora adapters.
160+
// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by spreading the
161+
// load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to a single pod.
162+
// This gave good performance in our initial benchmarking results in the scenario where # of lora slots > # of lora adapters.
163163
func lowLoRACostPredicate(req *LLMRequest, pod *backend.PodMetrics) bool {
164164
_, ok := pod.ActiveModels[req.ResolvedTargetModel]
165165
return ok || len(pod.ActiveModels) < pod.MaxActiveModels

pkg/ext-proc/scheduling/scheduler.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,13 @@ type PodMetricsProvider interface {
110110
}
111111

112112
// Schedule finds the target pod based on metrics and the requested lora adapter.
113-
func (s *Scheduler) Schedule(req *LLMRequest) (targetPod *backend.Pod, err error) {
113+
func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) {
114114
klog.V(3).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics())
115115
pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics())
116116
if err != nil || len(pods) == 0 {
117-
return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
117+
return backend.Pod{}, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
118118
}
119119
klog.V(3).Infof("Going to randomly select a pod from the candidates: %+v", pods)
120120
i := rand.Intn(len(pods))
121-
return &pods[i].Pod, nil
121+
return pods[i].Pod, nil
122122
}

pkg/manifests/ext_proc.yaml

+2-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ spec:
9393
processingMode:
9494
request:
9595
body: Buffered
96-
response: {}
96+
response:
97+
body: Buffered
9798
# The timeouts are likely not needed here. We can experiment with removing/tuning them slowly.
9899
# The connection limits are more important and will cause the opaque: ext_proc_gRPC_error_14 error in Envoy GW if not configured correctly.
99100
messageTimeout: 1000s

0 commit comments

Comments
 (0)