Skip to content

Commit 2d9dbac

Browse files
committed
Initial change on centralized sampling support
add poller logic cs support beta cs minor bug fixes minor fixes minor fixes minor fixes update change log release commit
1 parent e127e6e commit 2d9dbac

37 files changed

+1369
-212
lines changed

CHANGELOG.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@
22
CHANGELOG
33
=========
44

5+
2.0.0
6+
=====
7+
* **Breaking**: The default sampler now launches background tasks to poll sampling rules from the X-Ray backend. See the new default sampling strategy in more detail here:
8+
* **Breaking**: The `should_trace` function in the sampler now takes a dictionary for sampling rule matching.
9+
* **Breaking**: The original sampling modules for local defined rules are moved from `models.sampling` to `models.sampling.local`.
10+
* **Breaking**: The default behavior of `patch_all` changed to selectively patch libraries to avoid double patching. You can use `patch_all(double_patch=True)` to force it to patch ALL supported libraries. See more details in `ISSUE63 <https://github.com/aws/aws-xray-sdk-python/issues/63>`_
11+
* **Breaking**: The latest `botocore`, which has the new X-Ray service APIs `GetSamplingRules` and `GetSamplingTargets`, is required.
12+
* feature: Environment variable `AWS_XRAY_DAEMON_ADDRESS` now accepts an additional notation, `tcp:127.0.0.1:2000 udp:127.0.0.2:2001`, to set TCP and UDP destinations separately. By default it assumes an X-Ray daemon listening to both UDP and TCP traffic on `127.0.0.1:2000`.
13+
* feature: Added MongoDB python client support. `PR65 <https://github.com/aws/aws-xray-sdk-python/pull/65>`_.
14+
* bugfix: Support binding connection in sqlalchemy as well as engine. `PR78 <https://github.com/aws/aws-xray-sdk-python/pull/78>`_.
15+
* bugfix: Flask middleware safe request teardown. `ISSUE75 <https://github.com/aws/aws-xray-sdk-python/issues/75>`_.
16+
17+
518
1.1.2
619
=====
720
* bugfix: Fixed an issue on PynamoDB patcher where the capture didn't handle client timeout.

