-
Notifications
You must be signed in to change notification settings - Fork 359
/
Copy pathextension-trait.rs
88 lines (76 loc) · 2.9 KB
/
extension-trait.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::{
future::{ready, Future},
pin::Pin,
sync::atomic::{AtomicBool, Ordering},
};
use lambda_extension::{Error, LambdaEvent, NextEvent, Service};
use tracing::info;
/// Example extension state: an invoke counter plus a readiness flag used to check
/// that `poll_ready` is called before `call`.
struct MyExtension {
    /// Number of invoke events handled so far.
    invoke_count: usize,
    /// `true` once the service has been polled for readiness and not yet called.
    ready: AtomicBool,
}

impl MyExtension {
    /// Builds an instance carrying `invoke_count` that still has to be polled
    /// before it may be called.
    fn not_ready(invoke_count: usize) -> Self {
        Self {
            invoke_count,
            ready: AtomicBool::new(false),
        }
    }
}

impl Default for MyExtension {
    /// A fresh instance has seen no invocations and is not ready until polled.
    fn default() -> Self {
        Self::not_ready(0)
    }
}

impl Clone for MyExtension {
    /// Copies the invoke counter but deliberately resets readiness: a cloned
    /// instance may not be immediately ready to be called.
    /// https://docs.rs/tower/0.4.13/tower/trait.Service.html#be-careful-when-cloning-inner-services
    fn clone(&self) -> Self {
        Self::not_ready(self.invoke_count)
    }
}
/// Logs every Lambda extension event and enforces the `poll_ready`-before-`call`
/// contract through the `ready` flag.
impl Service<LambdaEvent> for MyExtension {
    type Error = Error;
    type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
    type Response = ();

    /// Marks the service as ready and always returns `Poll::Ready(Ok(()))`.
    ///
    /// Logs whether the readiness flag was already set before this poll.
    fn poll_ready(
        &mut self,
        _cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Result<(), Self::Error>> {
        if self.ready.swap(true, Ordering::SeqCst) {
            info!("[extension] Service was already ready");
        } else {
            info!("[extension] Service is now ready");
        }
        core::task::Poll::Ready(Ok(()))
    }

    /// Handles one extension event: logs shutdown events, and counts and logs
    /// invoke events. The returned future resolves immediately to `Ok(())`.
    ///
    /// # Panics
    ///
    /// Panics if `poll_ready` has not been called since construction, cloning,
    /// or the previous `call`.
    fn call(&mut self, event: LambdaEvent) -> Self::Future {
        // Consume readiness *before* touching any state so that a contract
        // violation is reported without first logging or counting the event.
        // After being called once, the service is no longer ready until polled again.
        if !self.ready.swap(false, Ordering::SeqCst) {
            // https://docs.rs/tower/latest/tower/trait.Service.html#backpressure
            // https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
            // > Services are permitted to panic if `call` is invoked without obtaining
            // > `Poll::Ready(Ok(()))` from `poll_ready`.
            panic!("[extension] The service is not ready; `.poll_ready()` must be called first");
        }
        info!("[extension] The service is ready");

        match event.next {
            NextEvent::Shutdown(e) => {
                info!("[extension] Shutdown event received: {:?}", e);
            }
            NextEvent::Invoke(e) => {
                self.invoke_count += 1;
                info!("[extension] Request event {} received: {:?}", self.invoke_count, e);
            }
        }

        Box::pin(ready(Ok(())))
    }
}
/// Entry point: sets up `tracing` output, then runs a default `MyExtension`
/// through `lambda_extension::run` and returns its result.
#[tokio::main]
async fn main() -> Result<(), Error> {
    // The runtime logging can be enabled here by initializing `tracing` with `tracing-subscriber`.
    // While `tracing` is used internally, `log` can be used as well if preferred.
    let subscriber = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // Timestamps are omitted because CloudWatch will add the ingestion time.
        .without_time();
    subscriber.init();

    let extension = MyExtension::default();
    lambda_extension::run(extension).await
}