Skip to content

Commit 836820f

Browse files
committed
add initial integration test for body-based routing extension
1 parent b40de04 commit 836820f

File tree

2 files changed

+178
-12
lines changed

2 files changed

+178
-12
lines changed

pkg/body-based-routing/server/runserver.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ import (
3232

3333
// ExtProcServerRunner provides methods to manage an external process server.
3434
type ExtProcServerRunner struct {
35-
GrpcPort int
35+
GrpcPort int
36+
SecureServing bool
3637
}
3738

3839
// Default values for CLI flags in main
@@ -42,26 +43,29 @@ const (
4243

4344
func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
4445
return &ExtProcServerRunner{
45-
GrpcPort: DefaultGrpcPort,
46+
GrpcPort: DefaultGrpcPort,
47+
SecureServing: true,
4648
}
4749
}
4850

4951
// AsRunnable returns a Runnable that can be used to start the ext-proc gRPC server.
5052
// The runnable implements LeaderElectionRunnable with leader election disabled.
5153
func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
5254
return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error {
53-
cert, err := tlsutil.CreateSelfSignedTLSCertificate(logger)
54-
if err != nil {
55-
logger.Error(err, "Failed to create self signed certificate")
56-
return err
55+
var srv *grpc.Server
56+
if r.SecureServing {
57+
cert, err := tlsutil.CreateSelfSignedTLSCertificate(logger)
58+
if err != nil {
59+
logger.Error(err, "Failed to create self signed certificate")
60+
return err
61+
}
62+
creds := credentials.NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}})
63+
srv = grpc.NewServer(grpc.Creds(creds))
64+
} else {
65+
srv = grpc.NewServer()
5766
}
58-
creds := credentials.NewTLS(&tls.Config{Certificates: []tls.Certificate{cert}})
5967

60-
srv := grpc.NewServer(grpc.Creds(creds))
61-
extProcPb.RegisterExternalProcessorServer(
62-
srv,
63-
handlers.NewServer(),
64-
)
68+
extProcPb.RegisterExternalProcessorServer(srv, handlers.NewServer())
6569

6670
// Forward to the gRPC runnable.
6771
return runnable.GRPCServer("ext-proc", srv, r.GrpcPort).Start(ctx)

test/integration/bbr/hermetic_test.go

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package bbr contains integration tests for the body-based routing extension.
18+
package bbr
19+
20+
import (
21+
"context"
22+
"encoding/json"
23+
"fmt"
24+
"testing"
25+
"time"
26+
27+
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
28+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
29+
"github.com/go-logr/logr"
30+
"github.com/google/go-cmp/cmp"
31+
"google.golang.org/grpc"
32+
"google.golang.org/grpc/credentials/insecure"
33+
"google.golang.org/protobuf/testing/protocmp"
34+
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/body-based-routing/server"
35+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
36+
)
37+
38+
const port = runserver.DefaultGrpcPort
39+
40+
var logger = logutil.NewTestLogger().V(logutil.VERBOSE)
41+
42+
func TestBodyBasedRouting(t *testing.T) {
43+
tests := []struct {
44+
name string
45+
req *extProcPb.ProcessingRequest
46+
wantHeaders []*configPb.HeaderValueOption
47+
wantErr bool
48+
}{
49+
{
50+
name: "success adding model parameter to header",
51+
req: generateRequest(logger, "llama"),
52+
wantHeaders: []*configPb.HeaderValueOption{
53+
{
54+
Header: &configPb.HeaderValue{
55+
Key: "X-Gateway-Model-Name",
56+
RawValue: []byte("llama"),
57+
},
58+
},
59+
},
60+
wantErr: false,
61+
},
62+
}
63+
64+
for _, test := range tests {
65+
t.Run(test.name, func(t *testing.T) {
66+
client, cleanup := setUpHermeticServer()
67+
t.Cleanup(cleanup)
68+
69+
want := &extProcPb.ProcessingResponse{
70+
Response: &extProcPb.ProcessingResponse_RequestBody{
71+
RequestBody: &extProcPb.BodyResponse{
72+
Response: &extProcPb.CommonResponse{
73+
HeaderMutation: &extProcPb.HeaderMutation{
74+
SetHeaders: test.wantHeaders,
75+
},
76+
ClearRouteCache: true,
77+
},
78+
},
79+
},
80+
}
81+
82+
res, err := sendRequest(t, client, test.req)
83+
if err != nil && !test.wantErr {
84+
t.Errorf("Unexpected error, got: %v, want error: %v", err, test.wantErr)
85+
}
86+
if diff := cmp.Diff(want, res, protocmp.Transform()); diff != "" {
87+
t.Errorf("Unexpected response, (-want +got): %v", diff)
88+
}
89+
})
90+
}
91+
}
92+
93+
func setUpHermeticServer() (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
94+
serverCtx, stopServer := context.WithCancel(context.Background())
95+
serverRunner := runserver.NewDefaultExtProcServerRunner()
96+
serverRunner.SecureServing = false
97+
98+
go func() {
99+
if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil {
100+
logutil.Fatal(logger, err, "Failed to start ext-proc server")
101+
}
102+
}()
103+
104+
address := fmt.Sprintf("localhost:%v", port)
105+
// Create a grpc connection
106+
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
107+
if err != nil {
108+
logutil.Fatal(logger, err, "Failed to connect", "address", address)
109+
}
110+
111+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
112+
client, err = extProcPb.NewExternalProcessorClient(conn).Process(ctx)
113+
if err != nil {
114+
logutil.Fatal(logger, err, "Failed to create client")
115+
}
116+
return client, func() {
117+
cancel()
118+
conn.Close()
119+
stopServer()
120+
121+
// wait a little until the goroutines actually exit
122+
time.Sleep(5 * time.Second)
123+
}
124+
}
125+
126+
func generateRequest(logger logr.Logger, model string) *extProcPb.ProcessingRequest {
127+
j := map[string]interface{}{
128+
"prompt": "test1",
129+
"max_tokens": 100,
130+
"temperature": 0,
131+
}
132+
if model != "" {
133+
j["model"] = model
134+
}
135+
136+
llmReq, err := json.Marshal(j)
137+
if err != nil {
138+
logutil.Fatal(logger, err, "Failed to unmarshal LLM request")
139+
}
140+
req := &extProcPb.ProcessingRequest{
141+
Request: &extProcPb.ProcessingRequest_RequestBody{
142+
RequestBody: &extProcPb.HttpBody{Body: llmReq},
143+
},
144+
}
145+
return req
146+
}
147+
148+
func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
149+
t.Logf("Sending request: %v", req)
150+
if err := client.Send(req); err != nil {
151+
t.Logf("Failed to send request %+v: %v", req, err)
152+
return nil, err
153+
}
154+
155+
res, err := client.Recv()
156+
if err != nil {
157+
t.Logf("Failed to receive: %v", err)
158+
return nil, err
159+
}
160+
t.Logf("Received request %+v", res)
161+
return res, err
162+
}

0 commit comments

Comments
 (0)