forked from awslabs/aws-lambda-rust-runtime
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathotel.rs
135 lines (119 loc) · 4.03 KB
/
otel.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
use std::{future::Future, pin::Pin, task};
use crate::LambdaInvocation;
use opentelemetry_semantic_conventions::trace as traceconv;
use pin_project::pin_project;
use tower::{Layer, Service};
use tracing::{instrument::Instrumented, Instrument};
/// Tower layer that adds OpenTelemetry tracing to Lambda function invocations.
///
/// The layer is built around a flush function, which is invoked once an invocation
/// has finished so that OpenTelemetry data can be flushed.
pub struct OpenTelemetryLayer<F> {
    /// Function called to flush OpenTelemetry after the end of an invocation.
    flush_fn: F,
    /// Value for the `faas.trigger` span attribute; `None` until set via `with_trigger`.
    otel_attribute_trigger: Option<String>,
}

impl<F> OpenTelemetryLayer<F>
where
    F: Fn() + Clone,
{
    /// Build a new [OpenTelemetryLayer] around `flush_fn`, with no trigger configured yet.
    pub fn new(flush_fn: F) -> Self {
        let otel_attribute_trigger = None;
        Self {
            flush_fn,
            otel_attribute_trigger,
        }
    }

    /// Set the `faas.trigger` attribute recorded on the OpenTelemetry span.
    ///
    /// When this is never called, the trigger defaults to `http`.
    /// The list of possible triggers is documented at
    /// <https://opentelemetry.io/docs/specs/semconv/attributes-registry/faas/>.
    pub fn with_trigger<T: Into<String>>(mut self, trigger: T) -> Self {
        // Only the trigger changes; the flush function is carried over untouched.
        self.otel_attribute_trigger = Some(trigger.into());
        self
    }
}
impl<S, F> Layer<S> for OpenTelemetryLayer<F>
where
    F: Fn() + Clone,
{
    type Service = OpenTelemetryService<S, F>;

    /// Wrap `inner` in an [OpenTelemetryService] that carries a clone of this layer's
    /// flush function and its configured trigger.
    fn layer(&self, inner: S) -> Self::Service {
        let flush_fn = self.flush_fn.clone();
        // Fall back to the `http` trigger when none was configured on the layer.
        let otel_attribute_trigger = self
            .otel_attribute_trigger
            .as_deref()
            .unwrap_or("http")
            .to_owned();

        OpenTelemetryService {
            inner,
            flush_fn,
            // The new service has not handled any invocation yet.
            coldstart: true,
            otel_attribute_trigger,
        }
    }
}
/// Tower service created by [OpenTelemetryLayer].
///
/// Each call to the wrapped service runs inside a tracing span describing the
/// invocation, and the flush function is handed to the returned future.
pub struct OpenTelemetryService<S, F> {
    /// The wrapped service that actually handles the invocation.
    inner: S,
    /// Flush function, cloned into every future returned by `call`.
    flush_fn: F,
    /// `true` until the first call has been made through this service.
    coldstart: bool,
    /// Value recorded as the `faas.trigger` span attribute.
    otel_attribute_trigger: String,
}

impl<S, F> Service<LambdaInvocation> for OpenTelemetryService<S, F>
where
    S: Service<LambdaInvocation, Response = ()>,
    F: Fn() + Clone,
{
    type Response = ();
    type Error = S::Error;
    type Future = OpenTelemetryFuture<Instrumented<S::Future>, F>;

    /// Readiness is delegated entirely to the wrapped service.
    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Create the invocation span, call the wrapped service inside it, and return a
    /// future that is instrumented with that span and carries the flush function.
    fn call(&mut self, req: LambdaInvocation) -> Self::Future {
        // Read the cold-start flag for this invocation and clear it for all later ones.
        let coldstart = std::mem::replace(&mut self.coldstart, false);

        let span = tracing::info_span!(
            "Lambda function invocation",
            "otel.name" = req.context.env_config.function_name,
            { traceconv::FAAS_TRIGGER } = &self.otel_attribute_trigger,
            { traceconv::FAAS_INVOCATION_ID } = req.context.request_id,
            { traceconv::FAAS_COLDSTART } = coldstart
        );

        // Call the inner service while the span is entered, so that the span is
        // assigned as parent of any spans the inner service creates.
        let future = span.in_scope(|| self.inner.call(req));

        let instrumented = future.instrument(span);
        let flush_fn = self.flush_fn.clone();
        OpenTelemetryFuture {
            future: Some(instrumented),
            flush_fn,
        }
    }
}
/// Future created by [OpenTelemetryService].
///
/// It resolves to the output of the wrapped future, calling the flush function
/// right after the wrapped future has completed and been dropped.
#[pin_project]
pub struct OpenTelemetryFuture<Fut, F> {
    /// The wrapped future; replaced by `None` once it has completed.
    #[pin]
    future: Option<Fut>,
    /// Function invoked after the wrapped future has been dropped.
    flush_fn: F,
}

impl<Fut, F> Future for OpenTelemetryFuture<Fut, F>
where
    Fut: Future,
    F: Fn(),
{
    type Output = Fut::Output;

    /// Poll the wrapped future; once it is ready, drop it, run the flush function,
    /// and return its output.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
        let mut this = self.project();

        let inner = this
            .future
            .as_mut()
            .as_pin_mut()
            .expect("future polled after completion");
        let output = match inner.poll(cx) {
            task::Poll::Ready(value) => value,
            task::Poll::Pending => return task::Poll::Pending,
        };

        // Drop the finished future before flushing: this ensures that the OpenTelemetry
        // span attached to it is closed and included in the subsequent flush.
        this.future.set(None);
        (this.flush_fn)();

        task::Poll::Ready(output)
    }
}