Skip to content

Commit 3414cfb

Browse files
authored
Use single socket instance for all xray api calls. (#467)
* Use single socket for all xray api calls. * Move reset to tests.
1 parent 16f0e23 commit 3414cfb

File tree

5 files changed

+78
-46
lines changed

5 files changed

+78
-46
lines changed

datadog_lambda/xray.py

+41-31
Original file line numberDiff line numberDiff line change
@@ -10,32 +10,45 @@
1010
logger = logging.getLogger(__name__)
1111

1212

13-
def get_xray_host_port(address):
14-
if address == "":
15-
logger.debug("X-Ray daemon env var not set, not sending sub-segment")
16-
return None
17-
parts = address.split(":")
18-
if len(parts) <= 1:
19-
logger.debug("X-Ray daemon env var not set, not sending sub-segment")
20-
return None
21-
port = int(parts[1])
22-
host = parts[0]
23-
return (host, port)
24-
25-
26-
def send(host_port_tuple, payload):
27-
sock = None
28-
try:
29-
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
30-
sock.setblocking(0)
31-
sock.connect(host_port_tuple)
32-
sock.send(payload.encode("utf-8"))
33-
except Exception as e_send:
34-
logger.error("Error occurred submitting to xray daemon: %s", e_send)
35-
try:
36-
sock.close()
37-
except Exception as e_close:
38-
logger.error("Error while closing the socket: %s", e_close)
13+
class Socket(object):
14+
def __init__(self):
15+
self.sock = None
16+
17+
@property
18+
def host_port_tuple(self):
19+
if not hasattr(self, "_host_port_tuple"):
20+
self._host_port_tuple = self._get_xray_host_port(
21+
os.environ.get(XrayDaemon.XRAY_DAEMON_ADDRESS, "")
22+
)
23+
return self._host_port_tuple
24+
25+
def send(self, payload):
26+
if not self.sock:
27+
self._connect()
28+
try:
29+
self.sock.send(payload.encode("utf-8"))
30+
except Exception as e_send:
31+
logger.error("Error occurred submitting to xray daemon: %s", e_send)
32+
33+
def _connect(self):
34+
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
35+
self.sock.setblocking(0)
36+
self.sock.connect(self.host_port_tuple)
37+
38+
def _get_xray_host_port(self, address):
39+
if address == "":
40+
logger.debug("X-Ray daemon env var not set, not sending sub-segment")
41+
return None
42+
parts = address.split(":")
43+
if len(parts) <= 1:
44+
logger.debug("X-Ray daemon env var not set, not sending sub-segment")
45+
return None
46+
port = int(parts[1])
47+
host = parts[0]
48+
return (host, port)
49+
50+
51+
sock = Socket()
3952

4053

4154
def build_segment_payload(payload):
@@ -95,10 +108,7 @@ def build_segment(context, key, metadata):
95108

96109

97110
def send_segment(key, metadata):
98-
host_port_tuple = get_xray_host_port(
99-
os.environ.get(XrayDaemon.XRAY_DAEMON_ADDRESS, "")
100-
)
101-
if host_port_tuple is None:
111+
if sock.host_port_tuple is None:
102112
return None
103113
context = parse_xray_header(
104114
os.environ.get(XrayDaemon.XRAY_TRACE_ID_HEADER_NAME, "")
@@ -115,4 +125,4 @@ def send_segment(key, metadata):
115125
return None
116126
segment = build_segment(context, key, metadata)
117127
segment_payload = build_segment_payload(segment)
118-
send(host_port_tuple, segment_payload)
128+
sock.send(segment_payload)

tests/test_benchmarks.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from datadog_lambda.constants import XrayDaemon, XraySubsegment
1515

16-
from tests.utils import get_mock_context
16+
from tests.utils import get_mock_context, reset_xray_connection
1717

1818

1919
event_samples_dir = "tests/event_samples"
@@ -74,14 +74,24 @@ def test_trigger_extract_trigger_tags(event, benchmark):
7474

7575

7676
def test_xray_send_segment(benchmark, monkeypatch):
77+
reset_xray_connection()
78+
7779
monkeypatch.setenv(XrayDaemon.XRAY_DAEMON_ADDRESS, "localhost:9000")
7880
monkeypatch.setenv(
7981
XrayDaemon.XRAY_TRACE_ID_HEADER_NAME,
8082
"Root=1-5e272390-8c398be037738dc042009320;Parent=94ae789b969f1cc5;Sampled=1;Lineage=c6c5b1b9:0",
8183
)
84+
85+
def socket_send(*a, **k):
86+
sends.append(True)
87+
88+
sends = []
89+
monkeypatch.setattr("socket.socket.send", socket_send)
90+
8291
key = {
8392
"trace-id": "12345678901234567890123456789012",
8493
"parent-id": "1234567890123456",
8594
"sampling-priority": "1",
8695
}
8796
benchmark(xray.send_segment, XraySubsegment.TRACE_KEY, key)
97+
assert sends

tests/test_wrapper.py

+6-3
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
from datadog_lambda.constants import TraceHeader
88

99
import datadog_lambda.wrapper as wrapper
10+
import datadog_lambda.xray as xray
1011
from datadog_lambda.metric import lambda_metric
1112
from datadog_lambda.thread_stats_writer import ThreadStatsWriter
1213
from ddtrace import Span, tracer
1314
from ddtrace.internal.constants import MAX_UINT_64BITS
1415

15-
from tests.utils import get_mock_context
16+
from tests.utils import get_mock_context, reset_xray_connection
1617

1718

1819
class TestDatadogLambdaWrapper(unittest.TestCase):
@@ -590,7 +591,9 @@ class TestLambdaWrapperWithTraceContext(unittest.TestCase):
590591
},
591592
)
592593
def test_event_bridge_sqs_payload(self):
593-
patcher = patch("datadog_lambda.xray.send")
594+
reset_xray_connection()
595+
596+
patcher = patch("datadog_lambda.xray.sock.send")
594597
mock_send = patcher.start()
595598
self.addCleanup(patcher.stop)
596599

