Skip to content

Commit c4da90e

Browse files
avedmala, nhulston, apiarian-datadog, kimi-p
authored
Explicit trace ID propagation for SFN w/o Hashing (#537)
* add logic to extract traceID from _datadog header * rename test * fix type * added root arn case * trigger ci * Add `http.route` tags for API Gateway (#524) Add route tags * feat: [SVLS-5677] DynamoDB Stream event span pointers (#522) * trigger ci * use default propagator.extract * lint * lint * updated to use trace/parent hash from _datadog * lint * skip is context complete check * remove unused import * fix legacy lambda parsing with new header * using context object instead of pre-hashed values * fixed trigger tags and tests * pull sfn trace id generation out into a helper * added unit tests * update test data * rename stepfunctions to states * update current serverless version to v1 * Update trigger comment Co-authored-by: kimi <[email protected]> --------- Co-authored-by: Nicholas Hulston <[email protected]> Co-authored-by: Aleksandr Pasechnik <[email protected]> Co-authored-by: kimi <[email protected]>
1 parent 49df8ee commit c4da90e

File tree

6 files changed

+211
-44
lines changed

6 files changed

+211
-44
lines changed

datadog_lambda/constants.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,8 @@
33
# This product includes software developed at Datadog (https://www.datadoghq.com/).
44
# Copyright 2019 Datadog, Inc.
55

6-
# Datadog trace sampling priority
7-
86

7+
# Datadog trace sampling priority
98
class SamplingPriority(object):
109
USER_REJECT = -1
1110
AUTO_REJECT = 0
@@ -18,6 +17,7 @@ class TraceHeader(object):
1817
TRACE_ID = "x-datadog-trace-id"
1918
PARENT_ID = "x-datadog-parent-id"
2019
SAMPLING_PRIORITY = "x-datadog-sampling-priority"
20+
TAGS = "x-datadog-tags"
2121

2222

2323
# X-Ray subsegment to save Datadog trace metadata

datadog_lambda/tracing.py

+75-21
Original file line numberDiff line numberDiff line change
@@ -356,9 +356,8 @@ def extract_context_from_kinesis_event(event, lambda_context):
356356
return extract_context_from_lambda_context(lambda_context)
357357

358358

359-
def _deterministic_sha256_hash(s: str, part: str) -> (int, int):
359+
def _deterministic_sha256_hash(s: str, part: str) -> int:
360360
sha256_hash = hashlib.sha256(s.encode()).hexdigest()
361-
362361
# First two chars is '0b'. zfill to ensure 256 bits, but we only care about the first 128 bits
363362
binary_hash = bin(int(sha256_hash, 16))[2:].zfill(256)
364363
if part == HIGHER_64_BITS:
@@ -371,36 +370,88 @@ def _deterministic_sha256_hash(s: str, part: str) -> (int, int):
371370
return result
372371

373372

373+
def _parse_high_64_bits(trace_tags: str) -> str:
374+
"""
375+
Parse a list of trace tags such as [_dd.p.tid=66bcb5eb00000000,_dd.p.dm=-0] and return the
376+
value of the _dd.p.tid tag or an empty string if not found.
377+
"""
378+
if trace_tags:
379+
for tag in trace_tags.split(","):
380+
if "_dd.p.tid=" in tag:
381+
return tag.split("=")[1]
382+
383+
return ""
384+
385+
386+
def _generate_sfn_parent_id(context: dict) -> int:
387+
execution_id = context.get("Execution").get("Id")
388+
state_name = context.get("State").get("Name")
389+
state_entered_time = context.get("State").get("EnteredTime")
390+
391+
return _deterministic_sha256_hash(
392+
f"{execution_id}#{state_name}#{state_entered_time}", HIGHER_64_BITS
393+
)
394+
395+
396+
def _generate_sfn_trace_id(execution_id: str, part: str):
    """
    Compute one half of the 128-bit trace ID from the SHA-256 hash of `execution_id`.

    When `part` is HIGHER_64_BITS the hashed value is returned as a hex string
    with the leading '0x' removed (the form used for the `_dd.p.tid` tag).
    For any other `part` the integer hash is returned unchanged and serves as
    the traditional trace ID.
    """
    hashed = _deterministic_sha256_hash(execution_id, part)
    if part != HIGHER_64_BITS:
        return hashed
    # hex() prefixes its output with '0x'; drop those two characters.
    return hex(hashed)[2:]
408+
409+
374410
def extract_context_from_step_functions(event, lambda_context):
375411
"""
376412
Only extract datadog trace context when Step Functions Context Object is injected
377413
into lambda's event dict.
414+
415+
If '_datadog' header is present, we have two cases:
416+
1. Root is a Lambda and we use its traceID
417+
2. Root is a SFN, and we use its executionARN to calculate the traceID
418+
We calculate the parentID the same in both cases by using the parent SFN's context object.
419+
420+
Otherwise, we're dealing with the legacy case where we only have the parent SFN's context
421+
object.
378422
"""
379423
try:
380-
execution_id = event.get("Execution").get("Id")
381-
state_name = event.get("State").get("Name")
382-
state_entered_time = event.get("State").get("EnteredTime")
383-
# returning 128 bits since 128bit traceId will be break up into
384-
# traditional traceId and _dd.p.tid tag
385-
# https://github.com/DataDog/dd-trace-py/blob/3e34d21cb9b5e1916e549047158cb119317b96ab/ddtrace/propagation/http.py#L232-L240
386-
trace_id = _deterministic_sha256_hash(execution_id, LOWER_64_BITS)
387-
388-
parent_id = _deterministic_sha256_hash(
389-
f"{execution_id}#{state_name}#{state_entered_time}", HIGHER_64_BITS
390-
)
424+
meta = {}
425+
dd_data = event.get("_datadog")
426+
427+
if dd_data and dd_data.get("serverless-version") == "v1":
428+
if "x-datadog-trace-id" in dd_data: # lambda root
429+
trace_id = int(dd_data.get("x-datadog-trace-id"))
430+
high_64_bit_trace_id = _parse_high_64_bits(
431+
dd_data.get("x-datadog-tags")
432+
)
433+
if high_64_bit_trace_id:
434+
meta["_dd.p.tid"] = high_64_bit_trace_id
435+
else: # sfn root
436+
root_execution_id = dd_data.get("RootExecutionId")
437+
trace_id = _generate_sfn_trace_id(root_execution_id, LOWER_64_BITS)
438+
meta["_dd.p.tid"] = _generate_sfn_trace_id(
439+
root_execution_id, HIGHER_64_BITS
440+
)
441+
442+
parent_id = _generate_sfn_parent_id(dd_data)
443+
else:
444+
execution_id = event.get("Execution").get("Id")
445+
trace_id = _generate_sfn_trace_id(execution_id, LOWER_64_BITS)
446+
meta["_dd.p.tid"] = _generate_sfn_trace_id(execution_id, HIGHER_64_BITS)
447+
parent_id = _generate_sfn_parent_id(event)
391448

392449
sampling_priority = SamplingPriority.AUTO_KEEP
393450
return Context(
394451
trace_id=trace_id,
395452
span_id=parent_id,
396453
sampling_priority=sampling_priority,
397-
# take the higher 64 bits as _dd.p.tid tag and use hex to encode
398-
# [2:] to remove '0x' in the hex str
399-
meta={
400-
"_dd.p.tid": hex(
401-
_deterministic_sha256_hash(execution_id, HIGHER_64_BITS)
402-
)[2:]
403-
},
454+
meta=meta,
404455
)
405456
except Exception as e:
406457
logger.debug("The Step Functions trace extractor returned with error %s", e)
@@ -415,7 +466,10 @@ def is_legacy_lambda_step_function(event):
415466
return False
416467

417468
event = event.get("Payload")
418-
return "Execution" in event and "StateMachine" in event and "State" in event
469+
return isinstance(event, dict) and (
470+
"_datadog" in event
471+
or ("Execution" in event and "StateMachine" in event and "State" in event)
472+
)
419473

420474

421475
def extract_context_custom_extractor(extractor, event, lambda_context):

datadog_lambda/trigger.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ def parse_event_source(event: dict) -> _EventSource:
146146
if event.get("source") == "aws.events" or has_event_categories:
147147
event_source = _EventSource(EventTypes.CLOUDWATCH_EVENTS)
148148

149-
if "Execution" in event and "StateMachine" in event and "State" in event:
149+
if (
150+
"_datadog" in event and event.get("_datadog").get("serverless-version") == "v1"
151+
) or ("Execution" in event and "StateMachine" in event and "State" in event):
150152
event_source = _EventSource(EventTypes.STEPFUNCTIONS)
151153

152154
event_record = get_first_record(event)
@@ -254,6 +256,13 @@ def parse_event_source_arn(source: _EventSource, event: dict, context: Any) -> s
254256
if source.event_type == EventTypes.CLOUDWATCH_EVENTS and event.get("resources"):
255257
return event.get("resources")[0]
256258

259+
# Returning state machine arn as event source arn.
260+
if source.event_type == EventTypes.STEPFUNCTIONS:
261+
context = event
262+
if "_datadog" in event:
263+
context = event.get("_datadog")
264+
return context.get("StateMachine").get("Id")
265+
257266

258267
def get_event_source_arn(source: _EventSource, event: dict, context: Any) -> str:
259268
event_source_arn = event.get("eventSourceARN") or event.get("eventSourceArn")

tests/event_samples/states.json

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"_datadog": {
3+
"Execution": {
4+
"Id": "arn:aws:states:ca-central-1:425362996713:execution:MyStateMachine-wsx8chv4d:1356a963-42a5-48b0-ba3f-73bde559a50c",
5+
"StartTime": "2024-11-13T16:46:47.715Z",
6+
"Name": "1356a963-42a5-48b0-ba3f-73bde559a50c",
7+
"RoleArn": "arn:aws:iam::425362996713:role/service-role/StepFunctions-MyStateMachine-wsx8chv4d-role-1su0fkfd3",
8+
"RedriveCount": 0
9+
},
10+
"StateMachine": {
11+
"Id": "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d",
12+
"Name": "MyStateMachine-wsx8chv4d"
13+
},
14+
"State": {
15+
"Name": "Lambda Invoke",
16+
"EnteredTime": "2024-11-13T16:46:47.740Z",
17+
"RetryCount": 0
18+
},
19+
"RootExecutionId": "arn:aws:states:ca-central-1:425362996713:execution:MyStateMachine-wsx8chv4d:1356a963-42a5-48b0-ba3f-73bde559a50c",
20+
"serverless-version": "v1"
21+
}
22+
}

tests/test_tracing.py

+73-20
Original file line numberDiff line numberDiff line change
@@ -617,7 +617,7 @@ def test_with_complete_datadog_trace_headers_with_trigger_tags(self):
617617
@with_trace_propagation_style("datadog")
618618
def test_step_function_trace_data(self):
619619
lambda_ctx = get_mock_context()
620-
sqs_event = {
620+
sfn_event = {
621621
"Execution": {
622622
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
623623
},
@@ -627,7 +627,7 @@ def test_step_function_trace_data(self):
627627
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
628628
},
629629
}
630-
ctx, source, event_source = extract_dd_trace_context(sqs_event, lambda_ctx)
630+
ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx)
631631
self.assertEqual(source, "event")
632632
expected_context = Context(
633633
trace_id=3675572987363469717,
@@ -642,7 +642,7 @@ def test_step_function_trace_data(self):
642642
TraceHeader.TRACE_ID: "3675572987363469717",
643643
TraceHeader.PARENT_ID: "10713633173203262661",
644644
TraceHeader.SAMPLING_PRIORITY: "1",
645-
"x-datadog-tags": "_dd.p.tid=e987c84b36b11ab",
645+
TraceHeader.TAGS: "_dd.p.tid=e987c84b36b11ab",
646646
},
647647
)
648648
create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
@@ -651,9 +651,11 @@ def test_step_function_trace_data(self):
651651
expected_context,
652652
)
653653

