1
1
// Package integration contains e2e tests for the ext proc while faking the backend pods.
2
- package test
2
+ package integration
3
3
4
4
import (
5
5
"bufio"
@@ -24,11 +24,11 @@ import (
24
24
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
25
25
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
26
26
runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
27
+ extprocutils "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/test"
27
28
"k8s.io/apimachinery/pkg/runtime"
28
29
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29
30
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
30
31
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
31
- "k8s.io/client-go/rest"
32
32
klog "k8s.io/klog/v2"
33
33
k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
34
34
"sigs.k8s.io/controller-runtime/pkg/envtest"
@@ -40,6 +40,7 @@ const (
40
40
)
41
41
42
42
var (
43
+ runner * runserver.ExtProcServerRunner
43
44
k8sClient k8sclient.Client
44
45
testEnv * envtest.Environment
45
46
scheme = runtime .NewScheme ()
@@ -57,7 +58,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
57
58
}{
58
59
{
59
60
name : "success" ,
60
- req : GenerateRequest ("my-model" ),
61
+ req : extprocutils . GenerateRequest ("my-model" ),
61
62
models : map [string ]* v1alpha1.InferenceModel {
62
63
"my-model" : {
63
64
Spec : v1alpha1.InferenceModelSpec {
@@ -75,7 +76,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
75
76
// model being active, and has low KV cache.
76
77
pods : []* backend.PodMetrics {
77
78
{
78
- Pod : FakePod (0 ),
79
+ Pod : extprocutils . FakePod (0 ),
79
80
Metrics : backend.Metrics {
80
81
WaitingQueueSize : 0 ,
81
82
KVCacheUsagePercent : 0.2 ,
@@ -86,7 +87,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
86
87
},
87
88
},
88
89
{
89
- Pod : FakePod (1 ),
90
+ Pod : extprocutils . FakePod (1 ),
90
91
Metrics : backend.Metrics {
91
92
WaitingQueueSize : 0 ,
92
93
KVCacheUsagePercent : 0.1 ,
@@ -97,7 +98,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
97
98
},
98
99
},
99
100
{
100
- Pod : FakePod (2 ),
101
+ Pod : extprocutils . FakePod (2 ),
101
102
Metrics : backend.Metrics {
102
103
WaitingQueueSize : 10 ,
103
104
KVCacheUsagePercent : 0.2 ,
@@ -172,7 +173,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
172
173
}{
173
174
{
174
175
name : "success" ,
175
- req : GenerateRequest ("sql-lora" ),
176
+ req : extprocutils . GenerateRequest ("sql-lora" ),
176
177
// pod-1 will be picked because it has relatively low queue size, with the requested
177
178
// model being active, and has low KV cache.
178
179
wantHeaders : []* configPb.HeaderValueOption {
@@ -194,29 +195,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
194
195
},
195
196
}
196
197
197
- // Set up mock k8s API Client
198
- testEnv = & envtest.Environment {
199
- CRDDirectoryPaths : []string {filepath .Join (".." , ".." , ".." , "config" , "crd" , "bases" )},
200
- ErrorIfCRDPathMissing : true ,
201
- }
202
- cfg , err := testEnv .Start ()
203
- if err != nil {
204
- log .Fatalf ("Failed to start test environment, cfg: %v error: %v" , cfg , err )
205
- }
206
-
207
- utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
208
- utilruntime .Must (v1alpha1 .AddToScheme (scheme ))
209
-
210
- k8sClient , err = k8sclient .New (cfg , k8sclient.Options {Scheme : scheme })
211
- if err != nil {
212
- log .Fatalf ("Failed to start k8s Client: %v" , err )
213
- } else if k8sClient == nil {
214
- log .Fatalf ("No error, but returned kubernetes client is nil, cfg: %v" , cfg )
215
- }
216
-
217
198
pods := []* backend.PodMetrics {
218
199
{
219
- Pod : FakePod (0 ),
200
+ Pod : extprocutils . FakePod (0 ),
220
201
Metrics : backend.Metrics {
221
202
WaitingQueueSize : 0 ,
222
203
KVCacheUsagePercent : 0.2 ,
@@ -227,7 +208,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
227
208
},
228
209
},
229
210
{
230
- Pod : FakePod (1 ),
211
+ Pod : extprocutils . FakePod (1 ),
231
212
Metrics : backend.Metrics {
232
213
WaitingQueueSize : 0 ,
233
214
KVCacheUsagePercent : 0.1 ,
@@ -238,7 +219,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
238
219
},
239
220
},
240
221
{
241
- Pod : FakePod (2 ),
222
+ Pod : extprocutils . FakePod (2 ),
242
223
Metrics : backend.Metrics {
243
224
WaitingQueueSize : 10 ,
244
225
KVCacheUsagePercent : 0.2 ,
@@ -248,9 +229,13 @@ func TestKubeInferenceModelRequest(t *testing.T) {
248
229
},
249
230
},
250
231
}
232
+
233
+ // Set up global k8sclient and extproc server runner with test environment config
234
+ BeforeSuit ()
235
+
251
236
for _ , test := range tests {
252
237
t .Run (test .name , func (t * testing.T ) {
253
- client , cleanup := setUpHermeticServer (t , cfg , pods )
238
+ client , cleanup := setUpHermeticServer (t , pods )
254
239
t .Cleanup (cleanup )
255
240
want := & extProcPb.ProcessingResponse {
256
241
Response : & extProcPb.ProcessingResponse_RequestBody {
@@ -283,7 +268,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
283
268
284
269
func setUpServer (t * testing.T , pods []* backend.PodMetrics , models map [string ]* v1alpha1.InferenceModel ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
285
270
t .Logf ("Setting up ExtProc server" )
286
- server := StartExtProc (port , time .Second , time .Second , pods , models )
271
+ server := extprocutils . StartExtProc (port , time .Second , time .Second , pods , models )
287
272
288
273
address := fmt .Sprintf ("localhost:%v" , port )
289
274
// Create a grpc connection
@@ -304,33 +289,13 @@ func setUpServer(t *testing.T, pods []*backend.PodMetrics, models map[string]*v1
304
289
}
305
290
}
306
291
307
- func setUpHermeticServer (t * testing.T , cfg * rest. Config , pods []* backend.PodMetrics ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
292
+ func setUpHermeticServer (t * testing.T , pods []* backend.PodMetrics ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
308
293
t .Logf ("Setting up hermetic ExtProc server" )
309
294
klog .InitFlags (nil )
310
295
flag .Parse ()
311
296
// Configure klog verbosity levels to print ext proc logs.
312
297
_ = flag .Lookup ("v" ).Value .Set ("3" )
313
298
314
- runner := & runserver.ExtProcServerRunner {
315
- GrpcPort : port ,
316
- TargetPodHeader : "target-pod" ,
317
- PoolName : "vllm-llama2-7b-pool" ,
318
- PoolNamespace : "default" ,
319
- ServiceName : "" ,
320
- Zone : "" ,
321
- RefreshPodsInterval : 10 * time .Second ,
322
- RefreshMetricsInterval : 50 * time .Millisecond ,
323
- Scheme : scheme ,
324
- Config : cfg ,
325
- Datastore : backend .NewK8sDataStore (),
326
- }
327
- runner .Setup ()
328
-
329
- // Start the controller manager in go routine, not blocking
330
- go func () {
331
- runner .StartManager ()
332
- }()
333
-
334
299
// Unmarshal CRDs from file into structs
335
300
manifestsPath := filepath .Join ("." , "artifacts" , "inferencepool-with-model-hermetic.yaml" )
336
301
docs , err := readDocuments (manifestsPath )
@@ -392,6 +357,51 @@ func setUpHermeticServer(t *testing.T, cfg *rest.Config, pods []*backend.PodMetr
392
357
}
393
358
}
394
359
360
+ // BeforeSuit starts the envtest environment, initializes the package-level k8sClient and runner, and launches the controller manager in a background goroutine; it returns nothing and aborts via log.Fatalf if setup fails.
361
+ func BeforeSuit () {
362
+ // Start the envtest environment (CRDs loaded from ../../config/crd/bases) that backs the k8s API client
363
+ testEnv = & envtest.Environment {
364
+ CRDDirectoryPaths : []string {filepath .Join (".." , ".." , "config" , "crd" , "bases" )},
365
+ ErrorIfCRDPathMissing : true ,
366
+ }
367
+ cfg , err := testEnv .Start ()
368
+
369
+ if err != nil {
370
+ log .Fatalf ("Failed to start test environment, cfg: %v error: %v" , cfg , err )
371
+ }
372
+
373
+ utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
374
+ utilruntime .Must (v1alpha1 .AddToScheme (scheme ))
375
+
376
+ k8sClient , err = k8sclient .New (cfg , k8sclient.Options {Scheme : scheme })
377
+ if err != nil {
378
+ log .Fatalf ("Failed to start k8s Client: %v" , err )
379
+ } else if k8sClient == nil {
380
+ log .Fatalf ("No error, but returned kubernetes client is nil, cfg: %v" , cfg )
381
+ }
382
+
383
+ runner = & runserver.ExtProcServerRunner {
384
+ GrpcPort : port ,
385
+ TargetPodHeader : "target-pod" ,
386
+ PoolName : "vllm-llama2-7b-pool" ,
387
+ PoolNamespace : "default" ,
388
+ ServiceName : "" ,
389
+ Zone : "" ,
390
+ RefreshPodsInterval : 10 * time .Second ,
391
+ RefreshMetricsInterval : 50 * time .Millisecond ,
392
+ Scheme : scheme ,
393
+ Config : cfg ,
394
+ Datastore : backend .NewK8sDataStore (),
395
+ }
396
+
397
+ runner .Setup ()
398
+
399
+ // Run the controller manager in its own goroutine so this setup function does not block
400
+ go func () {
401
+ runner .StartManager ()
402
+ }()
403
+ }
404
+
395
405
func sendRequest (t * testing.T , client extProcPb.ExternalProcessor_ProcessClient , req * extProcPb.ProcessingRequest ) (* extProcPb.ProcessingResponse , error ) {
396
406
t .Logf ("Sending request: %v" , req )
397
407
if err := client .Send (req ); err != nil {
0 commit comments