@@ -5,13 +5,11 @@ use crate::{
5
5
use bytes:: Bytes ;
6
6
use futures:: FutureExt ;
7
7
use http:: header:: { CONTENT_TYPE , SET_COOKIE } ;
8
- use http:: { Method , Request , Response , Uri } ;
8
+ use http:: { HeaderMap , Method , Request , Response , StatusCode , Uri } ;
9
9
use hyper:: body:: HttpBody ;
10
10
use hyper:: { client:: connect:: Connection , Body } ;
11
11
use lambda_runtime_api_client:: { build_request, Client } ;
12
- use serde:: Deserialize ;
13
- use serde_json:: json;
14
- use std:: collections:: HashMap ;
12
+ use serde:: { Deserialize , Serialize } ;
15
13
use std:: str:: FromStr ;
16
14
use std:: {
17
15
env,
@@ -203,6 +201,16 @@ pub(crate) struct EventCompletionStreamingRequest<'a, B> {
203
201
pub ( crate ) body : Response < B > ,
204
202
}
205
203
204
+ #[ derive( Debug , Serialize ) ]
205
+ #[ serde( rename_all = "camelCase" ) ]
206
+ struct MetadataPrelude {
207
+ #[ serde( serialize_with = "http_serde::status_code::serialize" ) ]
208
+ status_code : StatusCode ,
209
+ #[ serde( serialize_with = "http_serde::header_map::serialize" ) ]
210
+ headers : HeaderMap ,
211
+ cookies : Vec < String > ,
212
+ }
213
+
206
214
impl < ' a , B > IntoRequest for EventCompletionStreamingRequest < ' a , B >
207
215
where
208
216
B : HttpBody + Unpin + Send + ' static ,
@@ -216,45 +224,39 @@ where
216
224
let ( parts, mut body) = self . body . into_parts ( ) ;
217
225
218
226
let mut builder = build_request ( ) . method ( Method :: POST ) . uri ( uri) ;
219
- let headers = builder. headers_mut ( ) . unwrap ( ) ;
227
+ let req_headers = builder. headers_mut ( ) . unwrap ( ) ;
220
228
221
- headers . insert ( "Transfer-Encoding" , "chunked" . parse ( ) ?) ;
222
- headers . insert ( "Lambda-Runtime-Function-Response-Mode" , "streaming" . parse ( ) ?) ;
223
- headers . insert (
229
+ req_headers . insert ( "Transfer-Encoding" , "chunked" . parse ( ) ?) ;
230
+ req_headers . insert ( "Lambda-Runtime-Function-Response-Mode" , "streaming" . parse ( ) ?) ;
231
+ req_headers . insert (
224
232
"Content-Type" ,
225
233
"application/vnd.awslambda.http-integration-response" . parse ( ) ?,
226
234
) ;
227
235
228
- let ( mut tx, rx) = Body :: channel ( ) ;
236
+ let mut prelude_headers = parts. headers ;
237
+ // default Content-Type
238
+ prelude_headers
239
+ . entry ( CONTENT_TYPE )
240
+ . or_insert ( "application/octet-stream" . parse ( ) ?) ;
229
241
230
- tokio :: spawn ( async move {
231
- let mut header_map = parts . headers ;
232
- // default Content-Type
233
- header_map
234
- . entry ( CONTENT_TYPE )
235
- . or_insert ( "application/octet-stream" . parse ( ) . unwrap ( ) ) ;
242
+ let cookies = prelude_headers . get_all ( SET_COOKIE ) ;
243
+ let cookies = cookies
244
+ . iter ( )
245
+ . map ( |c| String :: from_utf8_lossy ( c . as_bytes ( ) ) . to_string ( ) )
246
+ . collect :: < Vec < String > > ( ) ;
247
+ prelude_headers . remove ( SET_COOKIE ) ;
236
248
237
- let cookies = header_map . get_all ( SET_COOKIE ) ;
238
- let cookies = cookies
239
- . iter ( )
240
- . map ( |c| String :: from_utf8_lossy ( c . as_bytes ( ) ) . to_string ( ) )
241
- . collect :: < Vec < String > > ( ) ;
249
+ let metadata_prelude = serde_json :: to_string ( & MetadataPrelude {
250
+ status_code : parts . status ,
251
+ headers : prelude_headers ,
252
+ cookies ,
253
+ } ) ? ;
242
254
243
- let headers = header_map
244
- . iter ( )
245
- . filter ( |( k, _) | * k != SET_COOKIE )
246
- . map ( |( k, v) | ( k. as_str ( ) , String :: from_utf8_lossy ( v. as_bytes ( ) ) . to_string ( ) ) )
247
- . collect :: < HashMap < & str , String > > ( ) ;
255
+ trace ! ( ?metadata_prelude) ;
248
256
249
- let metadata_prelude = json ! ( {
250
- "statusCode" : parts. status. as_u16( ) ,
251
- "headers" : headers,
252
- "cookies" : cookies,
253
- } )
254
- . to_string ( ) ;
255
-
256
- trace ! ( "metadata_prelude: {}" , metadata_prelude) ;
257
+ let ( mut tx, rx) = Body :: channel ( ) ;
257
258
259
+ tokio:: spawn ( async move {
258
260
tx. send_data ( metadata_prelude. into ( ) ) . await . unwrap ( ) ;
259
261
tx. send_data ( "\u{0} " . repeat ( 8 ) . into ( ) ) . await . unwrap ( ) ;
260
262
0 commit comments