@@ -99,6 +99,8 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
99
99
logger .V (logutil .DEFAULT ).Error (err , "Error populating writer" )
100
100
}
101
101
}()
102
+
103
+ // Message is buffered, we can read and decode.
102
104
if v .RequestBody .EndOfStream {
103
105
loggerVerbose .Info ("decoding" )
104
106
err = decoder .Decode (& requestBody )
@@ -109,8 +111,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
109
111
reader .Close ()
110
112
reader , writer = io .Pipe ()
111
113
decoder = json .NewDecoder (reader )
112
- }
113
- if requestBody != nil {
114
+
114
115
reqCtx , err = s .HandleRequestBody (ctx , reqCtx , req , requestBody )
115
116
if err != nil {
116
117
logger .V (logutil .DEFAULT ).Error (err , "Error handling body" )
@@ -125,11 +126,9 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
125
126
case * extProcPb.ProcessingRequest_ResponseHeaders :
126
127
loggerVerbose .Info ("got response headers" , "headers" , v .ResponseHeaders .Headers .GetHeaders ())
127
128
for _ , header := range v .ResponseHeaders .Headers .GetHeaders () {
128
- if header .Key == "status" {
129
- code := header .RawValue [0 ]
130
- if string (code ) != "200" {
131
- reqCtx .ResponseStatusCode = errutil .ModelServerError
132
- }
129
+ code := header .RawValue [0 ]
130
+ if header .Key == "status" && string (code ) != "200" {
131
+ reqCtx .ResponseStatusCode = errutil .ModelServerError
133
132
}
134
133
}
135
134
reqCtx .RequestState = ResponseRecieved
@@ -161,16 +160,15 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
161
160
}
162
161
}()
163
162
163
+ // Message is buffered, we can read and decode.
164
164
if v .ResponseBody .EndOfStream {
165
-
166
165
err = decoder .Decode (& responseBody )
167
166
if err != nil {
168
167
logger .V (logutil .DEFAULT ).Error (err , "Error unmarshaling request body" )
169
168
}
170
169
// Body stream complete. Close the reader pipe.
171
170
reader .Close ()
172
- }
173
- if responseBody != nil {
171
+
174
172
reqCtx , err = s .HandleResponseBody (ctx , reqCtx , responseBody )
175
173
if err == nil && reqCtx .ResponseComplete {
176
174
reqCtx .ResponseCompleteTimestamp = time .Now ()
@@ -190,13 +188,12 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
190
188
resp , err := BuildErrResponse (err )
191
189
if err != nil {
192
190
return err
193
- } else {
194
- if err := srv .Send (resp ); err != nil {
195
- logger .V (logutil .DEFAULT ).Error (err , "Send failed" )
196
- return status .Errorf (codes .Unknown , "failed to send response back to Envoy: %v" , err )
197
- }
198
- return nil
199
191
}
192
+ if err := srv .Send (resp ); err != nil {
193
+ logger .V (logutil .DEFAULT ).Error (err , "Send failed" )
194
+ return status .Errorf (codes .Unknown , "failed to send response back to Envoy: %v" , err )
195
+ }
196
+ return nil
200
197
}
201
198
loggerVerbose .Info ("checking" , "request state" , reqCtx .RequestState )
202
199
if err := reqCtx .updateStateAndSendIfNeeded (srv , loggerVerbose ); err != nil {
0 commit comments