Skip to content

Commit 70285f1

Browse files
authored
Adding util func for splitting large bodies into chunks (#859)
1 parent 87b3a08 commit 70285f1

File tree

2 files changed

+98
-0
lines changed

2 files changed

+98
-0
lines changed

pkg/epp/handlers/server.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ import (
3737
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
3838
)
3939

40+
const (
41+
// Certain envoy implementations set a max limit of 64Kb per streamed chunk, intentionally setting this lower for a safe margin.
42+
bodyByteLimit = 62000
43+
)
44+
4045
func NewStreamingServer(destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore Datastore, director Director) *StreamingServer {
4146
return &StreamingServer{
4247
destinationEndpointHintMetadataNamespace: destinationEndpointHintMetadataNamespace,
@@ -460,3 +465,32 @@ func BuildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
460465

461466
return resp, nil
462467
}
468+
469+
func buildCommonResponses(bodyBytes []byte, byteLimit int) []*extProcPb.CommonResponse {
470+
responses := []*extProcPb.CommonResponse{}
471+
startingIndex := 0
472+
bodyLen := len(bodyBytes)
473+
474+
for startingIndex < bodyLen {
475+
eos := false
476+
len := min(bodyLen-startingIndex, byteLimit)
477+
chunk := bodyBytes[startingIndex : len+startingIndex]
478+
if len+startingIndex == bodyLen {
479+
eos = true
480+
}
481+
482+
commonResp := &extProcPb.CommonResponse{
483+
BodyMutation: &extProcPb.BodyMutation{
484+
Mutation: &extProcPb.BodyMutation_StreamedResponse{
485+
StreamedResponse: &extProcPb.StreamedBodyResponse{
486+
Body: chunk,
487+
EndOfStream: eos,
488+
},
489+
},
490+
},
491+
}
492+
responses = append(responses, commonResp)
493+
startingIndex += len
494+
}
495+
return responses
496+
}

pkg/epp/handlers/server_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package handlers
2+
3+
import (
4+
"crypto/rand"
5+
"testing"
6+
)
7+
8+
func TestBuildCommonResponses(t *testing.T) {
9+
tests := []struct {
10+
name string
11+
count int
12+
expectedMessageCount int
13+
}{
14+
{
15+
name: "below limit",
16+
count: bodyByteLimit - 1000,
17+
expectedMessageCount: 1,
18+
},
19+
{
20+
name: "at limit",
21+
count: bodyByteLimit,
22+
expectedMessageCount: 1,
23+
},
24+
{
25+
name: "off by one error?",
26+
count: bodyByteLimit + 1,
27+
expectedMessageCount: 2,
28+
},
29+
{
30+
name: "above limit",
31+
count: bodyByteLimit + 1000,
32+
expectedMessageCount: 2,
33+
},
34+
{
35+
name: "above limit",
36+
count: (bodyByteLimit * 2) + 1000,
37+
expectedMessageCount: 3,
38+
},
39+
}
40+
for _, test := range tests {
41+
t.Run(test.name, func(t *testing.T) {
42+
arr := generateBytes(test.count)
43+
responses := buildCommonResponses(arr, bodyByteLimit)
44+
for i, response := range responses {
45+
eos := response.BodyMutation.GetStreamedResponse().GetEndOfStream()
46+
if eos == true && i+1 != len(responses) {
47+
t.Fatalf("EoS should not be set")
48+
}
49+
if eos == false && i+1 == len(responses) {
50+
t.Fatalf("EoS should be set")
51+
}
52+
}
53+
if len(responses) != test.expectedMessageCount {
54+
t.Fatalf("Expected: %v, Got %v", test.expectedMessageCount, len(responses))
55+
}
56+
})
57+
}
58+
}
59+
60+
func generateBytes(count int) []byte {
61+
arr := make([]byte, count)
62+
_, _ = rand.Read(arr)
63+
return arr
64+
}

0 commit comments

Comments
 (0)