
Commit 87b3a08

adding logging & support for better response when requests are not valid json (#847)
1 parent 8958028 commit 87b3a08

File tree

3 files changed: +196 -7 lines changed


pkg/epp/handlers/server.go

Lines changed: 25 additions & 3 deletions
@@ -196,8 +196,8 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
         err = json.Unmarshal(body, &reqCtx.Request.Body)
         if err != nil {
           logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body")
-          // TODO: short circuit and send the body back as is (this could be an envoy error), currently we drop
-          // whatever the body request would have been and send our immediate response instead.
+          err = errutil.Error{Code: errutil.BadRequest, Msg: "Error unmarshaling request body: " + string(body)}
+          break
         }

         // Body stream complete. Allocate empty slice for response to use.
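For context on the new short-circuit: the typed error assigned here is what BuildErrResponse (further down in this file) switches on, and its string form becomes the client-visible body. A minimal, self-contained sketch of that pattern, assuming errutil.Error is a code-plus-message struct whose Error() renders as "inference gateway: <code> - <msg>" (the shape the hermetic test expectations below use); everything else is illustrative:

```go
package main

import "fmt"

// Error is a sketch of the repo's errutil.Error: an error code plus a
// human-readable message, satisfying the standard error interface.
type Error struct {
	Code string
	Msg  string
}

// Error renders in the "inference gateway: <code> - <msg>" shape that
// the hermetic test expectations below assume.
func (e Error) Error() string {
	return fmt.Sprintf("inference gateway: %s - %s", e.Code, e.Msg)
}

const BadRequest = "BadRequest"

func main() {
	body := []byte("no healthy upstream") // not valid JSON
	err := Error{Code: BadRequest, Msg: "Error unmarshaling request body: " + string(body)}
	fmt.Println(err)
	// Output: inference gateway: BadRequest - Error unmarshaling request body: no healthy upstream
}
```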
@@ -287,7 +287,24 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
       var responseErr error
       responseErr = json.Unmarshal(body, &responseBody)
       if responseErr != nil {
-        logger.V(logutil.DEFAULT).Error(responseErr, "Error unmarshaling request body")
+        logger.V(logutil.DEFAULT).Error(responseErr, "Error unmarshaling request body", "body", string(body))
+        reqCtx.respBodyResp = &extProcPb.ProcessingResponse{
+          Response: &extProcPb.ProcessingResponse_ResponseBody{
+            ResponseBody: &extProcPb.BodyResponse{
+              Response: &extProcPb.CommonResponse{
+                BodyMutation: &extProcPb.BodyMutation{
+                  Mutation: &extProcPb.BodyMutation_StreamedResponse{
+                    StreamedResponse: &extProcPb.StreamedBodyResponse{
+                      Body:        body,
+                      EndOfStream: true,
+                    },
+                  },
+                },
+              },
+            },
+          },
+        }
+        break
       }

       reqCtx, responseErr = s.HandleResponseBody(ctx, reqCtx, responseBody)
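The nested literal added above hands the unparseable upstream body back to the client instead of failing the stream. A hypothetical helper, sketched under the assumption that extProcPb is the usual alias for the envoy ext_proc v3 Go bindings (the function name is ours, not the commit's), shows the same construction more compactly:

```go
import (
	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
)

// passThroughBodyResponse is a sketch, not code from this commit: it
// builds the same ProcessingResponse as above, streaming the original
// (non-JSON) upstream body back unchanged and marking end of stream.
func passThroughBodyResponse(body []byte) *extProcPb.ProcessingResponse {
	return &extProcPb.ProcessingResponse{
		Response: &extProcPb.ProcessingResponse_ResponseBody{
			ResponseBody: &extProcPb.BodyResponse{
				Response: &extProcPb.CommonResponse{
					BodyMutation: &extProcPb.BodyMutation{
						Mutation: &extProcPb.BodyMutation_StreamedResponse{
							StreamedResponse: &extProcPb.StreamedBodyResponse{
								Body:        body,
								EndOfStream: true,
							},
						},
					},
				},
			},
		},
	}
}
```

Setting EndOfStream: true matters here: it tells Envoy the mutation is complete, so the original error payload (e.g. "no healthy upstream") reaches the caller verbatim.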
@@ -436,5 +453,10 @@ func BuildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
   default:
     return nil, status.Errorf(status.Code(err), "failed to handle request: %v", err)
   }
+
+  if err.Error() != "" {
+    resp.Response.(*extProcPb.ProcessingResponse_ImmediateResponse).ImmediateResponse.Body = []byte(err.Error())
+  }
+
   return resp, nil
 }
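One note on the new tail of BuildErrResponse: the unchecked type assertion is safe today only because every non-error branch of the switch builds an ImmediateResponse. A hedged sketch of a more defensive equivalent (a hypothetical helper, not code from this commit; same extProcPb alias as above):

```go
// attachErrBody is a sketch only: it copies the error text into the
// immediate-response body, tolerating a future switch branch that
// returns a different response kind instead of panicking on it.
func attachErrBody(resp *extProcPb.ProcessingResponse, err error) {
	if ir, ok := resp.Response.(*extProcPb.ProcessingResponse_ImmediateResponse); ok && err.Error() != "" {
		ir.ImmediateResponse.Body = []byte(err.Error())
	}
}
```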

pkg/epp/server/runserver.go

Lines changed: 0 additions & 1 deletion
@@ -47,7 +47,6 @@ type ExtProcServerRunner struct {
   Datastore                        datastore.Datastore
   SecureServing                    bool
   CertPath                         string
-  UseStreaming                     bool
   RefreshPrometheusMetricsInterval time.Duration
   Scheduler                        requestcontrol.Scheduler

test/integration/epp/hermetic_test.go

Lines changed: 171 additions & 3 deletions
@@ -183,6 +183,74 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
         },
       },
     },
+    {
+      name: "invalid json; return body",
+      requests: []*extProcPb.ProcessingRequest{
+        {
+          Request: &extProcPb.ProcessingRequest_RequestHeaders{
+            RequestHeaders: &extProcPb.HttpHeaders{
+              Headers: &configPb.HeaderMap{
+                Headers: []*configPb.HeaderValue{
+                  {
+                    Key:   "hi",
+                    Value: "mom",
+                  },
+                },
+              },
+            },
+          },
+        },
+        {
+          Request: &extProcPb.ProcessingRequest_RequestBody{
+            RequestBody: &extProcPb.HttpBody{Body: []byte("no healthy upstream"), EndOfStream: true},
+          },
+        },
+      },
+      // pod-1 will be picked because it has relatively low queue size, with the requested
+      // model being active, and has low KV cache.
+      pods: map[*backend.Pod]*backendmetrics.MetricsState{
+        fakePod(0): {
+          WaitingQueueSize:    0,
+          KVCacheUsagePercent: 0.2,
+          ActiveModels: map[string]int{
+            "foo": 1,
+            "bar": 1,
+          },
+          WaitingModels: map[string]int{},
+        },
+        fakePod(1): {
+          WaitingQueueSize:    0,
+          KVCacheUsagePercent: 0.1,
+          ActiveModels: map[string]int{
+            "foo":            1,
+            "sql-lora-1fdg2": 1,
+          },
+          WaitingModels: map[string]int{},
+        },
+        fakePod(2): {
+          WaitingQueueSize:    10,
+          KVCacheUsagePercent: 0.2,
+          ActiveModels: map[string]int{
+            "foo": 1,
+            "bar": 1,
+          },
+          WaitingModels: map[string]int{},
+        },
+      },
+      wantErr: false,
+      wantResponses: []*extProcPb.ProcessingResponse{
+        {
+          Response: &extProcPb.ProcessingResponse_ImmediateResponse{
+            ImmediateResponse: &extProcPb.ImmediateResponse{
+              Status: &envoyTypePb.HttpStatus{
+                Code: envoyTypePb.StatusCode_BadRequest,
+              },
+              Body: []byte("inference gateway: BadRequest - Error unmarshaling request body: no healthy upstream"),
+            },
+          },
+        },
+      },
+    },
     {
       name: "select active lora, low queue",
       requests: integrationutils.GenerateStreamedRequestSet(logger, "test2", "sql-lora"),
@@ -407,6 +475,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
           Status: &envoyTypePb.HttpStatus{
             Code: envoyTypePb.StatusCode_TooManyRequests,
           },
+          Body: []byte("inference gateway: InferencePoolResourceExhausted - failed to find target pod: inference gateway: Internal - no pods available for the given request"),
         },
       },
     },
@@ -842,6 +911,106 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
         },
       },
     },
+    {
+      name: "Response is invalid json; return body",
+      requests: []*extProcPb.ProcessingRequest{
+        {
+          Request: &extProcPb.ProcessingRequest_ResponseHeaders{
+            ResponseHeaders: &extProcPb.HttpHeaders{
+              Headers: &configPb.HeaderMap{
+                Headers: []*configPb.HeaderValue{
+                  {
+                    Key:   "content-type",
+                    Value: "application/json",
+                  },
+                },
+              },
+            },
+          },
+        },
+        {
+          Request: &extProcPb.ProcessingRequest_ResponseBody{
+            ResponseBody: &extProcPb.HttpBody{Body: []byte("no healthy upstream"), EndOfStream: true},
+          },
+        },
+      },
+
+      //
+      // pod 0 will be picked as all other models are above threshold
+      pods: map[*backend.Pod]*backendmetrics.MetricsState{
+        fakePod(0): {
+          WaitingQueueSize:    4,
+          KVCacheUsagePercent: 0.2,
+          ActiveModels: map[string]int{
+            "foo":            1,
+            "bar":            1,
+            "sql-lora-1fdg3": 1,
+          },
+          WaitingModels: map[string]int{},
+        },
+        fakePod(1): {
+          WaitingQueueSize:    0,
+          KVCacheUsagePercent: 0.85,
+          ActiveModels: map[string]int{
+            "foo":            1,
+            "sql-lora-1fdg3": 1,
+          },
+          WaitingModels: map[string]int{},
+        },
+        fakePod(2): {
+          WaitingQueueSize:    10,
+          KVCacheUsagePercent: 0.9,
+          ActiveModels: map[string]int{
+            "foo":            1,
+            "sql-lora-1fdg3": 1,
+          },
+          WaitingModels: map[string]int{},
+        },
+      },
+      wantErr: false,
+      wantResponses: []*extProcPb.ProcessingResponse{
+        {
+          Response: &extProcPb.ProcessingResponse_ResponseHeaders{
+            ResponseHeaders: &extProcPb.HeadersResponse{
+              Response: &extProcPb.CommonResponse{
+                HeaderMutation: &extProcPb.HeaderMutation{
+                  SetHeaders: []*configPb.HeaderValueOption{
+                    {
+                      Header: &configPb.HeaderValue{
+                        Key:      "x-went-into-resp-headers",
+                        RawValue: []byte("true"),
+                      },
+                    },
+                    {
+                      Header: &configPb.HeaderValue{
+                        Key:      "content-type",
+                        RawValue: []uint8("application/json"),
+                      },
+                    },
+                  },
+                },
+              },
+            },
+          },
+        },
+        {
+          Response: &extProcPb.ProcessingResponse_ResponseBody{
+            ResponseBody: &extProcPb.BodyResponse{
+              Response: &extProcPb.CommonResponse{
+                BodyMutation: &extProcPb.BodyMutation{
+                  Mutation: &extProcPb.BodyMutation_StreamedResponse{
+                    StreamedResponse: &extProcPb.StreamedBodyResponse{
+                      Body:        []byte("no healthy upstream"),
+                      EndOfStream: true,
+                    },
+                  },
+                },
+              },
+            },
+          },
+        },
+      },
+    },
     {
       name: "responsebody sent over a single request, but empty body with EndOfStream in the second request(this is how envoy operates); content-type is json, buffer",
       requests: []*extProcPb.ProcessingRequest{
@@ -1261,7 +1430,7 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {

   for _, test := range tests {
     t.Run(test.name, func(t *testing.T) {
-      client, cleanup := setUpHermeticServer(t, test.pods, true)
+      client, cleanup := setUpHermeticServer(t, test.pods)
       t.Cleanup(cleanup)
       responses, err := integrationutils.StreamedRequest(t, client, test.requests, len(test.wantResponses))

@@ -1290,14 +1459,13 @@ func TestFullDuplexStreamed_KubeInferenceModelRequest(t *testing.T) {
   }
 }

-func setUpHermeticServer(t *testing.T, podAndMetrics map[*backend.Pod]*backendmetrics.MetricsState, streamed bool) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
+func setUpHermeticServer(t *testing.T, podAndMetrics map[*backend.Pod]*backendmetrics.MetricsState) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
   // Reconfigure the TestPodMetricsClient.
   res := map[types.NamespacedName]*backendmetrics.MetricsState{}
   for pod, metrics := range podAndMetrics {
     res[pod.NamespacedName] = metrics
   }
   serverRunner.TestPodMetricsClient.SetRes(res)
-  serverRunner.UseStreaming = streamed

   serverCtx, stopServer := context.WithCancel(context.Background())

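To exercise just these cases locally, the hermetic suite should be runnable with the standard Go tooling, e.g. `go test ./test/integration/epp -run TestFullDuplexStreamed_KubeInferenceModelRequest -v` from the module root (the package path and test name are taken from the file above; any environment setup the suite already requires is unchanged by this commit).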