Skip to content

Fix streaming prelude serialization #692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-events.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
matrix:
toolchain:
- "1.62.0" # Current MSRV
- "1.64.0" # Current MSRV
- stable
env:
RUST_BACKTRACE: 1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-extension.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
strategy:
matrix:
toolchain:
- "1.62.0" # Current MSRV
- "1.64.0" # Current MSRV
- stable
env:
RUST_BACKTRACE: 1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/build-runtime.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
strategy:
matrix:
toolchain:
- "1.62.0" # Current MSRV
- "1.64.0" # Current MSRV
- stable
env:
RUST_BACKTRACE: 1
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ This will make your function compile much faster.

## Supported Rust Versions (MSRV)

The AWS Lambda Rust Runtime requires a minimum of Rust 1.62, and is not guaranteed to build on compiler versions earlier than that.
The AWS Lambda Rust Runtime requires a minimum of Rust 1.64, and is not guaranteed to build on compiler versions earlier than that.

## Security

Expand Down
1 change: 1 addition & 0 deletions lambda-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ tower = { version = "0.4", features = ["util"] }
tokio-stream = "0.1.2"
lambda_runtime_api_client = { version = "0.8", path = "../lambda-runtime-api-client" }
serde_path_to_error = "0.1.11"
http-serde = "1.1.3"
68 changes: 35 additions & 33 deletions lambda-runtime/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ use crate::{
use bytes::Bytes;
use futures::FutureExt;
use http::header::{CONTENT_TYPE, SET_COOKIE};
use http::{Method, Request, Response, Uri};
use http::{HeaderMap, Method, Request, Response, StatusCode, Uri};
use hyper::body::HttpBody;
use hyper::{client::connect::Connection, Body};
use lambda_runtime_api_client::{build_request, Client};
use serde::Deserialize;
use serde_json::json;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::{
env,
Expand Down Expand Up @@ -203,6 +201,16 @@ pub(crate) struct EventCompletionStreamingRequest<'a, B> {
pub(crate) body: Response<B>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct MetadataPrelude {
#[serde(serialize_with = "http_serde::status_code::serialize")]
status_code: StatusCode,
#[serde(serialize_with = "http_serde::header_map::serialize")]
headers: HeaderMap,
cookies: Vec<String>,
}

impl<'a, B> IntoRequest for EventCompletionStreamingRequest<'a, B>
where
B: HttpBody + Unpin + Send + 'static,
Expand All @@ -216,45 +224,39 @@ where
let (parts, mut body) = self.body.into_parts();

let mut builder = build_request().method(Method::POST).uri(uri);
let headers = builder.headers_mut().unwrap();
let req_headers = builder.headers_mut().unwrap();

headers.insert("Transfer-Encoding", "chunked".parse()?);
headers.insert("Lambda-Runtime-Function-Response-Mode", "streaming".parse()?);
headers.insert(
req_headers.insert("Transfer-Encoding", "chunked".parse()?);
req_headers.insert("Lambda-Runtime-Function-Response-Mode", "streaming".parse()?);
req_headers.insert(
"Content-Type",
"application/vnd.awslambda.http-integration-response".parse()?,
);

let (mut tx, rx) = Body::channel();
let mut prelude_headers = parts.headers;
// default Content-Type
prelude_headers
.entry(CONTENT_TYPE)
.or_insert("application/octet-stream".parse()?);

tokio::spawn(async move {
let mut header_map = parts.headers;
// default Content-Type
header_map
.entry(CONTENT_TYPE)
.or_insert("application/octet-stream".parse().unwrap());
let cookies = prelude_headers.get_all(SET_COOKIE);
let cookies = cookies
.iter()
.map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
.collect::<Vec<String>>();
prelude_headers.remove(SET_COOKIE);

let cookies = header_map.get_all(SET_COOKIE);
let cookies = cookies
.iter()
.map(|c| String::from_utf8_lossy(c.as_bytes()).to_string())
.collect::<Vec<String>>();
let metadata_prelude = serde_json::to_string(&MetadataPrelude {
status_code: parts.status,
headers: prelude_headers,
cookies,
})?;

let headers = header_map
.iter()
.filter(|(k, _)| *k != SET_COOKIE)
.map(|(k, v)| (k.as_str(), String::from_utf8_lossy(v.as_bytes()).to_string()))
.collect::<HashMap<&str, String>>();
trace!(?metadata_prelude);

let metadata_prelude = json!({
"statusCode": parts.status.as_u16(),
"headers": headers,
"cookies": cookies,
})
.to_string();

trace!("metadata_prelude: {}", metadata_prelude);
let (mut tx, rx) = Body::channel();

tokio::spawn(async move {
tx.send_data(metadata_prelude.into()).await.unwrap();
tx.send_data("\u{0}".repeat(8).into()).await.unwrap();

Expand Down