-
Notifications
You must be signed in to change notification settings - Fork 45
Use single socket instance for all xray api calls. #467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,32 +10,52 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
def get_xray_host_port(address): | ||
if address == "": | ||
logger.debug("X-Ray daemon env var not set, not sending sub-segment") | ||
return None | ||
parts = address.split(":") | ||
if len(parts) <= 1: | ||
logger.debug("X-Ray daemon env var not set, not sending sub-segment") | ||
return None | ||
port = int(parts[1]) | ||
host = parts[0] | ||
return (host, port) | ||
|
||
|
||
def send(host_port_tuple, payload): | ||
sock = None | ||
try: | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
sock.setblocking(0) | ||
sock.connect(host_port_tuple) | ||
sock.send(payload.encode("utf-8")) | ||
except Exception as e_send: | ||
logger.error("Error occurred submitting to xray daemon: %s", e_send) | ||
try: | ||
sock.close() | ||
except Exception as e_close: | ||
logger.error("Error while closing the socket: %s", e_close) | ||
class Socket(object): | ||
def __init__(self): | ||
self.sock = None | ||
|
||
@property | ||
def host_port_tuple(self): | ||
if not hasattr(self, "_host_port_tuple"): | ||
self._host_port_tuple = self._get_xray_host_port( | ||
os.environ.get(XrayDaemon.XRAY_DAEMON_ADDRESS, "") | ||
) | ||
return self._host_port_tuple | ||
|
||
def send(self, payload): | ||
if not self.sock: | ||
self._connect() | ||
try: | ||
self.sock.send(payload.encode("utf-8")) | ||
except Exception as e_send: | ||
logger.error("Error occurred submitting to xray daemon: %s", e_send) | ||
|
||
def reset(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nitpick: can we move it to tests files? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, good call. Done. |
||
if hasattr(self, "_host_port_tuple"): | ||
del self._host_port_tuple | ||
if self.sock: | ||
self.sock.close() | ||
self.sock = None | ||
|
||
def _connect(self): | ||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
self.sock.setblocking(0) | ||
self.sock.connect(self.host_port_tuple) | ||
|
||
def _get_xray_host_port(self, address): | ||
if address == "": | ||
logger.debug("X-Ray daemon env var not set, not sending sub-segment") | ||
return None | ||
parts = address.split(":") | ||
if len(parts) <= 1: | ||
logger.debug("X-Ray daemon env var not set, not sending sub-segment") | ||
return None | ||
port = int(parts[1]) | ||
host = parts[0] | ||
return (host, port) | ||
|
||
|
||
sock = Socket() | ||
|
||
|
||
def build_segment_payload(payload): | ||
|
@@ -95,10 +115,7 @@ def build_segment(context, key, metadata): | |
|
||
|
||
def send_segment(key, metadata): | ||
host_port_tuple = get_xray_host_port( | ||
os.environ.get(XrayDaemon.XRAY_DAEMON_ADDRESS, "") | ||
) | ||
if host_port_tuple is None: | ||
if sock.host_port_tuple is None: | ||
return None | ||
context = parse_xray_header( | ||
os.environ.get(XrayDaemon.XRAY_TRACE_ID_HEADER_NAME, "") | ||
|
@@ -115,4 +132,4 @@ def send_segment(key, metadata): | |
return None | ||
segment = build_segment(context, key, metadata) | ||
segment_payload = build_segment_payload(segment) | ||
send(host_port_tuple, segment_payload) | ||
sock.send(segment_payload) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
from datadog_lambda.constants import TraceHeader | ||
|
||
import datadog_lambda.wrapper as wrapper | ||
import datadog_lambda.xray as xray | ||
from datadog_lambda.metric import lambda_metric | ||
from datadog_lambda.thread_stats_writer import ThreadStatsWriter | ||
from ddtrace import Span, tracer | ||
|
@@ -590,7 +591,9 @@ class TestLambdaWrapperWithTraceContext(unittest.TestCase): | |
}, | ||
) | ||
def test_event_bridge_sqs_payload(self): | ||
patcher = patch("datadog_lambda.xray.send") | ||
xray.sock.reset() | ||
|
||
patcher = patch("datadog_lambda.xray.sock.send") | ||
mock_send = patcher.start() | ||
self.addCleanup(patcher.stop) | ||
|
||
|
@@ -623,7 +626,7 @@ def handler(event, context): | |
self.assertEqual(result.span_id, aws_lambda_span.span_id) | ||
self.assertEqual(result.sampling_priority, 1) | ||
mock_send.assert_called_once() | ||
(_, raw_payload), _ = mock_send.call_args | ||
(raw_payload,), _ = mock_send.call_args | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Signature of the |
||
payload = json.loads(raw_payload[33:]) # strip formatting prefix | ||
self.assertEqual(self.xray_root, payload["trace_id"]) | ||
self.assertEqual(self.xray_parent, payload["parent_id"]) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This value is now memoized so we only need to parse the env var once.