Skip to content

Add events pass-through feature in lambda-http crate #775

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lambda-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ apigw_rest = []
apigw_http = []
apigw_websockets = []
alb = []
pass_through = []

[dependencies]
base64 = { workspace = true }
Expand All @@ -37,7 +38,7 @@ mime = "0.3"
percent-encoding = "2.2"
pin-project-lite = { workspace = true }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_json = { version = "1.0", features = ["raw_value"] }
serde_urlencoded = "0.7"
tokio-stream = "0.1.2"
url = "2.2"
Expand Down
48 changes: 23 additions & 25 deletions lambda-http/src/deserializer.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,48 @@
use crate::request::LambdaRequest;
#[cfg(feature = "alb")]
use aws_lambda_events::alb::AlbTargetGroupRequest;
#[cfg(feature = "apigw_rest")]
use aws_lambda_events::apigw::ApiGatewayProxyRequest;
#[cfg(feature = "apigw_http")]
use aws_lambda_events::apigw::ApiGatewayV2httpRequest;
#[cfg(feature = "apigw_websockets")]
use aws_lambda_events::apigw::ApiGatewayWebsocketProxyRequest;
use serde::{de::Error, Deserialize};
use serde_json::value::RawValue;

const ERROR_CONTEXT: &str = "this function expects a JSON payload from Amazon API Gateway, Amazon Elastic Load Balancer, or AWS Lambda Function URLs, but the data doesn't match any of those services' events";

#[cfg(feature = "pass_through")]
const PASS_THROUGH_ENABLED: bool = true;

