Skip to content

Commit a5bf0ac

Browse files
authored
wiring up chunked response logic (#860)
1 parent 8770afe commit a5bf0ac

File tree

4 files changed

+74
-83
lines changed

4 files changed

+74
-83
lines changed

pkg/epp/handlers/request.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,20 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ
6060
return nil
6161
}
6262

63-
func (s *StreamingServer) generateRequestBodyResponse(requestBodyBytes []byte) *extProcPb.ProcessingResponse {
64-
return &extProcPb.ProcessingResponse{
65-
Response: &extProcPb.ProcessingResponse_RequestBody{
66-
RequestBody: &extProcPb.BodyResponse{
67-
Response: &extProcPb.CommonResponse{
68-
BodyMutation: &extProcPb.BodyMutation{
69-
Mutation: &extProcPb.BodyMutation_StreamedResponse{
70-
StreamedResponse: &extProcPb.StreamedBodyResponse{
71-
Body: requestBodyBytes,
72-
EndOfStream: true,
73-
},
74-
},
75-
},
63+
func (s *StreamingServer) generateRequestBodyResponses(requestBodyBytes []byte) []*extProcPb.ProcessingResponse {
64+
commonResponses := buildCommonResponses(requestBodyBytes, bodyByteLimit, true)
65+
responses := []*extProcPb.ProcessingResponse{}
66+
for _, commonResp := range commonResponses {
67+
resp := &extProcPb.ProcessingResponse{
68+
Response: &extProcPb.ProcessingResponse_RequestBody{
69+
RequestBody: &extProcPb.BodyResponse{
70+
Response: commonResp,
7671
},
7772
},
78-
},
73+
}
74+
responses = append(responses, resp)
7975
}
76+
return responses
8077
}
8178

8279
func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext) *extProcPb.ProcessingResponse {

pkg/epp/handlers/response.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,25 +63,7 @@ func (s *StreamingServer) HandleResponseBody(
6363
// will add the processing for streaming case.
6464
reqCtx.ResponseComplete = true
6565

66-
reqCtx.respBodyResp = &extProcPb.ProcessingResponse{
67-
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
68-
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
69-
// options for gateway providers.
70-
Response: &extProcPb.ProcessingResponse_ResponseBody{
71-
ResponseBody: &extProcPb.BodyResponse{
72-
Response: &extProcPb.CommonResponse{
73-
BodyMutation: &extProcPb.BodyMutation{
74-
Mutation: &extProcPb.BodyMutation_StreamedResponse{
75-
StreamedResponse: &extProcPb.StreamedBodyResponse{
76-
Body: responseBytes,
77-
EndOfStream: true,
78-
},
79-
},
80-
},
81-
},
82-
},
83-
},
84-
}
66+
reqCtx.respBodyResp = generateResponseBodyResponses(responseBytes, true)
8567
return reqCtx, nil
8668
}
8769

@@ -127,6 +109,22 @@ func (s *StreamingServer) generateResponseHeaderResponse(reqCtx *RequestContext)
127109
}
128110
}
129111

112+
func generateResponseBodyResponses(responseBodyBytes []byte, setEoS bool) []*extProcPb.ProcessingResponse {
113+
commonResponses := buildCommonResponses(responseBodyBytes, bodyByteLimit, setEoS)
114+
responses := []*extProcPb.ProcessingResponse{}
115+
for _, commonResp := range commonResponses {
116+
resp := &extProcPb.ProcessingResponse{
117+
Response: &extProcPb.ProcessingResponse_ResponseBody{
118+
ResponseBody: &extProcPb.BodyResponse{
119+
Response: commonResp,
120+
},
121+
},
122+
}
123+
responses = append(responses, resp)
124+
}
125+
return responses
126+
}
127+
130128
func (s *StreamingServer) generateResponseHeaders(reqCtx *RequestContext) []*configPb.HeaderValueOption {
131129
// can likely refactor these two bespoke headers to be updated in PostDispatch, to centralize logic.
132130
headers := []*configPb.HeaderValueOption{

pkg/epp/handlers/server.go

Lines changed: 40 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,11 @@ type RequestContext struct {
9999
Response *Response
100100

101101
reqHeaderResp *extProcPb.ProcessingResponse
102-
reqBodyResp *extProcPb.ProcessingResponse
102+
reqBodyResp []*extProcPb.ProcessingResponse
103103
reqTrailerResp *extProcPb.ProcessingResponse
104104

105105
respHeaderResp *extProcPb.ProcessingResponse
106-
respBodyResp *extProcPb.ProcessingResponse
106+
respBodyResp []*extProcPb.ProcessingResponse
107107
respTrailerResp *extProcPb.ProcessingResponse
108108
}
109109

@@ -222,7 +222,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
222222
}
223223
reqCtx.RequestSize = len(requestBodyBytes)
224224
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(reqCtx)
225-
reqCtx.reqBodyResp = s.generateRequestBodyResponse(requestBodyBytes)
225+
reqCtx.reqBodyResp = s.generateRequestBodyResponses(requestBodyBytes)
226226

227227
metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
228228
metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
@@ -264,22 +264,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
264264
metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize)
265265
}
266266

267-
reqCtx.respBodyResp = &extProcPb.ProcessingResponse{
268-
Response: &extProcPb.ProcessingResponse_ResponseBody{
269-
ResponseBody: &extProcPb.BodyResponse{
270-
Response: &extProcPb.CommonResponse{
271-
BodyMutation: &extProcPb.BodyMutation{
272-
Mutation: &extProcPb.BodyMutation_StreamedResponse{
273-
StreamedResponse: &extProcPb.StreamedBodyResponse{
274-
Body: v.ResponseBody.Body,
275-
EndOfStream: v.ResponseBody.EndOfStream,
276-
},
277-
},
278-
},
279-
},
280-
},
281-
},
282-
}
267+
reqCtx.respBodyResp = generateResponseBodyResponses(v.ResponseBody.Body, v.ResponseBody.EndOfStream)
283268
} else {
284269
body = append(body, v.ResponseBody.Body...)
285270

@@ -293,22 +278,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
293278
responseErr = json.Unmarshal(body, &responseBody)
294279
if responseErr != nil {
295280
logger.V(logutil.DEFAULT).Error(responseErr, "Error unmarshaling request body", "body", string(body))
296-
reqCtx.respBodyResp = &extProcPb.ProcessingResponse{
297-
Response: &extProcPb.ProcessingResponse_ResponseBody{
298-
ResponseBody: &extProcPb.BodyResponse{
299-
Response: &extProcPb.CommonResponse{
300-
BodyMutation: &extProcPb.BodyMutation{
301-
Mutation: &extProcPb.BodyMutation_StreamedResponse{
302-
StreamedResponse: &extProcPb.StreamedBodyResponse{
303-
Body: body,
304-
EndOfStream: true,
305-
},
306-
},
307-
},
308-
},
309-
},
310-
},
311-
}
281+
reqCtx.respBodyResp = generateResponseBodyResponses(body, true)
312282
break
313283
}
314284

@@ -361,10 +331,13 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
361331
}
362332
r.RequestState = HeaderRequestResponseComplete
363333
}
364-
if r.RequestState == HeaderRequestResponseComplete && r.reqBodyResp != nil {
365-
loggerTrace.Info("Sending request body response")
366-
if err := srv.Send(r.reqBodyResp); err != nil {
367-
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
334+
if r.RequestState == HeaderRequestResponseComplete && r.reqBodyResp != nil && len(r.reqBodyResp) > 0 {
335+
loggerTrace.Info("Sending request body response(s)")
336+
337+
for _, response := range r.reqBodyResp {
338+
if err := srv.Send(response); err != nil {
339+
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
340+
}
368341
}
369342
r.RequestState = BodyRequestResponsesComplete
370343
metrics.IncRunningRequests(r.Model)
@@ -385,15 +358,17 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
385358
}
386359
r.RequestState = HeaderResponseResponseComplete
387360
}
388-
if r.RequestState == HeaderResponseResponseComplete && r.respBodyResp != nil {
389-
loggerTrace.Info("Sending response body response")
390-
if err := srv.Send(r.respBodyResp); err != nil {
391-
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
392-
}
361+
if r.RequestState == HeaderResponseResponseComplete && r.respBodyResp != nil && len(r.respBodyResp) > 0 {
362+
loggerTrace.Info("Sending response body response(s)")
363+
for _, response := range r.respBodyResp {
364+
if err := srv.Send(response); err != nil {
365+
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
366+
}
393367

394-
body := r.respBodyResp.Response.(*extProcPb.ProcessingResponse_ResponseBody)
395-
if body.ResponseBody.Response.GetBodyMutation().GetStreamedResponse().GetEndOfStream() {
396-
r.RequestState = BodyResponseResponsesComplete
368+
body := response.Response.(*extProcPb.ProcessingResponse_ResponseBody)
369+
if body.ResponseBody.Response.GetBodyMutation().GetStreamedResponse().GetEndOfStream() {
370+
r.RequestState = BodyResponseResponsesComplete
371+
}
397372
}
398373
// Dump the response so a new stream message can begin
399374
r.respBodyResp = nil
@@ -466,16 +441,31 @@ func BuildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
466441
return resp, nil
467442
}
468443

469-
func buildCommonResponses(bodyBytes []byte, byteLimit int) []*extProcPb.CommonResponse {
444+
func buildCommonResponses(bodyBytes []byte, byteLimit int, setEos bool) []*extProcPb.CommonResponse {
470445
responses := []*extProcPb.CommonResponse{}
471446
startingIndex := 0
472447
bodyLen := len(bodyBytes)
473448

449+
if bodyLen == 0 {
450+
return []*extProcPb.CommonResponse{
451+
{
452+
BodyMutation: &extProcPb.BodyMutation{
453+
Mutation: &extProcPb.BodyMutation_StreamedResponse{
454+
StreamedResponse: &extProcPb.StreamedBodyResponse{
455+
Body: bodyBytes,
456+
EndOfStream: setEos,
457+
},
458+
},
459+
},
460+
},
461+
}
462+
}
463+
474464
for startingIndex < bodyLen {
475465
eos := false
476466
len := min(bodyLen-startingIndex, byteLimit)
477467
chunk := bodyBytes[startingIndex : len+startingIndex]
478-
if len+startingIndex == bodyLen {
468+
if setEos && len+startingIndex >= bodyLen {
479469
eos = true
480470
}
481471

@@ -492,5 +482,6 @@ func buildCommonResponses(bodyBytes []byte, byteLimit int) []*extProcPb.CommonRe
492482
responses = append(responses, commonResp)
493483
startingIndex += len
494484
}
485+
495486
return responses
496487
}

pkg/epp/handlers/server_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ func TestBuildCommonResponses(t *testing.T) {
1111
count int
1212
expectedMessageCount int
1313
}{
14+
{
15+
name: "zero case",
16+
count: 0,
17+
expectedMessageCount: 1,
18+
},
1419
{
1520
name: "below limit",
1621
count: bodyByteLimit - 1000,
@@ -40,7 +45,7 @@ func TestBuildCommonResponses(t *testing.T) {
4045
for _, test := range tests {
4146
t.Run(test.name, func(t *testing.T) {
4247
arr := generateBytes(test.count)
43-
responses := buildCommonResponses(arr, bodyByteLimit)
48+
responses := buildCommonResponses(arr, bodyByteLimit, true)
4449
for i, response := range responses {
4550
eos := response.BodyMutation.GetStreamedResponse().GetEndOfStream()
4651
if eos == true && i+1 != len(responses) {

0 commit comments

Comments
 (0)