Skip to content

Commit 96a6abd

Browse files
authored
Propagate Step Function Trace Context through Managed Services (#573)
Allows us to extract Step Function trace context in the following cases:
1. SFN -> EventBridge -> Lambda
2. SFN -> EventBridge -> SQS -> Lambda
3. SFN -> SQS -> Lambda
4. SFN -> SNS -> Lambda
5. SFN -> SNS -> SQS -> Lambda
1 parent 58a55bb commit 96a6abd

File tree

5 files changed

+387
-167
lines changed

5 files changed

+387
-167
lines changed

datadog_lambda/tracing.py

+56-25
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
_EventSource,
4040
parse_event_source,
4141
get_first_record,
42+
is_step_function_event,
4243
EventTypes,
4344
EventSubtypes,
4445
)
@@ -271,6 +272,15 @@ def extract_context_from_sqs_or_sns_event_or_context(event, lambda_context):
271272

272273
if dd_json_data:
273274
dd_data = json.loads(dd_json_data)
275+
276+
if is_step_function_event(dd_data):
277+
try:
278+
return extract_context_from_step_functions(dd_data, None)
279+
except Exception:
280+
logger.debug(
281+
"Failed to extract Step Functions context from SQS/SNS event."
282+
)
283+
274284
return propagator.extract(dd_data)
275285
else:
276286
# Handle case where trace context is injected into attributes.AWSTraceHeader
@@ -313,19 +323,39 @@ def _extract_context_from_eventbridge_sqs_event(event):
313323
body = json.loads(body_str)
314324
detail = body.get("detail")
315325
dd_context = detail.get("_datadog")
326+
327+
if is_step_function_event(dd_context):
328+
try:
329+
return extract_context_from_step_functions(dd_context, None)
330+
except Exception:
331+
logger.debug(
332+
"Failed to extract Step Functions context from EventBridge to SQS event."
333+
)
334+
316335
return propagator.extract(dd_context)
317336

318337

319338
def extract_context_from_eventbridge_event(event, lambda_context):
320339
"""
321340
Extract datadog trace context from an EventBridge message's Details.
322341
This is only possible if Details is a JSON string.
342+
343+
If we find a Step Function context, try to extract the trace context from
344+
that header.
323345
"""
324346
try:
325347
detail = event.get("detail")
326348
dd_context = detail.get("_datadog")
327349
if not dd_context:
328350
return extract_context_from_lambda_context(lambda_context)
351+
352+
try:
353+
return extract_context_from_step_functions(dd_context, None)
354+
except Exception:
355+
logger.debug(
356+
"Failed to extract Step Functions context from EventBridge event."
357+
)
358+
329359
return propagator.extract(dd_context)
330360
except Exception as e:
331361
logger.debug("The trace extractor returned with error %s", e)
@@ -424,7 +454,7 @@ def _generate_sfn_trace_id(execution_id: str, part: str):
424454
def extract_context_from_step_functions(event, lambda_context):
425455
"""
426456
Only extract datadog trace context when Step Functions Context Object is injected
427-
into lambda's event dict.
457+
into lambda's event dict. Unwrap "Payload" if it exists to handle Legacy Lambda cases.
428458
429459
If '_datadog' header is present, we have two cases:
430460
1. Root is a Lambda and we use its traceID
@@ -435,25 +465,25 @@ def extract_context_from_step_functions(event, lambda_context):
435465
object.
436466
"""
437467
try:
468+
event = event.get("Payload", event)
469+
event = event.get("_datadog", event)
470+
438471
meta = {}
439-
dd_data = event.get("_datadog")
440472

441-
if dd_data and dd_data.get("serverless-version") == "v1":
442-
if "x-datadog-trace-id" in dd_data: # lambda root
443-
trace_id = int(dd_data.get("x-datadog-trace-id"))
444-
high_64_bit_trace_id = _parse_high_64_bits(
445-
dd_data.get("x-datadog-tags")
446-
)
473+
if event.get("serverless-version") == "v1":
474+
if "x-datadog-trace-id" in event: # lambda root
475+
trace_id = int(event.get("x-datadog-trace-id"))
476+
high_64_bit_trace_id = _parse_high_64_bits(event.get("x-datadog-tags"))
447477
if high_64_bit_trace_id:
448478
meta["_dd.p.tid"] = high_64_bit_trace_id
449479
else: # sfn root
450-
root_execution_id = dd_data.get("RootExecutionId")
480+
root_execution_id = event.get("RootExecutionId")
451481
trace_id = _generate_sfn_trace_id(root_execution_id, LOWER_64_BITS)
452482
meta["_dd.p.tid"] = _generate_sfn_trace_id(
453483
root_execution_id, HIGHER_64_BITS
454484
)
455485

456-
parent_id = _generate_sfn_parent_id(dd_data)
486+
parent_id = _generate_sfn_parent_id(event)
457487
else:
458488
execution_id = event.get("Execution").get("Id")
459489
trace_id = _generate_sfn_trace_id(execution_id, LOWER_64_BITS)
@@ -472,20 +502,6 @@ def extract_context_from_step_functions(event, lambda_context):
472502
return extract_context_from_lambda_context(lambda_context)
473503