impl<'de> Deserialize<'de> for LambdaRequest {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let content = match serde::__private::de::Content::deserialize(deserializer) {
Ok(content) => content,
Err(err) => return Err(err),
};
let raw_value: Box<RawValue> = Box::deserialize(deserializer)?;
let data = raw_value.get();

#[cfg(feature = "apigw_rest")]
if let Ok(res) = aws_lambda_events::apigw::ApiGatewayProxyRequest::deserialize(
serde::__private::de::ContentRefDeserializer::<D::Error>::new(&content),
) {
if let Ok(res) = serde_json::from_str::<ApiGatewayProxyRequest>(data) {
return Ok(LambdaRequest::ApiGatewayV1(res));
}
#[cfg(feature = "apigw_http")]
if let Ok(res) = aws_lambda_events::apigw::ApiGatewayV2httpRequest::deserialize(
serde::__private::de::ContentRefDeserializer::<D::Error>::new(&content),
) {
if let Ok(res) = serde_json::from_str::<ApiGatewayV2httpRequest>(data) {
return Ok(LambdaRequest::ApiGatewayV2(res));
}
#[cfg(feature = "alb")]
if let Ok(res) =
aws_lambda_events::alb::AlbTargetGroupRequest::deserialize(serde::__private::de::ContentRefDeserializer::<
D::Error,
>::new(&content))
{
if let Ok(res) = serde_json::from_str::<AlbTargetGroupRequest>(data) {
return Ok(LambdaRequest::Alb(res));
}
#[cfg(feature = "apigw_websockets")]
if let Ok(res) = aws_lambda_events::apigw::ApiGatewayWebsocketProxyRequest::deserialize(
serde::__private::de::ContentRefDeserializer::<D::Error>::new(&content),
) {
if let Ok(res) = serde_json::from_str::<ApiGatewayWebsocketProxyRequest>(data) {
return Ok(LambdaRequest::WebSocket(res));
}
#[cfg(feature = "pass_through")]
if PASS_THROUGH_ENABLED {
return Ok(LambdaRequest::PassThrough(data.to_string()));
}

Err(Error::custom(ERROR_CONTEXT))
}
Expand Down Expand Up @@ -104,11 +109,4 @@ mod tests {
other => panic!("unexpected request variant: {:?}", other),
}
}

#[test]
fn test_deserialize_error() {
let err = serde_json::from_str::<LambdaRequest>("{\"body\": {}}").unwrap_err();

assert_eq!(ERROR_CONTEXT, err.to_string());
}
}
32 changes: 32 additions & 0 deletions lambda-http/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub enum LambdaRequest {
Alb(AlbTargetGroupRequest),
#[cfg(feature = "apigw_websockets")]
WebSocket(ApiGatewayWebsocketProxyRequest),
#[cfg(feature = "pass_through")]
PassThrough(String),
}

impl LambdaRequest {
Expand All @@ -67,6 +69,8 @@ impl LambdaRequest {
LambdaRequest::Alb { .. } => RequestOrigin::Alb,
#[cfg(feature = "apigw_websockets")]
LambdaRequest::WebSocket { .. } => RequestOrigin::WebSocket,
#[cfg(feature = "pass_through")]
LambdaRequest::PassThrough { .. } => RequestOrigin::PassThrough,
#[cfg(not(any(
feature = "apigw_rest",
feature = "apigw_http",
Expand Down Expand Up @@ -97,6 +101,9 @@ pub enum RequestOrigin {
/// API Gateway WebSocket
#[cfg(feature = "apigw_websockets")]
WebSocket,
/// PassThrough request origin
#[cfg(feature = "pass_through")]
PassThrough,
}

#[cfg(feature = "apigw_http")]
Expand Down Expand Up @@ -338,6 +345,26 @@ fn into_websocket_request(ag: ApiGatewayWebsocketProxyRequest) -> http::Request<
req
}

#[cfg(feature = "pass_through")]
fn into_pass_through_request(data: String) -> http::Request<Body> {
let mut builder = http::Request::builder();

let headers = builder.headers_mut().unwrap();
headers.insert("Content-Type", "application/json".parse().unwrap());

update_xray_trace_id_header(headers);

let raw_path = "/events";

builder
.method(http::Method::POST)
.uri(raw_path)
.extension(RawHttpPath(raw_path.to_string()))
.extension(RequestContext::PassThrough)
.body(Body::from(data))
.expect("failed to build request")
}

#[cfg(any(feature = "apigw_rest", feature = "apigw_http", feature = "apigw_websockets"))]
fn apigw_path_with_stage(stage: &Option<String>, path: &str) -> String {
if env::var("AWS_LAMBDA_HTTP_IGNORE_STAGE_IN_PATH").is_ok() {
Expand Down Expand Up @@ -375,6 +402,9 @@ pub enum RequestContext {
/// WebSocket request context
#[cfg(feature = "apigw_websockets")]
WebSocket(ApiGatewayWebsocketProxyRequestContext),
/// Custom request context
#[cfg(feature = "pass_through")]
PassThrough,
}

/// Converts LambdaRequest types into `http::Request<Body>` types
Expand All @@ -389,6 +419,8 @@ impl From<LambdaRequest> for http::Request<Body> {
LambdaRequest::Alb(alb) => into_alb_request(alb),
#[cfg(feature = "apigw_websockets")]
LambdaRequest::WebSocket(ag) => into_websocket_request(ag),
#[cfg(feature = "pass_through")]
LambdaRequest::PassThrough(data) => into_pass_through_request(data),
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions lambda-http/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub enum LambdaResponse {
ApiGatewayV2(ApiGatewayV2httpResponse),
#[cfg(feature = "alb")]
Alb(AlbTargetGroupResponse),
#[cfg(feature = "pass_through")]
PassThrough(serde_json::Value),
}

/// Transformation from http type to internal type
Expand Down Expand Up @@ -114,6 +116,15 @@ impl LambdaResponse {
headers: headers.clone(),
multi_value_headers: headers,
}),
#[cfg(feature = "pass_through")]
RequestOrigin::PassThrough => {
match body {
// text body must be a valid json string
Some(Body::Text(body)) => {LambdaResponse::PassThrough(serde_json::from_str(&body).unwrap_or_default())},
// binary body and other cases return Value::Null
_ => LambdaResponse::PassThrough(serde_json::Value::Null),
}
}
#[cfg(not(any(
feature = "apigw_rest",
feature = "apigw_http",
Expand Down