Skip to content

Commit 6090a1a

Browse files
committed
Add streaming integration tests
1 parent ac5e7bb commit 6090a1a

File tree

5 files changed

+1077
-112
lines changed

5 files changed

+1077
-112
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ test: manifests generate fmt vet envtest ## Run tests.
123123

124124
.PHONY: test-integration
125125
test-integration: manifests generate fmt vet envtest ## Run integration tests.
126-
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./test/integration -coverprofile cover.out
126+
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./test/integration/epp/...
127127

128128
.PHONY: test-e2e
129129
test-e2e: ## Run end-to-end tests against an existing Kubernetes cluster with at least 3 available GPUs.

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
module sigs.k8s.io/gateway-api-inference-extension
22

3-
go 1.23.0
3+
go 1.24.0
44

5-
toolchain go1.23.2
5+
toolchain go1.24.2
66

77
require (
88
github.com/bojand/ghz v0.120.0

pkg/epp/handlers/streamingserver.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,13 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
133133
loggerVerbose.Info("got response headers", "headers", v.ResponseHeaders.Headers.GetHeaders())
134134
for _, header := range v.ResponseHeaders.Headers.GetHeaders() {
135135
value := string(header.RawValue)
136-
136+
logger.Error(nil, "header", "key", header.Key, "value", value)
137137
if header.Key == "status" && value != "200" {
138138
reqCtx.ResponseStatusCode = errutil.ModelServerError
139139
} else if header.Key == "content-type" && strings.Contains(value, "text/event-stream") {
140140
reqCtx.modelServerStreaming = true
141141
loggerVerbose.Info("model server is streaming response")
142+
logger.Error(nil, "made it here")
142143
}
143144
}
144145
reqCtx.RequestState = ResponseRecieved

pkg/epp/util/testing/request.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package testing
1919
import (
2020
"encoding/json"
2121

22+
envoyCorev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2223
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
2324
"github.com/go-logr/logr"
2425
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -38,8 +39,29 @@ func GenerateRequest(logger logr.Logger, prompt, model string) *extProcPb.Proces
3839
}
3940
req := &extProcPb.ProcessingRequest{
4041
Request: &extProcPb.ProcessingRequest_RequestBody{
41-
RequestBody: &extProcPb.HttpBody{Body: llmReq},
42+
RequestBody: &extProcPb.HttpBody{Body: llmReq, EndOfStream: true},
4243
},
4344
}
4445
return req
4546
}
47+
48+
func GenerateStreamedRequestSet(logger logr.Logger, prompt, model string) []*extProcPb.ProcessingRequest {
49+
requests := []*extProcPb.ProcessingRequest{}
50+
headerReq := &extProcPb.ProcessingRequest{
51+
Request: &extProcPb.ProcessingRequest_RequestHeaders{
52+
RequestHeaders: &extProcPb.HttpHeaders{
53+
Headers: &envoyCorev3.HeaderMap{
54+
Headers: []*envoyCorev3.HeaderValue{
55+
{
56+
Key: "hi",
57+
Value: "mom",
58+
},
59+
},
60+
},
61+
},
62+
},
63+
}
64+
requests = append(requests, headerReq)
65+
requests = append(requests, GenerateRequest(logger, prompt, model))
66+
return requests
67+
}

0 commit comments

Comments
 (0)