MANIFEST.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
include aws_xray_sdk/ext/resources/*.json
2-
include aws_xray_sdk/core/sampling/*.json
2+
include aws_xray_sdk/core/sampling/local/*.json
33
include README.md
44
include LICENSE

aws_xray_sdk/core/daemon_config.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import os
2+
3+
from .exceptions.exceptions import InvalidDaemonAddressException
4+
5+
DAEMON_ADDRESS_KEY = "AWS_XRAY_DAEMON_ADDRESS"
6+
DEFAULT_ADDRESS = '127.0.0.1:2000'
7+
8+
9+
class DaemonConfig(object):
10+
"""The class that stores X-Ray daemon configuration about
11+
the ip address and port for UDP and TCP port. It gets the address
12+
string from ``AWS_XRAY_DAEMON_ADDRESS`` and then from recorder's
13+
configuration for ``daemon_address``.
14+
A notation of '127.0.0.1:2000' or 'tcp:127.0.0.1:2000 udp:127.0.0.2:2001'
15+
are both acceptable. The former one means UDP and TCP are running at
16+
the same address.
17+
By default it assumes an X-Ray daemon running at 127.0.0.1:2000
18+
listening to both UDP and TCP traffic.
19+
"""
20+
def __init__(self, daemon_address=DEFAULT_ADDRESS):
21+
if daemon_address is None:
22+
daemon_address = DEFAULT_ADDRESS
23+
24+
val = os.getenv(DAEMON_ADDRESS_KEY, daemon_address)
25+
configs = val.split(' ')
26+
if len(configs) == 1:
27+
self._parse_single_form(configs[0])
28+
elif len(configs) == 2:
29+
self._parse_double_form(configs[0], configs[1], val)
30+
else:
31+
raise InvalidDaemonAddressException('Invalid daemon address %s specified.' % val)
32+
33+
def _parse_single_form(self, val):
34+
try:
35+
configs = val.split(':')
36+
self._udp_ip = configs[0]
37+
self._udp_port = int(configs[1])
38+
self._tcp_ip = configs[0]
39+
self._tcp_port = int(configs[1])
40+
except Exception:
41+
raise InvalidDaemonAddressException('Invalid daemon address %s specified.' % val)
42+
43+
def _parse_double_form(self, val1, val2, origin):
44+
try:
45+
configs1 = val1.split(':')
46+
configs2 = val2.split(':')
47+
mapping = {
48+
configs1[0]: configs1,
49+
configs2[0]: configs2,
50+
}
51+
52+
tcp_info = mapping.get('tcp')
53+
udp_info = mapping.get('udp')
54+
55+
self._tcp_ip = tcp_info[1]
56+
self._tcp_port = int(tcp_info[2])
57+
self._udp_ip = udp_info[1]
58+
self._udp_port = int(udp_info[2])
59+
except Exception:
60+
raise InvalidDaemonAddressException('Invalid daemon address %s specified.' % origin)
61+
62+
@property
63+
def udp_ip(self):
64+
return self._udp_ip
65+
66+
@property
67+
def udp_port(self):
68+
return self._udp_port
69+
70+
@property
71+
def tcp_ip(self):
72+
return self._tcp_ip
73+
74+
@property
75+
def tcp_port(self):
76+
return self._tcp_port

aws_xray_sdk/core/emitters/udp_emitter.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
import os
2-
import socket
31
import logging
2+
import socket
43

4+
from aws_xray_sdk.core.daemon_config import DaemonConfig
55
from ..exceptions.exceptions import InvalidDaemonAddressException
66

77
log = logging.getLogger(__name__)
88

99

10-
PROTOCOL_HEADER = "{\"format\": \"json\", \"version\": 1}"
10+
PROTOCOL_HEADER = "{\"format\":\"json\",\"version\":1}"
1111
PROTOCOL_DELIMITER = '\n'
12-
DAEMON_ADDRESS_KEY = "AWS_XRAY_DAEMON_ADDRESS"
12+
DEFAULT_DAEMON_ADDRESS = '127.0.0.1:2000'
1313

1414

1515
class UDPEmitter(object):
@@ -19,12 +19,11 @@ class UDPEmitter(object):
1919
exception on the actual data transfer between the socket and the daemon,
2020
it logs the exception and continue.
2121
"""
22-
def __init__(self, daemon_address='127.0.0.1:2000'):
22+
def __init__(self, daemon_address=DEFAULT_DAEMON_ADDRESS):
2323

2424
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
2525
self._socket.setblocking(0)
26-
address = os.getenv(DAEMON_ADDRESS_KEY, daemon_address)
27-
self._ip, self._port = self._parse_address(address)
26+
self.set_daemon_address(daemon_address)
2827

2928
def send_entity(self, entity):
3029
"""
@@ -42,11 +41,20 @@ def send_entity(self, entity):
4241

4342
def set_daemon_address(self, address):
4443
"""
45-
Takes a full address like 127.0.0.1:2000 and parses it into ip address
46-
and port. Throws an exception if the address has invalid format.
44+
Set up UDP ip and port from the raw daemon address
45+
string using ``DaemonConfig`` class utilities.
4746
"""
4847
if address:
49-
self._ip, self._port = self._parse_address(address)
48+
daemon_config = DaemonConfig(address)
49+
self._ip, self._port = daemon_config.udp_ip, daemon_config.udp_port
50+
51+
@property
52+
def ip(self):
53+
return self._ip
54+
55+
@property
56+
def port(self):
57+
return self._port
5058

5159
def _send_data(self, data):
5260

aws_xray_sdk/core/models/segment.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ def set_service(self, service_info):
113113
"""
114114
self.service = service_info
115115

116+
def set_rule_name(self, rule_name):
117+
"""
118+
Add the matched centralized sampling rule name
119+
if a segment is sampled because of that rule.
120+
This method should be only used by the recorder.
121+
"""
122+
if not self.aws.get('xray', None):
123+
self.aws['xray'] = {}
124+
self.aws['xray']['rule_name'] = rule_name
125+
116126
def save_origin_trace_header(self, trace_header):
117127
"""
118128
Temporarily store additional data fields in trace header

aws_xray_sdk/core/patcher.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,24 @@
1414
'pymongo',
1515
)
1616

17+
NO_DOUBLE_PATCH = (
18+
'aiobotocore',
19+
'botocore',
20+
'pynamodb',
21+
'requests',
22+
'sqlite3',
23+
'mysql',
24+
'pymongo',
25+
)
26+
1727
_PATCHED_MODULES = set()
1828

1929

20-
def patch_all():
21-
patch(SUPPORTED_MODULES, raise_errors=False)
30+
def patch_all(double_patch=False):
31+
if double_patch:
32+
patch(SUPPORTED_MODULES, raise_errors=False)
33+
else:
34+
patch(NO_DOUBLE_PATCH, raise_errors=False)
2235

2336

2437
def patch(modules_to_patch, raise_errors=True):

aws_xray_sdk/core/recorder.py

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
from .models.default_dynamic_naming import DefaultDynamicNaming
1515
from .models.dummy_entities import DummySegment, DummySubsegment
1616
from .emitters.udp_emitter import UDPEmitter
17-
from .sampling.default_sampler import DefaultSampler
17+
from .sampling.sampler import DefaultSampler
18+
from .sampling.local.sampler import LocalSampler
1819
from .streaming.default_streaming import DefaultStreaming
1920
from .context import Context
21+
from .daemon_config import DaemonConfig
2022
from .plugins.utils import get_plugin_modules
2123
from .lambda_launcher import check_in_lambda
2224
from .exceptions.exceptions import SegmentNameMissingException
@@ -59,11 +61,12 @@ def __init__(self):
5961
# Special handling when running on AWS Lambda.
6062
self._context = context
6163
self.streaming_threshold = 0
64+
self._sampler = LocalSampler()
6265
else:
6366
self._context = Context()
67+
self._sampler = DefaultSampler()
6468

6569
self._emitter = UDPEmitter()
66-
self._sampler = DefaultSampler()
6770
self._sampling = True
6871
self._max_trace_back = 10
6972
self._plugins = None
@@ -72,12 +75,15 @@ def __init__(self):
7275
self._aws_metadata = copy.deepcopy(XRAY_META)
7376
self._origin = None
7477

78+
if type(self.sampler).__name__ == 'DefaultSampler':
79+
self.sampler.load_settings(DaemonConfig(), self.context)
80+
7581
def configure(self, sampling=None, plugins=None,
7682
context_missing=None, sampling_rules=None,
7783
daemon_address=None, service=None,
7884
context=None, emitter=None, streaming=None,
7985
dynamic_naming=None, streaming_threshold=None,
80-
max_trace_back=None):
86+
max_trace_back=None, sampler=None):
8187
"""Configure global X-Ray recorder.
8288
8389
Configure needs to run before patching third party libraries
@@ -86,10 +92,16 @@ def configure(self, sampling=None, plugins=None,
8692
:param bool sampling: If sampling is enabled, every time the recorder
8793
creates a segment it decides whether to send this segment to
8894
the X-Ray daemon. This setting is not used if the recorder
89-
is running in AWS Lambda.
90-
:param sampling_rules: Pass a set of custom sampling rules.
95+
is running in AWS Lambda. The recorder always respects the incoming
96+
sampling decisions regardless of this setting.
97+
:param sampling_rules: Pass a set of local custom sampling rules.
9198
Can be an absolute path of the sampling rule config json file
92-
or a dictionary that defines those rules.
99+
or a dictionary that defines those rules. This will also be the
100+
fallback rules in case of centralized sampling opted-in while
101+
the centralized sampling rules are not available.
102+
:param sampler: The sampler used to make sampling decisions. The SDK
103+
provides two built-in samplers. One is centralized rules based and
104+
the other is local rules based. The former is the default.
93105
:param tuple plugins: plugins that add extra metadata to each segment.
94106
Currently available plugins are EC2Plugin, ECS plugin and
95107
ElasticBeanstalkPlugin.
@@ -127,6 +139,8 @@ class to have your own implementation of the streaming process.
127139
"""
128140
if sampling is not None:
129141
self.sampling = sampling
142+
if sampler:
143+
self.sampler = sampler
130144
if service:
131145
self.service = os.getenv(TRACING_NAME_KEY, service)
132146
if sampling_rules:
@@ -160,6 +174,10 @@ class to have your own implementation of the streaming process.
160174
self._aws_metadata = copy.deepcopy(XRAY_META)
161175
self._origin = None
162176

177+
if type(self.sampler).__name__ == 'DefaultSampler':
178+
self.sampler.load_settings(DaemonConfig(daemon_address),
179+
self.context, self._origin)
180+
163181
def begin_segment(self, name=None, traceid=None,
164182
parent_id=None, sampling=None):
165183
"""
@@ -175,21 +193,26 @@ def begin_segment(self, name=None, traceid=None,
175193
if not seg_name:
176194
raise SegmentNameMissingException("Segment name is required.")
177195

178-
# we respect sampling decision regardless of recorder configuration.
179-
dummy = False
196+
# Sampling decision is None if not sampled.
197+
# In a sampled case it could be either a string or 1
198+
# depending on if centralized or local sampling rule takes effect.
199+
decision = True
200+
201+
# we respect the input sampling decision
202+
# regardless of recorder configuration.
180203
if sampling == 0:
181-
dummy = True
182-
elif sampling == 1:
183-
dummy = False
184-
elif self.sampling and not self._sampler.should_trace():
185-
dummy = True
204+
decision = False
205+
elif sampling:
206+
decision = sampling
207+
elif self.sampling:
208+
decision = self._sampler.should_trace()
186209

187-
if dummy:
210+
if not decision:
188211
segment = DummySegment(seg_name)
189212
else:
190213
segment = Segment(name=seg_name, traceid=traceid,
191214
parent_id=parent_id)
192-
self._populate_runtime_context(segment)
215+
self._populate_runtime_context(segment, decision)
193216

194217
self.context.put_segment(segment)
195218
return segment
@@ -397,13 +420,16 @@ def record_subsegment(self, wrapped, instance, args, kwargs, name,
397420

398421
self.end_subsegment(end_time)
399422

400-
def _populate_runtime_context(self, segment):
423+
def _populate_runtime_context(self, segment, sampling_decision):
401424
if self._origin:
402425
setattr(segment, 'origin', self._origin)
403426

404-
segment.set_aws(self._aws_metadata)
427+
segment.set_aws(copy.deepcopy(self._aws_metadata))
405428
segment.set_service(SERVICE_INFO)
406429

430+
if isinstance(sampling_decision, string_types):
431+
segment.set_rule_name(sampling_decision)
432+
407433
def _send_segment(self):
408434
"""
409435
Send the current segment to X-Ray daemon if it is present and
@@ -429,10 +455,10 @@ def _load_sampling_rules(self, sampling_rules):
429455
return
430456

431457
if isinstance(sampling_rules, dict):
432-
self.sampler = DefaultSampler(sampling_rules)
458+
self.sampler.load_local_rules(sampling_rules)
433459
else:
434460
with open(sampling_rules) as f:
435-
self.sampler = DefaultSampler(json.load(f))
461+
self.sampler.load_local_rules(json.load(f))
436462

437463
def _is_subsegment(self, entity):
438464

0 commit comments

Comments
 (0)