Skip to content

Commit a38c77b

Browse files
committed
support step function context in eventbridge extraction
1 parent fc0beaa commit a38c77b

File tree

4 files changed

+149
-17
lines changed

4 files changed

+149
-17
lines changed

datadog_lambda/tracing.py

+33-10
Original file line numberDiff line numberDiff line change
@@ -320,12 +320,17 @@ def extract_context_from_eventbridge_event(event, lambda_context):
320320
"""
321321
Extract datadog trace context from an EventBridge message's Details.
322322
This is only possible if Details is a JSON string.
323+
324+
If we find a Step Function context, try to extract the trace context from
325+
that header.
323326
"""
324327
try:
325328
detail = event.get("detail")
326329
dd_context = detail.get("_datadog")
327330
if not dd_context:
328331
return extract_context_from_lambda_context(lambda_context)
332+
if is_step_function_event(dd_context):
333+
return extract_context_from_step_functions(detail, lambda_context)
329334
return propagator.extract(dd_context)
330335
except Exception as e:
331336
logger.debug("The trace extractor returned with error %s", e)
@@ -424,7 +429,7 @@ def _generate_sfn_trace_id(execution_id: str, part: str):
424429
def extract_context_from_step_functions(event, lambda_context):
425430
"""
426431
Only extract datadog trace context when Step Functions Context Object is injected
427-
into lambda's event dict.
432+
into lambda's event dict. Unwrap "Payload" if it exists to handle Legacy Lambda cases.
428433
429434
If '_datadog' header is present, we have two cases:
430435
1. Root is a Lambda and we use its traceID
@@ -435,6 +440,8 @@ def extract_context_from_step_functions(event, lambda_context):
435440
object.
436441
"""
437442
try:
443+
event = event.get("Payload", event)
444+
438445
meta = {}
439446
dd_data = event.get("_datadog")
440447

@@ -472,18 +479,30 @@ def extract_context_from_step_functions(event, lambda_context):
472479
return extract_context_from_lambda_context(lambda_context)
473480

474481

475-
def is_legacy_lambda_step_function(event):
482+
def is_step_function_event(event):
476483
"""
477-
Check if the event is a step function that called a legacy lambda
484+
Check if the event is a step function that invoked the current lambda.
485+
486+
The whole event can be wrapped in "Payload" in Legacy Lambda cases. There may also be a
487+
"_datadog" for JSONata style context propagation.
488+
489+
The actual event must contain "Execution", "StateMachine", and "State" fields.
478490
"""
479-
if not isinstance(event, dict) or "Payload" not in event:
480-
return False
491+
event = event.get("Payload", event)
492+
493+
# JSONPath style
494+
if all(field in event for field in ("Execution", "StateMachine", "State")):
495+
return True
496+
497+
# JSONata style
498+
if "_datadog" in event:
499+
event = event["_datadog"]
500+
return all(
501+
field in event
502+
for field in ("Execution", "StateMachine", "State", "serverless-version")
503+
)
481504

482-
event = event.get("Payload")
483-
return isinstance(event, dict) and (
484-
"_datadog" in event
485-
or ("Execution" in event and "StateMachine" in event and "State" in event)
486-
)
505+
return False
487506

488507

489508
def extract_context_custom_extractor(extractor, event, lambda_context):
@@ -1320,6 +1339,10 @@ def create_inferred_span_from_eventbridge_event(event, context):
13201339
if span:
13211340
span.set_tags(tags)
13221341
span.start = dt.replace(tzinfo=timezone.utc).timestamp()
1342+
1343+
# Since inferred span will later parent Lambda, preserve Lambda's current parent
1344+
span.parent_id = dd_trace_context.span_id
1345+
13231346
return span
13241347

13251348

datadog_lambda/trigger.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
from enum import Enum
1111
from typing import Any
1212

13+
from datadog_lambda.tracing import is_step_function_event
14+
1315

