From 0f7fb937a7447a57488495fcb2197c7edd681122 Mon Sep 17 00:00:00 2001 From: Rey Abolofia Date: Tue, 9 Apr 2024 10:48:03 -0700 Subject: [PATCH 1/2] Use single socket for all xray api calls. --- datadog_lambda/xray.py | 79 ++++++++++++++++++++++++---------------- tests/test_benchmarks.py | 10 +++++ tests/test_wrapper.py | 7 +++- tests/test_xray.py | 20 +++++----- 4 files changed, 72 insertions(+), 44 deletions(-) diff --git a/datadog_lambda/xray.py b/datadog_lambda/xray.py index e6aff0bc..8c68bf56 100644 --- a/datadog_lambda/xray.py +++ b/datadog_lambda/xray.py @@ -10,32 +10,52 @@ logger = logging.getLogger(__name__) -def get_xray_host_port(address): - if address == "": - logger.debug("X-Ray daemon env var not set, not sending sub-segment") - return None - parts = address.split(":") - if len(parts) <= 1: - logger.debug("X-Ray daemon env var not set, not sending sub-segment") - return None - port = int(parts[1]) - host = parts[0] - return (host, port) - - -def send(host_port_tuple, payload): - sock = None - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setblocking(0) - sock.connect(host_port_tuple) - sock.send(payload.encode("utf-8")) - except Exception as e_send: - logger.error("Error occurred submitting to xray daemon: %s", e_send) - try: - sock.close() - except Exception as e_close: - logger.error("Error while closing the socket: %s", e_close) +class Socket(object): + def __init__(self): + self.sock = None + + @property + def host_port_tuple(self): + if not hasattr(self, "_host_port_tuple"): + self._host_port_tuple = self._get_xray_host_port( + os.environ.get(XrayDaemon.XRAY_DAEMON_ADDRESS, "") + ) + return self._host_port_tuple + + def send(self, payload): + if not self.sock: + self._connect() + try: + self.sock.send(payload.encode("utf-8")) + except Exception as e_send: + logger.error("Error occurred submitting to xray daemon: %s", e_send) + + def reset(self): + if hasattr(self, "_host_port_tuple"): + del self._host_port_tuple + if self.sock: + self.sock.close() + self.sock = None + + def _connect(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self.sock.setblocking(0) + self.sock.connect(self.host_port_tuple) + + def _get_xray_host_port(self, address): + if address == "": + logger.debug("X-Ray daemon env var not set, not sending sub-segment") + return None + parts = address.split(":") + if len(parts) <= 1: + logger.debug("X-Ray daemon env var not set, not sending sub-segment") + return None + port = int(parts[1]) + host = parts[0] + return (host, port) + + +sock = Socket() def build_segment_payload(payload): @@ -95,10 +115,7 @@ def build_segment(context, key, metadata): def send_segment(key, metadata): - host_port_tuple = get_xray_host_port( - os.environ.get(XrayDaemon.XRAY_DAEMON_ADDRESS, "") - ) - if host_port_tuple is None: + if sock.host_port_tuple is None: return None context = parse_xray_header( os.environ.get(XrayDaemon.XRAY_TRACE_ID_HEADER_NAME, "") @@ -115,4 +132,4 @@ def send_segment(key, metadata): return None segment = build_segment(context, key, metadata) segment_payload = build_segment_payload(segment) - send(host_port_tuple, segment_payload) + sock.send(segment_payload) diff --git a/tests/test_benchmarks.py b/tests/test_benchmarks.py index d430c255..a36f9a73 100644 --- a/tests/test_benchmarks.py +++ b/tests/test_benchmarks.py @@ -74,14 +74,24 @@ def test_trigger_extract_trigger_tags(event, benchmark): def test_xray_send_segment(benchmark, monkeypatch): + xray.sock.reset() + monkeypatch.setenv(XrayDaemon.XRAY_DAEMON_ADDRESS, "localhost:9000") monkeypatch.setenv( XrayDaemon.XRAY_TRACE_ID_HEADER_NAME, "Root=1-5e272390-8c398be037738dc042009320;Parent=94ae789b969f1cc5;Sampled=1;Lineage=c6c5b1b9:0", ) + + def socket_send(*a, **k): + sends.append(True) + + sends = [] + monkeypatch.setattr("socket.socket.send", socket_send) + key = { "trace-id": "12345678901234567890123456789012", "parent-id": "1234567890123456", "sampling-priority": "1", } benchmark(xray.send_segment, XraySubsegment.TRACE_KEY, key) + assert sends diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index b7058e11..76facd52 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -7,6 +7,7 @@ from datadog_lambda.constants import TraceHeader import datadog_lambda.wrapper as wrapper +import datadog_lambda.xray as xray from datadog_lambda.metric import lambda_metric from datadog_lambda.thread_stats_writer import ThreadStatsWriter from ddtrace import Span, tracer @@ -590,7 +591,9 @@ class TestLambdaWrapperWithTraceContext(unittest.TestCase): }, ) def test_event_bridge_sqs_payload(self): - patcher = patch("datadog_lambda.xray.send") + xray.sock.reset() + + patcher = patch("datadog_lambda.xray.sock.send") mock_send = patcher.start() self.addCleanup(patcher.stop) @@ -623,7 +626,7 @@ def handler(event, context): self.assertEqual(result.span_id, aws_lambda_span.span_id) self.assertEqual(result.sampling_priority, 1) mock_send.assert_called_once() - (_, raw_payload), _ = mock_send.call_args + (raw_payload,), _ = mock_send.call_args payload = json.loads(raw_payload[33:]) # strip formatting prefix self.assertEqual(self.xray_root, payload["trace_id"]) self.assertEqual(self.xray_parent, payload["parent_id"]) diff --git a/tests/test_xray.py b/tests/test_xray.py index ac3594a9..5d5c487e 100644 --- a/tests/test_xray.py +++ b/tests/test_xray.py @@ -4,15 +4,13 @@ from unittest.mock import MagicMock, patch -from datadog_lambda.xray import ( - get_xray_host_port, - build_segment_payload, - build_segment, - send_segment, -) +from datadog_lambda.xray import build_segment_payload, build_segment, send_segment, sock class TestXRay(unittest.TestCase): + def setUp(self): + sock.reset() + def tearDown(self): if os.environ.get("_X_AMZN_TRACE_ID"): os.environ.pop("_X_AMZN_TRACE_ID") @@ -21,15 +19,15 @@ def tearDown(self): return super().tearDown() def test_get_xray_host_port_empty_(self): - result = get_xray_host_port("") + result = sock._get_xray_host_port("") self.assertIsNone(result) def test_get_xray_host_port_invalid_value(self): - result = get_xray_host_port("myVar") + result = sock._get_xray_host_port("myVar") self.assertIsNone(result) def test_get_xray_host_port_success(self): - result = get_xray_host_port("mySuperHost:1000") + result = sock._get_xray_host_port("mySuperHost:1000") self.assertEqual("mySuperHost", result[0]) self.assertEqual(1000, result[1]) @@ -40,7 +38,7 @@ def test_send_segment_sampled_out(self): ] = "Root=1-5e272390-8c398be037738dc042009320;Parent=94ae789b969f1cc5;Sampled=0;Lineage=c6c5b1b9:0" with patch( - "datadog_lambda.xray.send", MagicMock(return_value=None) + "datadog_lambda.xray.sock.send", MagicMock(return_value=None) ) as mock_send: # XRay trace won't be sampled according to the trace header. send_segment("my_key", {"data": "value"}) @@ -52,7 +50,7 @@ def test_send_segment_sampled(self): "_X_AMZN_TRACE_ID" ] = "Root=1-5e272390-8c398be037738dc042009320;Parent=94ae789b969f1cc5;Sampled=1;Lineage=c6c5b1b9:0" with patch( - "datadog_lambda.xray.send", MagicMock(return_value=None) + "datadog_lambda.xray.sock.send", MagicMock(return_value=None) ) as mock_send: # X-Ray trace will be sampled according to the trace header. send_segment("my_key", {"data": "value"}) From 7935b65e63c6d1f470bca3c0b2b7fdd95bb1351f Mon Sep 17 00:00:00 2001 From: Rey Abolofia Date: Tue, 9 Apr 2024 12:29:15 -0700 Subject: [PATCH 2/2] Move reset to tests. --- datadog_lambda/xray.py | 7 ------- tests/test_benchmarks.py | 4 ++-- tests/test_wrapper.py | 4 ++-- tests/test_xray.py | 3 ++- tests/utils.py | 10 ++++++++++ 5 files changed, 16 insertions(+), 12 deletions(-) diff --git a/datadog_lambda/xray.py b/datadog_lambda/xray.py index 8c68bf56..c4fa1643 100644 --- a/datadog_lambda/xray.py +++ b/datadog_lambda/xray.py @@ -30,13 +30,6 @@ def send(self, payload): except Exception as e_send: logger.error("Error occurred submitting to xray daemon: %s", e_send) - def reset(self): - if hasattr(self, "_host_port_tuple"): - del self._host_port_tuple - if self.sock: - self.sock.close() - self.sock = None - def _connect(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.setblocking(0) diff --git a/tests/test_benchmarks.py b/tests/test_benchmarks.py index a36f9a73..899e3d7f 100644 --- a/tests/test_benchmarks.py +++ b/tests/test_benchmarks.py @@ -13,7 +13,7 @@ from datadog_lambda.constants import XrayDaemon, XraySubsegment -from tests.utils import get_mock_context +from tests.utils import get_mock_context, reset_xray_connection event_samples_dir = "tests/event_samples" @@ -74,7 +74,7 @@ def test_trigger_extract_trigger_tags(event, benchmark): def test_xray_send_segment(benchmark, monkeypatch): - xray.sock.reset() + reset_xray_connection() monkeypatch.setenv(XrayDaemon.XRAY_DAEMON_ADDRESS, "localhost:9000") monkeypatch.setenv( diff --git a/tests/test_wrapper.py b/tests/test_wrapper.py index 76facd52..38c0d9fc 100644 --- a/tests/test_wrapper.py +++ b/tests/test_wrapper.py @@ -13,7 +13,7 @@ from ddtrace import Span, tracer from ddtrace.internal.constants import MAX_UINT_64BITS -from tests.utils import get_mock_context +from tests.utils import get_mock_context, reset_xray_connection class TestDatadogLambdaWrapper(unittest.TestCase): @@ -591,7 +591,7 @@ class TestLambdaWrapperWithTraceContext(unittest.TestCase): }, ) def test_event_bridge_sqs_payload(self): - xray.sock.reset() + reset_xray_connection() patcher = patch("datadog_lambda.xray.sock.send") mock_send = patcher.start() diff --git a/tests/test_xray.py b/tests/test_xray.py index 5d5c487e..7f33f891 100644 --- a/tests/test_xray.py +++ b/tests/test_xray.py @@ -5,11 +5,12 @@ from unittest.mock import MagicMock, patch from datadog_lambda.xray import build_segment_payload, build_segment, send_segment, sock +from tests.utils import reset_xray_connection class TestXRay(unittest.TestCase): def setUp(self): - sock.reset() + reset_xray_connection() def tearDown(self): if os.environ.get("_X_AMZN_TRACE_ID"): diff --git a/tests/utils.py b/tests/utils.py index 71060d46..0f246e68 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -24,3 +24,13 @@ def get_mock_context( lambda_context.function_name = function_name lambda_context.client_context = ClientContext(custom) return lambda_context + + +def reset_xray_connection(): + from datadog_lambda.xray import sock + + if hasattr(sock, "_host_port_tuple"): + del sock._host_port_tuple + if sock.sock: + sock.sock.close() + sock.sock = None