474504

475-
def is_legacy_lambda_step_function(event):
476-
"""
477-
Check if the event is a step function that called a legacy lambda
478-
"""
479-
if not isinstance(event, dict) or "Payload" not in event:
480-
return False
481-
482-
event = event.get("Payload")
483-
return isinstance(event, dict) and (
484-
"_datadog" in event
485-
or ("Execution" in event and "StateMachine" in event and "State" in event)
486-
)
487-
488-
489505
def extract_context_custom_extractor(extractor, event, lambda_context):
490506
"""
491507
Extract Datadog trace context using a custom trace extractor function
@@ -1309,8 +1325,18 @@ def create_inferred_span_from_eventbridge_event(event, context):
13091325
synchronicity="async",
13101326
tag_source="self",
13111327
)
1312-
dt_format = "%Y-%m-%dT%H:%M:%SZ"
1328+
13131329
timestamp = event.get("time")
1330+
dt_format = "%Y-%m-%dT%H:%M:%SZ"
1331+
1332+
# Use more granular timestamp from upstream Step Function if possible
1333+
try:
1334+
if is_step_function_event(event.get("detail")):
1335+
timestamp = event["detail"]["_datadog"]["State"]["EnteredTime"]
1336+
dt_format = "%Y-%m-%dT%H:%M:%S.%fZ"
1337+
except (TypeError, KeyError, AttributeError):
1338+
logger.debug("Error parsing timestamp from Step Functions event")
1339+
13141340
dt = datetime.strptime(timestamp, dt_format)
13151341

13161342
tracer.set_tags(_dd_origin)
@@ -1320,6 +1346,11 @@ def create_inferred_span_from_eventbridge_event(event, context):
13201346
if span:
13211347
span.set_tags(tags)
13221348
span.start = dt.replace(tzinfo=timezone.utc).timestamp()
1349+
1350+
# Since inferred span will later parent Lambda, preserve Lambda's current parent
1351+
if dd_trace_context.span_id:
1352+
span.parent_id = dd_trace_context.span_id
1353+
13231354
return span
13241355

13251356

datadog_lambda/trigger.py

+27-3
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,7 @@ def parse_event_source(event: dict) -> _EventSource:
146146
if event.get("source") == "aws.events" or has_event_categories:
147147
event_source = _EventSource(EventTypes.CLOUDWATCH_EVENTS)
148148

149-
if (
150-
"_datadog" in event and event.get("_datadog").get("serverless-version") == "v1"
151-
) or ("Execution" in event and "StateMachine" in event and "State" in event):
149+
if is_step_function_event(event):
152150
event_source = _EventSource(EventTypes.STEPFUNCTIONS)
153151

154152
event_record = get_first_record(event)
@@ -369,3 +367,29 @@ def extract_http_status_code_tag(trigger_tags, response):
369367
status_code = response.status_code
370368

371369
return str(status_code)
370+
371+
372+
def is_step_function_event(event):
    """
    Check if the event is a step function that invoked the current lambda.

    The whole event can be wrapped in "Payload" in Legacy Lambda cases. There
    may also be a "_datadog" for JSONata style context propagation.

    The actual event must contain "Execution", "StateMachine", and "State"
    fields. In the JSONata case those fields live inside "_datadog", which
    must additionally carry "serverless-version".

    Args:
        event: The candidate payload. Callers pass the raw Lambda event as
            well as values pulled out of other payloads (decoded JSON, a
            nested "_datadog" entry), so this may be None or any non-dict
            JSON value.

    Returns:
        bool: True only when the payload has one of the two Step Functions
        shapes described above. Anything that is not a dict (at the top
        level, under "Payload", or under "_datadog") yields False instead of
        raising.
    """
    sfn_keys = ("Execution", "StateMachine", "State")

    # Guard first: upstream lookups can hand us None, a list, or a string.
    if not isinstance(event, dict):
        return False

    # Legacy Lambda integrations wrap the real event in "Payload".
    event = event.get("Payload", event)
    if not isinstance(event, dict):
        return False

    # JSONPath style: the context object is the event itself.
    if all(key in event for key in sfn_keys):
        return True

    # JSONata style: the context object is nested under "_datadog".
    # Requiring a dict keeps `in` from doing substring matching on a string.
    dd_context = event.get("_datadog")
    if not isinstance(dd_context, dict):
        return False

    return "serverless-version" in dd_context and all(
        key in dd_context for key in sfn_keys
    )

datadog_lambda/wrapper.py

-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
is_authorizer_response,
4646
tracer,
4747
propagator,
48-
is_legacy_lambda_step_function,
4948
)
5049
from datadog_lambda.trigger import (
5150
extract_trigger_tags,
@@ -286,8 +285,6 @@ def _before(self, event, context):
286285
self.response = None
287286
set_cold_start(init_timestamp_ns)
288287
submit_invocations_metric(context)
289-
if is_legacy_lambda_step_function(event):
290-
event = event["Payload"]
291288
self.trigger_tags = extract_trigger_tags(event, context)
292289
# Extract Datadog trace context and source from incoming requests
293290
dd_context, trace_context_source, event_source = extract_dd_trace_context(

0 commit comments

Comments
 (0)