-
Notifications
You must be signed in to change notification settings - Fork 45
Propagate Step Function Trace Context through Managed Services #573
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
a38c77b
0a51475
5700d22
f524509
55ff075
d7daf0d
e490610
e31c8f8
dc4a283
f59de19
6222016
45ba945
dc43c9e
eec4743
1b350b9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,7 @@ | |
_EventSource, | ||
parse_event_source, | ||
get_first_record, | ||
is_step_function_event, | ||
EventTypes, | ||
EventSubtypes, | ||
) | ||
|
@@ -271,6 +272,15 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context): | |
|
||
if dd_json_data: | ||
dd_data = json.loads(dd_json_data) | ||
|
||
if is_step_function_event(dd_data): | ||
try: | ||
return extract_context_from_step_functions(dd_data, None) | ||
except Exception: | ||
logger.debug( | ||
"Failed to extract Step Functions context from SQS/SNS event." | ||
) | ||
|
||
return propagator.extract(dd_data) | ||
else: | ||
# Handle case where trace context is injected into attributes.AWSTraceHeader | ||
|
@@ -313,19 +323,35 @@ def _extract_context_from_eventbridge_sqs_event(event): | |
body = json.loads(body_str) | ||
detail = body.get("detail") | ||
dd_context = detail.get("_datadog") | ||
|
||
if is_step_function_event(dd_context): | ||
try: | ||
return extract_context_from_step_functions(dd_context, None) | ||
except Exception: | ||
logger.debug( | ||
"Failed to extract Step Functions context from EventBridge to SQS event." | ||
) | ||
|
||
return propagator.extract(dd_context) | ||
|
||
|
||
def extract_context_from_eventbridge_event(event, lambda_context): | ||
""" | ||
Extract datadog trace context from an EventBridge message's Details. | ||
This is only possible if Details is a JSON string. | ||
|
||
If we find a Step Function context, try to extract the trace context from | ||
that header. | ||
""" | ||
try: | ||
detail = event.get("detail") | ||
dd_context = detail.get("_datadog") | ||
if not dd_context: | ||
return extract_context_from_lambda_context(lambda_context) | ||
|
||
if is_step_function_event(dd_context): | ||
return extract_context_from_step_functions(dd_context, lambda_context) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This one isn't wrapped in a try/except, but the two above are. Why is that? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, I also meant to wrap this in try/except, but let me explain why I'm doing it: I wanted to pass in [remainder of this reply was lost in extraction]
|
||
|
||
return propagator.extract(dd_context) | ||
except Exception as e: | ||
logger.debug("The trace extractor returned with error %s", e) | ||
|
@@ -424,7 +450,7 @@ def _generate_sfn_trace_id(execution_id: str, part: str): | |
def extract_context_from_step_functions(event, lambda_context): | ||
""" | ||
Only extract datadog trace context when Step Functions Context Object is injected | ||
into lambda's event dict. | ||
into lambda's event dict. Unwrap "Payload" if it exists to handle Legacy Lambda cases. | ||
|
||
If '_datadog' header is present, we have two cases: | ||
1. Root is a Lambda and we use its traceID | ||
|
@@ -435,25 +461,25 @@ def extract_context_from_step_functions(event, lambda_context): | |
object. | ||
""" | ||
try: | ||
event = event.get("Payload", event) | ||
event = event.get("_datadog", event) | ||
|
||
meta = {} | ||
dd_data = event.get("_datadog") | ||
|
||
if dd_data and dd_data.get("serverless-version") == "v1": | ||
if "x-datadog-trace-id" in dd_data: # lambda root | ||
trace_id = int(dd_data.get("x-datadog-trace-id")) | ||
high_64_bit_trace_id = _parse_high_64_bits( | ||
dd_data.get("x-datadog-tags") | ||
) | ||
if event.get("serverless-version") == "v1": | ||
if "x-datadog-trace-id" in event: # lambda root | ||
trace_id = int(event.get("x-datadog-trace-id")) | ||
high_64_bit_trace_id = _parse_high_64_bits(event.get("x-datadog-tags")) | ||
if high_64_bit_trace_id: | ||
meta["_dd.p.tid"] = high_64_bit_trace_id | ||
else: # sfn root | ||
root_execution_id = dd_data.get("RootExecutionId") | ||
root_execution_id = event.get("RootExecutionId") | ||
trace_id = _generate_sfn_trace_id(root_execution_id, LOWER_64_BITS) | ||
meta["_dd.p.tid"] = _generate_sfn_trace_id( | ||
root_execution_id, HIGHER_64_BITS | ||
) | ||
|
||
parent_id = _generate_sfn_parent_id(dd_data) | ||
parent_id = _generate_sfn_parent_id(event) | ||
else: | ||
execution_id = event.get("Execution").get("Id") | ||
trace_id = _generate_sfn_trace_id(execution_id, LOWER_64_BITS) | ||
|
@@ -472,20 +498,6 @@ def extract_context_from_step_functions(event, lambda_context): | |
return extract_context_from_lambda_context(lambda_context) | ||
|
||
|
||
def is_legacy_lambda_step_function(event): | ||
""" | ||
Check if the event is a step function that called a legacy lambda | ||
""" | ||
if not isinstance(event, dict) or "Payload" not in event: | ||
return False | ||
|
||
event = event.get("Payload") | ||
return isinstance(event, dict) and ( | ||
"_datadog" in event | ||
or ("Execution" in event and "StateMachine" in event and "State" in event) | ||
) | ||
|
||
|
||
def extract_context_custom_extractor(extractor, event, lambda_context): | ||
""" | ||
Extract Datadog trace context using a custom trace extractor function | ||
|
@@ -1309,8 +1321,15 @@ def create_inferred_span_from_eventbridge_event(event, context): | |
synchronicity="async", | ||
tag_source="self", | ||
) | ||
dt_format = "%Y-%m-%dT%H:%M:%SZ" | ||
timestamp = event.get("time") | ||
|
||
# Use more granular timestamp from upstream Step Function if possible | ||
if is_step_function_event(event.get("detail")): | ||
timestamp = event.get("detail").get("_datadog").get("State").get("EnteredTime") | ||
avedmala marked this conversation as resolved.
Show resolved
Hide resolved
|
||
dt_format = "%Y-%m-%dT%H:%M:%S.%fZ" | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Great job! P.S. I was confused about what [remainder of this sentence was lost in extraction] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added numbers next to the pictures to make it clearer for other reviewers. |
||
else: | ||
timestamp = event.get("time") | ||
dt_format = "%Y-%m-%dT%H:%M:%SZ" | ||
|
||
dt = datetime.strptime(timestamp, dt_format) | ||
|
||
tracer.set_tags(_dd_origin) | ||
|
@@ -1320,6 +1339,11 @@ def create_inferred_span_from_eventbridge_event(event, context): | |
if span: | ||
span.set_tags(tags) | ||
span.start = dt.replace(tzinfo=timezone.utc).timestamp() | ||
|
||
# Since inferred span will later parent Lambda, preserve Lambda's current parent | ||
if dd_trace_context.span_id: | ||
span.parent_id = dd_trace_context.span_id | ||
|
||
return span | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -146,9 +146,7 @@ def parse_event_source(event: dict) -> _EventSource: | |
if event.get("source") == "aws.events" or has_event_categories: | ||
event_source = _EventSource(EventTypes.CLOUDWATCH_EVENTS) | ||
|
||
if ( | ||
"_datadog" in event and event.get("_datadog").get("serverless-version") == "v1" | ||
) or ("Execution" in event and "StateMachine" in event and "State" in event): | ||
if is_step_function_event(event): | ||
event_source = _EventSource(EventTypes.STEPFUNCTIONS) | ||
|
||
event_record = get_first_record(event) | ||
|
@@ -369,3 +367,28 @@ def extract_http_status_code_tag(trigger_tags, response): | |
status_code = response.status_code | ||
|
||
return str(status_code) | ||
|
||
|
||
def is_step_function_event(event): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a way we can memoize this function? It looks like it can potentially be called several times in the course of a single invocation. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hmm, or it looks like the function can be called multiple times per invocation, but with different "events" each time? If that's true, then we can probably leave it. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. That's a great idea! Correct me if I'm wrong, but does the layer only handle one [inline text lost in extraction] Just wondering, to get an idea of how large to make the cache. I guess it can be pretty small anyway, since each event is new and we don't repeat. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Each runtime instance will only ever handle one event at a time. It never handles two events concurrently. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, just realized we can't memoize it because [inline text lost in extraction; presumably the event dict is not hashable]. We could serialize the dict and use that, but I'm thinking that'd be much slower. |
||
""" | ||
Check if the event is a step function that invoked the current lambda. | ||
|
||
The whole event can be wrapped in "Payload" in Legacy Lambda cases. There may also be a | ||
"_datadog" for JSONata style context propagation. | ||
|
||
The actual event must contain "Execution", "StateMachine", and "State" fields. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I really like these comments. For someone who hasn't worked on Step Functions for a while, they help me recollect the historical context. They'll help future maintenance of the code as well. |
||
""" | ||
event = event.get("Payload", event) | ||
avedmala marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# JSONPath style | ||
if all(field in event for field in ("Execution", "StateMachine", "State")): | ||
avedmala marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return True | ||
|
||
# JSONata style | ||
if "_datadog" in event: | ||
return all( | ||
field in event["_datadog"] | ||
avedmala marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for field in ("Execution", "StateMachine", "State", "serverless-version") | ||
) | ||
|
||
return False |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,7 +45,6 @@ | |
is_authorizer_response, | ||
tracer, | ||
propagator, | ||
is_legacy_lambda_step_function, | ||
) | ||
from datadog_lambda.trigger import ( | ||
extract_trigger_tags, | ||
|
@@ -279,8 +278,6 @@ def _before(self, event, context): | |
self.response = None | ||
set_cold_start(init_timestamp_ns) | ||
submit_invocations_metric(context) | ||
if is_legacy_lambda_step_function(event): | ||
event = event["Payload"] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Moved this unwrapping to happen inside of [function name lost in extraction] |
||
self.trigger_tags = extract_trigger_tags(event, context) | ||
# Extract Datadog trace context and source from incoming requests | ||
dd_context, trace_context_source, event_source = extract_dd_trace_context( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am curious about how the concatenation of two queues (e.g., SFN → EventBridge → SQS → Lambda) is handled. Is it achieved by extracting two different contexts in the Python tracer? Does this mean that it also supports SFN → EventBridge → SQS → SNS → Lambda?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SFN → EventBridge → SQS → Lambda is handled in the following way
SFN → SNS → SQS → Lambda is handled very similarly, with another explicit check in the SQS extractor that looks for nested SNS events
We don't handle SFN → SQS → SNS → Lambda AFAIK, but we wouldn't be able to handle SFN → EventBridge → SQS → SNS → Lambda out of the box either
But this is only because it's not explicitly handled. The current Python layer implementation is messy because it relies on explicit handling. I think a perfect solution would be one where it's all handled recursively, so that customers can nest an arbitrary number of supported services without explicit handling
I think the AWS team would like to do something like this in bottlecap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@avedmala Thanks for the explanation. Very informative. I am guessing that a recursive solution should not be that complicated? @purple4reina @joeyzhao2018
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the "recursive solution": is it written down in any RFC? It sounds interesting and might be able to solve some other problems.