forked from awslabs/aws-lambda-rust-runtime
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi_response.rs
169 lines (157 loc) · 6.51 KB
/
api_response.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
use crate::{
deserializer,
requests::{EventCompletionRequest, IntoRequest},
runtime::LambdaInvocation,
Diagnostic, EventErrorRequest, IntoFunctionResponse, LambdaEvent,
};
use futures::{ready, Stream};
use lambda_runtime_api_client::{body::Body, BoxError};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, future::Future, marker::PhantomData, pin::Pin, task};
use tower::Service;
use tracing::{error, trace};
/// Tower service that turns the result or an error of a handler function into a Lambda Runtime API
/// response.
///
/// This type is only meant for internal use in the Lambda runtime crate. The service augments both
/// inputs and outputs: the input is converted from a [LambdaInvocation] into a [LambdaEvent]
/// while any errors encountered during the conversion are turned into error responses. The service
/// outputs either a HTTP request to send to the Lambda Runtime API or a boxed error which ought to
/// be propagated to the caller to terminate the runtime.
pub struct RuntimeApiResponseService<
S,
EventPayload,
Response,
BufferedResponse,
StreamingResponse,
StreamItem,
StreamError,
> {
inner: S,
_phantom: PhantomData<(
EventPayload,
Response,
BufferedResponse,
StreamingResponse,
StreamItem,
StreamError,
)>,
}
impl<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError>
RuntimeApiResponseService<S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError>
{
pub fn new(inner: S) -> Self {
Self {
inner,
_phantom: PhantomData,
}
}
}
impl<'a, S, EventPayload, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError>
Service<LambdaInvocation>
for RuntimeApiResponseService<
S,
EventPayload,
Response,
BufferedResponse,
StreamingResponse,
StreamItem,
StreamError,
>
where
S: Service<LambdaEvent<EventPayload>, Response = Response, Error = Diagnostic<'a>>,
EventPayload: for<'de> Deserialize<'de>,
Response: IntoFunctionResponse<BufferedResponse, StreamingResponse>,
BufferedResponse: Serialize,
StreamingResponse: Stream<Item = Result<StreamItem, StreamError>> + Unpin + Send + 'static,
StreamItem: Into<bytes::Bytes> + Send,
StreamError: Into<BoxError> + Send + Debug,
{
type Response = http::Request<Body>;
type Error = BoxError;
type Future =
RuntimeApiResponseFuture<'a, S::Future, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError>;
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
self.inner
.poll_ready(cx)
.map_err(|err| BoxError::from(format!("{}: {}", err.error_type, err.error_message)))
}
fn call(&mut self, req: LambdaInvocation) -> Self::Future {
#[cfg(debug_assertions)]
if req.parts.status.is_server_error() {
error!("Lambda Runtime server returned an unexpected error");
return RuntimeApiResponseFuture::Ready(Some(Err(req.parts.status.to_string().into())));
}
// Utility closure to propagate potential error from conditionally executed trace
let trace_fn = || {
trace!(
body = std::str::from_utf8(&req.body)?,
"raw JSON event received from Lambda"
);
Ok(())
};
if let Err(err) = trace_fn() {
error!(error = ?err, "Failed to parse raw JSON event received from Lambda. The handler will not be called. Log at TRACE level to see the payload.");
return RuntimeApiResponseFuture::Ready(Some(Err(err)));
};
let request_id = req.context.request_id.clone();
let lambda_event = match deserializer::deserialize::<EventPayload>(&req.body, req.context) {
Ok(lambda_event) => lambda_event,
Err(err) => match build_event_error_request(&request_id, err) {
Ok(request) => return RuntimeApiResponseFuture::Ready(Some(Ok(request))),
Err(err) => {
error!(error = ?err, "failed to build error response for Lambda Runtime API");
return RuntimeApiResponseFuture::Ready(Some(Err(err)));
}
},
};
// Once the handler input has been generated successfully, the
let fut = self.inner.call(lambda_event);
RuntimeApiResponseFuture::Future(fut, request_id, PhantomData)
}
}
fn build_event_error_request<'a, T>(request_id: &'a str, err: T) -> Result<http::Request<Body>, BoxError>
where
T: Into<Diagnostic<'a>> + Debug,
{
error!(error = ?err, "Request payload deserialization into LambdaEvent<T> failed. The handler will not be called. Log at TRACE level to see the payload.");
EventErrorRequest::new(request_id, err).into_req()
}
#[pin_project(project = RuntimeApiResponseFutureProj)]
pub enum RuntimeApiResponseFuture<'a, F, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError> {
Future(
#[pin] F,
String,
PhantomData<(
&'a (),
Response,
BufferedResponse,
StreamingResponse,
StreamItem,
StreamError,
)>,
),
Ready(Option<Result<http::Request<Body>, BoxError>>),
}
impl<'a, F, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError> Future
for RuntimeApiResponseFuture<'a, F, Response, BufferedResponse, StreamingResponse, StreamItem, StreamError>
where
F: Future<Output = Result<Response, Diagnostic<'a>>>,
Response: IntoFunctionResponse<BufferedResponse, StreamingResponse>,
BufferedResponse: Serialize,
StreamingResponse: Stream<Item = Result<StreamItem, StreamError>> + Unpin + Send + 'static,
StreamItem: Into<bytes::Bytes> + Send,
StreamError: Into<BoxError> + Send + Debug,
{
type Output = Result<http::Request<Body>, BoxError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
task::Poll::Ready(match self.as_mut().project() {
RuntimeApiResponseFutureProj::Future(fut, request_id, _) => match ready!(fut.poll(cx)) {
Ok(ok) => EventCompletionRequest::new(request_id, ok).into_req(),
Err(err) => EventErrorRequest::new(request_id, err).into_req(),
},
RuntimeApiResponseFutureProj::Ready(ready) => ready.take().expect("future polled after completion"),
})
}
}