1416
class _stringTypedEnum(Enum):
1517
"""
@@ -146,9 +148,7 @@ def parse_event_source(event: dict) -> _EventSource:
146148
if event.get("source") == "aws.events" or has_event_categories:
147149
event_source = _EventSource(EventTypes.CLOUDWATCH_EVENTS)
148150

149-
if (
150-
"_datadog" in event and event.get("_datadog").get("serverless-version") == "v1"
151-
) or ("Execution" in event and "StateMachine" in event and "State" in event):
151+
if is_step_function_event(event):
152152
event_source = _EventSource(EventTypes.STEPFUNCTIONS)
153153

154154
event_record = get_first_record(event)

datadog_lambda/wrapper.py

-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
is_authorizer_response,
4646
tracer,
4747
propagator,
48-
is_legacy_lambda_step_function,
4948
)
5049
from datadog_lambda.trigger import (
5150
extract_trigger_tags,
@@ -279,8 +278,6 @@ def _before(self, event, context):
279278
self.response = None
280279
set_cold_start(init_timestamp_ns)
281280
submit_invocations_metric(context)
282-
if is_legacy_lambda_step_function(event):
283-
event = event["Payload"]
284281
self.trigger_tags = extract_trigger_tags(event, context)
285282
# Extract Datadog trace context and source from incoming requests
286283
dd_context, trace_context_source, event_source = extract_dd_trace_context(

tests/test_tracing.py

+113-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
service_mapping as global_service_mapping,
4343
propagator,
4444
emit_telemetry_on_exception_outside_of_handler,
45-
is_legacy_lambda_step_function,
45+
is_step_function_event,
4646
)
4747
from datadog_lambda.trigger import EventTypes
4848

@@ -836,6 +836,55 @@ def test_step_function_trace_data_sfn_root(self):
836836
expected_context,
837837
)
838838

839+
@with_trace_propagation_style("datadog")
840+
def test_step_function_trace_data_event_bridge(self):
841+
lambda_ctx = get_mock_context()
842+
sfn_event = {
843+
"_datadog": {
844+
"Execution": {
845+
"StartTime": "2025-03-11T01:16:31.408Z",
846+
"Id": "arn:aws:states:sa-east-1:425362996713:execution:abhinav-inner-state-machine:eb6298d0-93b5-4fe0-8af9-fefe2933b0ed",
847+
"RedriveCount": 0,
848+
"RoleArn": "arn:aws:iam::425362996713:role/service-role/StepFunctions-abhinav-activity-state-machine-role-22jpbgl6j",
849+
"Name": "eb6298d0-93b5-4fe0-8af9-fefe2933b0ed",
850+
},
851+
"StateMachine": {
852+
"Id": "arn:aws:states:sa-east-1:425362996713:stateMachine:abhinav-inner-state-machine",
853+
"Name": "abhinav-inner-state-machine",
854+
},
855+
"State": {
856+
"EnteredTime": "2025-03-11T01:16:31.448Z",
857+
"RetryCount": 0,
858+
"Name": "EventBridge PutEvents",
859+
},
860+
"serverless-version": "v1",
861+
"RootExecutionId": "arn:aws:states:sa-east-1:425362996713:execution:abhinav-inner-state-machine:eb6298d0-93b5-4fe0-8af9-fefe2933b0ed",
862+
}
863+
}
864+
ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx)
865+
self.assertEqual(source, "event")
866+
expected_context = Context(
867+
trace_id=4521899030418994483,
868+
span_id=6880978411788117524,
869+
sampling_priority=1,
870+
meta={"_dd.p.tid": "12d1270d99cc5e03"},
871+
)
872+
self.assertEqual(ctx, expected_context)
873+
self.assertEqual(
874+
get_dd_trace_context(),
875+
{
876+
TraceHeader.TRACE_ID: "4521899030418994483",
877+
TraceHeader.PARENT_ID: "2685222157636933868",
878+
TraceHeader.SAMPLING_PRIORITY: "1",
879+
TraceHeader.TAGS: "_dd.p.tid=12d1270d99cc5e03",
880+
},
881+
)
882+
create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
883+
self.mock_send_segment.assert_called_with(
884+
XraySubsegment.TRACE_KEY,
885+
expected_context,
886+
)
887+
839888

840889
class TestXRayContextConversion(unittest.TestCase):
841890
def test_convert_xray_trace_id(self):
@@ -2282,6 +2331,69 @@ def test_deterministic_m5_hash__always_leading_with_zero(self):
22822331
if len(result_in_binary) == 66: # "0b" + 64 bits.
22832332
self.assertTrue(result_in_binary.startswith("0b0"))
22842333

2334+
def test_is_step_function_event_jsonata(self):
2335+
event = {
2336+
"_datadog": {
2337+
"Execution": {
2338+
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
2339+
"RedriveCount": 0,
2340+
},
2341+
"StateMachine": {},
2342+
"State": {
2343+
"Name": "my-awesome-state",
2344+
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
2345+
"RetryCount": 0,
2346+
},
2347+
"x-datadog-trace-id": "5821803790426892636",
2348+
"x-datadog-tags": "_dd.p.dm=-0,_dd.p.tid=672a7cb100000000",
2349+
"serverless-version": "v1",
2350+
}
2351+
}
2352+
self.assertTrue(is_step_function_event(event))
2353+
2354+
def test_is_step_function_event_jsonpath(self):
2355+
event = {
2356+
"Execution": {
2357+
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
2358+
"RedriveCount": 0,
2359+
},
2360+
"StateMachine": {},
2361+
"State": {
2362+
"Name": "my-awesome-state",
2363+
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
2364+
"RetryCount": 0,
2365+
},
2366+
}
2367+
self.assertTrue(is_step_function_event(event))
2368+
2369+
def test_is_step_function_event_legacy_lambda(self):
2370+
event = {
2371+
"Payload": {
2372+
"Execution": {
2373+
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
2374+
"RedriveCount": 0,
2375+
},
2376+
"StateMachine": {},
2377+
"State": {
2378+
"Name": "my-awesome-state",
2379+
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
2380+
"RetryCount": 0,
2381+
},
2382+
}
2383+
}
2384+
self.assertTrue(is_step_function_event(event))
2385+
2386+
def test_is_step_function_event_dd_header(self):
2387+
event = {
2388+
"_datadog": {
2389+
"x-datadog-trace-id": "5821803790426892636",
2390+
"x-datadog-parent-id": "5821803790426892636",
2391+
"x-datadog-tags": "_dd.p.dm=-0,_dd.p.tid=672a7cb100000000",
2392+
"x-datadog-sampling-priority": "1",
2393+
}
2394+
}
2395+
self.assertFalse(is_step_function_event(event))
2396+
22852397

22862398
class TestExceptionOutsideHandler(unittest.TestCase):
22872399
@patch("datadog_lambda.tracing.dd_tracing_enabled", True)

0 commit comments

Comments
 (0)