@@ -623,7 +626,7 @@ def handler(event, context):
623626
self.assertEqual(result.span_id, aws_lambda_span.span_id)
624627
self.assertEqual(result.sampling_priority, 1)
625628
mock_send.assert_called_once()
626-
(_, raw_payload), _ = mock_send.call_args
629+
(raw_payload,), _ = mock_send.call_args
627630
payload = json.loads(raw_payload[33:]) # strip formatting prefix
628631
self.assertEqual(self.xray_root, payload["trace_id"])
629632
self.assertEqual(self.xray_parent, payload["parent_id"])

tests/test_xray.py

+10-11
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,14 @@
44

55
from unittest.mock import MagicMock, patch
66

7-
from datadog_lambda.xray import (
8-
get_xray_host_port,
9-
build_segment_payload,
10-
build_segment,
11-
send_segment,
12-
)
7+
from datadog_lambda.xray import build_segment_payload, build_segment, send_segment, sock
8+
from tests.utils import reset_xray_connection
139

1410

1511
class TestXRay(unittest.TestCase):
12+
def setUp(self):
13+
reset_xray_connection()
14+
1615
def tearDown(self):
1716
if os.environ.get("_X_AMZN_TRACE_ID"):
1817
os.environ.pop("_X_AMZN_TRACE_ID")
@@ -21,15 +20,15 @@ def tearDown(self):
2120
return super().tearDown()
2221

2322
def test_get_xray_host_port_empty_(self):
24-
result = get_xray_host_port("")
23+
result = sock._get_xray_host_port("")
2524
self.assertIsNone(result)
2625

2726
def test_get_xray_host_port_invalid_value(self):
28-
result = get_xray_host_port("myVar")
27+
result = sock._get_xray_host_port("myVar")
2928
self.assertIsNone(result)
3029

3130
def test_get_xray_host_port_success(self):
32-
result = get_xray_host_port("mySuperHost:1000")
31+
result = sock._get_xray_host_port("mySuperHost:1000")
3332
self.assertEqual("mySuperHost", result[0])
3433
self.assertEqual(1000, result[1])
3534

@@ -40,7 +39,7 @@ def test_send_segment_sampled_out(self):
4039
] = "Root=1-5e272390-8c398be037738dc042009320;Parent=94ae789b969f1cc5;Sampled=0;Lineage=c6c5b1b9:0"
4140

4241
with patch(
43-
"datadog_lambda.xray.send", MagicMock(return_value=None)
42+
"datadog_lambda.xray.sock.send", MagicMock(return_value=None)
4443
) as mock_send:
4544
# XRay trace won't be sampled according to the trace header.
4645
send_segment("my_key", {"data": "value"})
@@ -52,7 +51,7 @@ def test_send_segment_sampled(self):
5251
"_X_AMZN_TRACE_ID"
5352
] = "Root=1-5e272390-8c398be037738dc042009320;Parent=94ae789b969f1cc5;Sampled=1;Lineage=c6c5b1b9:0"
5453
with patch(
55-
"datadog_lambda.xray.send", MagicMock(return_value=None)
54+
"datadog_lambda.xray.sock.send", MagicMock(return_value=None)
5655
) as mock_send:
5756
# X-Ray trace will be sampled according to the trace header.
5857
send_segment("my_key", {"data": "value"})

tests/utils.py

+10
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,13 @@ def get_mock_context(
2424
lambda_context.function_name = function_name
2525
lambda_context.client_context = ClientContext(custom)
2626
return lambda_context
27+
28+
29+
def reset_xray_connection():
30+
from datadog_lambda.xray import sock
31+
32+
if hasattr(sock, "_host_port_tuple"):
33+
del sock._host_port_tuple
34+
if sock.sock:
35+
sock.sock.close()
36+
sock.sock = None

0 commit comments

Comments
 (0)