Skip to content

Commit f6d2a67

Browse files
rramkumar1kfswain
authored andcommitted
Add streaming integration tests for BBR (kubernetes-sigs#627)
1 parent e968b0b commit f6d2a67

File tree

3 files changed

+174
-52
lines changed

3 files changed

+174
-52
lines changed

pkg/body-based-routing/handlers/server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,16 +114,16 @@ func (s *Server) processRequestBody(ctx context.Context, body *extProcPb.HttpBod
114114

115115
var requestBody map[string]interface{}
116116
if s.streaming {
117+
streamedBody.body = append(streamedBody.body, body.Body...)
117118
// In the stream case, we can receive multiple request bodies.
118-
if !body.EndOfStream {
119-
streamedBody.body = append(streamedBody.body, body.Body...)
120-
return nil, nil
121-
} else {
119+
if body.EndOfStream {
122120
loggerVerbose.Info("Flushing stream buffer")
123121
err := json.Unmarshal(streamedBody.body, &requestBody)
124122
if err != nil {
125123
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body")
126124
}
125+
} else {
126+
return nil, nil
127127
}
128128
} else {
129129
if err := json.Unmarshal(body.GetBody(), &requestBody); err != nil {

test/integration/bbr/hermetic_test.go

Lines changed: 165 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,19 @@ package bbr
1919

2020
import (
2121
"context"
22-
"encoding/json"
2322
"fmt"
2423
"testing"
2524
"time"
2625

2726
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2827
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
29-
"github.com/go-logr/logr"
3028
"github.com/google/go-cmp/cmp"
3129
"google.golang.org/grpc"
3230
"google.golang.org/grpc/credentials/insecure"
3331
"google.golang.org/protobuf/testing/protocmp"
3432
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/body-based-routing/server"
3533
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
34+
integrationutils "sigs.k8s.io/gateway-api-inference-extension/test/integration"
3635
)
3736

3837
var logger = logutil.NewTestLogger().V(logutil.VERBOSE)
@@ -46,7 +45,7 @@ func TestBodyBasedRouting(t *testing.T) {
4645
}{
4746
{
4847
name: "success adding model parameter to header",
49-
req: generateRequest(logger, "llama"),
48+
req: integrationutils.GenerateRequest(logger, "test", "llama"),
5049
wantHeaders: []*configPb.HeaderValueOption{
5150
{
5251
Header: &configPb.HeaderValue{
@@ -59,15 +58,15 @@ func TestBodyBasedRouting(t *testing.T) {
5958
},
6059
{
6160
name: "no model parameter",
62-
req: generateRequest(logger, ""),
61+
req: integrationutils.GenerateRequest(logger, "test1", ""),
6362
wantHeaders: []*configPb.HeaderValueOption{},
6463
wantErr: false,
6564
},
6665
}
6766

6867
for _, test := range tests {
6968
t.Run(test.name, func(t *testing.T) {
70-
client, cleanup := setUpHermeticServer()
69+
client, cleanup := setUpHermeticServer(false)
7170
t.Cleanup(cleanup)
7271

7372
want := &extProcPb.ProcessingResponse{}
@@ -88,7 +87,7 @@ func TestBodyBasedRouting(t *testing.T) {
8887
}
8988
}
9089

91-
res, err := sendRequest(t, client, test.req)
90+
res, err := integrationutils.SendRequest(t, client, test.req)
9291
if err != nil && !test.wantErr {
9392
t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr)
9493
}
@@ -99,12 +98,171 @@ func TestBodyBasedRouting(t *testing.T) {
9998
}
10099
}
101100

102-
func setUpHermeticServer() (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
101+
func TestFullDuplexStreamed_BodyBasedRouting(t *testing.T) {
102+
tests := []struct {
103+
name string
104+
reqs []*extProcPb.ProcessingRequest
105+
wantResponses []*extProcPb.ProcessingResponse
106+
wantErr bool
107+
}{
108+
{
109+
name: "success adding model parameter to header",
110+
reqs: integrationutils.GenerateStreamedRequestSet(logger, "test", "foo"),
111+
wantResponses: []*extProcPb.ProcessingResponse{
112+
{
113+
Response: &extProcPb.ProcessingResponse_RequestHeaders{
114+
RequestHeaders: &extProcPb.HeadersResponse{
115+
Response: &extProcPb.CommonResponse{
116+
ClearRouteCache: true,
117+
HeaderMutation: &extProcPb.HeaderMutation{
118+
SetHeaders: []*configPb.HeaderValueOption{
119+
{
120+
Header: &configPb.HeaderValue{
121+
Key: "X-Gateway-Model-Name",
122+
RawValue: []byte("foo"),
123+
},
124+
},
125+
}},
126+
},
127+
},
128+
},
129+
},
130+
{
131+
Response: &extProcPb.ProcessingResponse_RequestBody{
132+
RequestBody: &extProcPb.BodyResponse{
133+
Response: &extProcPb.CommonResponse{
134+
BodyMutation: &extProcPb.BodyMutation{
135+
Mutation: &extProcPb.BodyMutation_StreamedResponse{
136+
StreamedResponse: &extProcPb.StreamedBodyResponse{
137+
Body: []byte("{\"max_tokens\":100,\"model\":\"foo\",\"prompt\":\"test\",\"temperature\":0}"),
138+
EndOfStream: true,
139+
},
140+
},
141+
},
142+
},
143+
},
144+
},
145+
},
146+
},
147+
},
148+
{
149+
name: "success adding model parameter to header with multiple body chunks",
150+
reqs: []*extProcPb.ProcessingRequest{
151+
{
152+
Request: &extProcPb.ProcessingRequest_RequestHeaders{
153+
RequestHeaders: &extProcPb.HttpHeaders{
154+
Headers: &configPb.HeaderMap{
155+
Headers: []*configPb.HeaderValue{
156+
{
157+
Key: "hi",
158+
Value: "mom",
159+
},
160+
},
161+
},
162+
},
163+
},
164+
},
165+
{
166+
Request: &extProcPb.ProcessingRequest_RequestBody{
167+
RequestBody: &extProcPb.HttpBody{Body: []byte("{\"max_tokens\":100,\"model\":\"sql-lo"), EndOfStream: false},
168+
},
169+
},
170+
{
171+
Request: &extProcPb.ProcessingRequest_RequestBody{
172+
RequestBody: &extProcPb.HttpBody{Body: []byte("ra-sheddable\",\"prompt\":\"test\",\"temperature\":0}"), EndOfStream: true},
173+
},
174+
},
175+
},
176+
wantResponses: []*extProcPb.ProcessingResponse{
177+
{
178+
Response: &extProcPb.ProcessingResponse_RequestHeaders{
179+
RequestHeaders: &extProcPb.HeadersResponse{
180+
Response: &extProcPb.CommonResponse{
181+
ClearRouteCache: true,
182+
HeaderMutation: &extProcPb.HeaderMutation{
183+
SetHeaders: []*configPb.HeaderValueOption{
184+
{
185+
Header: &configPb.HeaderValue{
186+
Key: "X-Gateway-Model-Name",
187+
RawValue: []byte("sql-lora-sheddable"),
188+
},
189+
},
190+
}},
191+
},
192+
},
193+
},
194+
},
195+
{
196+
Response: &extProcPb.ProcessingResponse_RequestBody{
197+
RequestBody: &extProcPb.BodyResponse{
198+
Response: &extProcPb.CommonResponse{
199+
BodyMutation: &extProcPb.BodyMutation{
200+
Mutation: &extProcPb.BodyMutation_StreamedResponse{
201+
StreamedResponse: &extProcPb.StreamedBodyResponse{
202+
Body: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-sheddable\",\"prompt\":\"test\",\"temperature\":0}"),
203+
EndOfStream: true,
204+
},
205+
},
206+
},
207+
},
208+
},
209+
},
210+
},
211+
},
212+
},
213+
{
214+
name: "no model parameter",
215+
reqs: integrationutils.GenerateStreamedRequestSet(logger, "test", ""),
216+
wantResponses: []*extProcPb.ProcessingResponse{
217+
{
218+
Response: &extProcPb.ProcessingResponse_RequestHeaders{
219+
RequestHeaders: &extProcPb.HeadersResponse{},
220+
},
221+
},
222+
{
223+
Response: &extProcPb.ProcessingResponse_RequestBody{
224+
RequestBody: &extProcPb.BodyResponse{
225+
Response: &extProcPb.CommonResponse{
226+
BodyMutation: &extProcPb.BodyMutation{
227+
Mutation: &extProcPb.BodyMutation_StreamedResponse{
228+
StreamedResponse: &extProcPb.StreamedBodyResponse{
229+
Body: []byte("{\"max_tokens\":100,\"prompt\":\"test\",\"temperature\":0}"),
230+
EndOfStream: true,
231+
},
232+
},
233+
},
234+
},
235+
},
236+
},
237+
},
238+
},
239+
},
240+
}
241+
242+
for _, test := range tests {
243+
t.Run(test.name, func(t *testing.T) {
244+
client, cleanup := setUpHermeticServer(true)
245+
t.Cleanup(cleanup)
246+
247+
responses, err := integrationutils.StreamedRequest(t, client, test.reqs, len(test.wantResponses))
248+
if err != nil && !test.wantErr {
249+
t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr)
250+
}
251+
252+
if diff := cmp.Diff(test.wantResponses, responses, protocmp.Transform()); diff != "" {
253+
t.Errorf("Unexpected response, (-want +got): %v", diff)
254+
}
255+
})
256+
}
257+
}
258+
259+
func setUpHermeticServer(streaming bool) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
103260
port := 9004
104261

105262
serverCtx, stopServer := context.WithCancel(context.Background())
106263
serverRunner := runserver.NewDefaultExtProcServerRunner(port, false)
107264
serverRunner.SecureServing = false
265+
serverRunner.Streaming = streaming
108266

109267
go func() {
110268
if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil {
@@ -133,41 +291,3 @@ func setUpHermeticServer() (client extProcPb.ExternalProcessor_ProcessClient, cl
133291
time.Sleep(5 * time.Second)
134292
}
135293
}
136-
137-
func generateRequest(logger logr.Logger, model string) *extProcPb.ProcessingRequest {
138-
j := map[string]interface{}{
139-
"prompt": "test1",
140-
"max_tokens": 100,
141-
"temperature": 0,
142-
}
143-
if model != "" {
144-
j["model"] = model
145-
}
146-
147-
llmReq, err := json.Marshal(j)
148-
if err != nil {
149-
logutil.Fatal(logger, err, "Failed to unmarshal LLM request")
150-
}
151-
req := &extProcPb.ProcessingRequest{
152-
Request: &extProcPb.ProcessingRequest_RequestBody{
153-
RequestBody: &extProcPb.HttpBody{Body: llmReq},
154-
},
155-
}
156-
return req
157-
}
158-
159-
func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
160-
t.Logf("Sending request: %v", req)
161-
if err := client.Send(req); err != nil {
162-
t.Logf("Failed to send request %+v: %v", req, err)
163-
return nil, err
164-
}
165-
166-
res, err := client.Recv()
167-
if err != nil {
168-
t.Logf("Failed to receive: %v", err)
169-
return nil, err
170-
}
171-
t.Logf("Received request %+v", res)
172-
return res, err
173-
}

test/integration/util.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func SendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient,
4040
t.Logf("Failed to receive: %v", err)
4141
return nil, err
4242
}
43-
t.Logf("Received request %+v", res)
43+
t.Logf("Received response %+v", res)
4444
return res, err
4545
}
4646

@@ -71,19 +71,21 @@ func StreamedRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessCli
7171
t.Logf("Failed to receive: %v", err)
7272
return nil, err
7373
}
74-
t.Logf("Received request %+v", res)
74+
t.Logf("Received response %+v", res)
7575
responses = append(responses, res)
7676
}
7777
return responses, nil
7878
}
7979

8080
func GenerateRequest(logger logr.Logger, prompt, model string) *extProcPb.ProcessingRequest {
8181
j := map[string]interface{}{
82-
"model": model,
8382
"prompt": prompt,
8483
"max_tokens": 100,
8584
"temperature": 0,
8685
}
86+
if model != "" {
87+
j["model"] = model
88+
}
8789

8890
llmReq, err := json.Marshal(j)
8991
if err != nil {

0 commit comments

Comments
 (0)