654-
def test_is_legacy_lambda_step_function(self):
655-
sf_event = {
656-
"Payload": {
654+
@with_trace_propagation_style("datadog")
655+
def test_step_function_trace_data_lambda_root(self):
656+
lambda_ctx = get_mock_context()
657+
sfn_event = {
658+
"_datadog": {
657659
"Execution": {
658660
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
659661
},
@@ -662,24 +664,75 @@ def test_is_legacy_lambda_step_function(self):
662664
"Name": "my-awesome-state",
663665
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
664666
},
667+
"x-datadog-trace-id": "5821803790426892636",
668+
"x-datadog-tags": "_dd.p.dm=-0,_dd.p.tid=672a7cb100000000",
669+
"serverless-version": "v1",
665670
}
666671
}
667-
self.assertTrue(is_legacy_lambda_step_function(sf_event))
668-
669-
sf_event = {
670-
"Execution": {
671-
"Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
672-
},
673-
"StateMachine": {},
674-
"State": {
675-
"Name": "my-awesome-state",
676-
"EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
672+
ctx, source, event_source = extract_dd_trace_context(sfn_event, lambda_ctx)
673+
self.assertEqual(source, "event")
674+
expected_context = Context(
675+
trace_id=5821803790426892636,
676+
span_id=6880978411788117524,
677+
sampling_priority=1,
678+
meta={"_dd.p.tid": "672a7cb100000000"},
679+
)
680+
self.assertEqual(ctx, expected_context)
681+
self.assertEqual(
682+
get_dd_trace_context(),
683+
{
684+
TraceHeader.TRACE_ID: "5821803790426892636",
685+
TraceHeader.PARENT_ID: "10713633173203262661",
686+
TraceHeader.SAMPLING_PRIORITY: "1",
687+
TraceHeader.TAGS: "_dd.p.tid=672a7cb100000000",
677688
},
678-
}
679-
self.assertFalse(is_legacy_lambda_step_function(sf_event))
689+
)
690+
create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
691+
self.mock_send_segment.assert_called_with(
692+
XraySubsegment.TRACE_KEY,
693+
expected_context,
694+
)
680695

681-
other_event = ["foo", "bar"]
682-
self.assertFalse(is_legacy_lambda_step_function(other_event))
696+
@with_trace_propagation_style("datadog")
def test_step_function_trace_data_sfn_root(self):
    """
    A v1 `_datadog` payload without `x-datadog-trace-id` (Step Functions root)
    must yield the expected context, propagated headers and X-Ray subsegment.
    """
    datadog_payload = {
        "Execution": {
            "Id": "665c417c-1237-4742-aaca-8b3becbb9e75",
        },
        "StateMachine": {},
        "State": {
            "Name": "my-awesome-state",
            "EnteredTime": "Mon Nov 13 12:43:33 PST 2023",
        },
        "RootExecutionId": "4875aba4-ae31-4a4c-bf8a-63e9eee31dad",
        "serverless-version": "v1",
    }
    sfn_event = {"_datadog": datadog_payload}
    lambda_ctx = get_mock_context()

    ctx, source, _event_source = extract_dd_trace_context(sfn_event, lambda_ctx)

    self.assertEqual(source, "event")
    expected_context = Context(
        trace_id=4521899030418994483,
        span_id=6880978411788117524,
        sampling_priority=1,
        meta={"_dd.p.tid": "12d1270d99cc5e03"},
    )
    self.assertEqual(ctx, expected_context)

    expected_headers = {
        TraceHeader.TRACE_ID: "4521899030418994483",
        TraceHeader.PARENT_ID: "10713633173203262661",
        TraceHeader.SAMPLING_PRIORITY: "1",
        TraceHeader.TAGS: "_dd.p.tid=12d1270d99cc5e03",
    }
    self.assertEqual(get_dd_trace_context(), expected_headers)

    create_dd_dummy_metadata_subsegment(ctx, XraySubsegment.TRACE_KEY)
    self.mock_send_segment.assert_called_with(
        XraySubsegment.TRACE_KEY,
        expected_context,
    )
683736

684737

685738
class TestXRayContextConversion(unittest.TestCase):

tests/test_trigger.py

+29
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,20 @@ def test_event_source_sqs(self):
230230
"arn:aws:sqs:eu-west-1:601427279990:InferredSpansQueueNode",
231231
)
232232

233+
def test_event_source_stepfunctions(self):
    """The `states` sample event parses as a Step Functions source with the state machine ARN."""
    sample_name = "states"
    sample_path = event_samples + sample_name + ".json"
    with open(sample_path, "r") as sample_file:
        event = json.load(sample_file)

    ctx = get_mock_context()
    source = parse_event_source(event)
    source_arn = get_event_source_arn(source, event, ctx)

    self.assertEqual(source.to_string(), sample_name)
    self.assertEqual(
        source_arn,
        "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d",
    )
246+
233247
def test_event_source_unsupported(self):
234248
event_sample_source = "custom"
235249
test_file = event_samples + event_sample_source + ".json"
@@ -485,6 +499,21 @@ def test_extract_trigger_tags_sqs(self):
485499
},
486500
)
487501

502+
def test_extract_trigger_tags_stepfunctions(self):
    """The `states` sample event produces the Step Functions trigger tags."""
    sample_name = "states"
    sample_path = event_samples + sample_name + ".json"
    ctx = get_mock_context()
    with open(sample_path, "r") as sample_file:
        event = json.load(sample_file)

    expected_tags = {
        "function_trigger.event_source": "states",
        "function_trigger.event_source_arn": "arn:aws:states:ca-central-1:425362996713:stateMachine:MyStateMachine-wsx8chv4d",
    }
    self.assertEqual(extract_trigger_tags(event, ctx), expected_tags)
516+
488517
def test_extract_trigger_tags_unsupported(self):
489518
event_sample_source = "custom"
490519
test_file = event_samples + event_sample_source + ".json"

0 commit comments

Comments
 (0)