Skip to content

Commit 3ec21fb

Browse files
committed
Added Async functions for begin subsegment and capture.
Also fixed typo from my last PR yield from is not valid syntax in py27 so was causing issues in the recorder module, instead recorder is subclassed to provide some py3 compatible async functions and the recorder class] will be chosen depending on if py2 or py3. Set the correct context class in all test cases as they would inherit the context of the last test ran, in some cases botocore was trying to use a async context. Makes use of async syntax consistent. Reduced duplication of boto patching functions Made the library zipsafe when loading resources Improved test for testing nested subsegments Renamed botocore_para_whitelist.json Updated README
1 parent 9672b13 commit 3ec21fb

28 files changed

+515
-149
lines changed

MANIFEST.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
include aws_xray_sdk/ext/botocore/*.json
1+
include aws_xray_sdk/ext/resources/*.json
22
include aws_xray_sdk/core/sampling/*.json
33
include README.md
44
include LICENSE

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,17 @@ def myfunc():
9393
myfunc()
9494
```
9595

96+
```python
97+
from aws_xray_sdk.core import xray_recorder
98+
99+
@xray_recorder.capture_async('subsegment_name')
100+
async def myfunc():
101+
# Do something here
102+
103+
async def main():
104+
await myfunc()
105+
```
106+
96107
**Trace AWS Lambda functions**
97108

98109
```python
@@ -149,6 +160,22 @@ xray_recorder.configure(service='fallback_name', dynamic_naming='*mysite.com*')
149160
XRayMiddleware(app, xray_recorder)
150161
```
151162

163+
**Add aiohttp middleware**
164+
```python
165+
from aiohttp import web
166+
167+
from aws_xray_sdk.ext.aiohttp.middleware import middleware
168+
from aws_xray_sdk.core import xray_recorder
169+
from aws_xray_sdk.core.async_context import AsyncContext
170+
171+
xray_recorder.configure(service='fallback_name', context=AsyncContext())
172+
173+
app = web.Application(middlewares=[middleware])
174+
app.router.add_get("/", handler)
175+
176+
web.run_app(app)
177+
```
178+
152179
## License
153180

154181
The AWS X-Ray SDK for Python is licensed under the Apache 2.0 License. See LICENSE and NOTICE.txt for more information.

aws_xray_sdk/core/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
from .recorder import AWSXRayRecorder
22
from .patcher import patch_all, patch
3+
from .utils.compat import PY35
34

45

5-
xray_recorder = AWSXRayRecorder()
6+
if not PY35:
7+
xray_recorder = AWSXRayRecorder()
8+
else:
9+
from .async_recorder import AsyncAWSXRayRecorder
10+
xray_recorder = AsyncAWSXRayRecorder()
611

712
__all__ = [
813
'patch',

aws_xray_sdk/core/async_recorder.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import time
2+
import traceback
3+
4+
import wrapt
5+
6+
from aws_xray_sdk.core.recorder import AWSXRayRecorder
7+
8+
9+
class AsyncAWSXRayRecorder(AWSXRayRecorder):
10+
def capture_async(self, name=None):
11+
"""
12+
A decorator that records enclosed function in a subsegment.
13+
It only works with asynchronous functions.
14+
15+
params str name: The name of the subsegment. If not specified
16+
the function name will be used.
17+
"""
18+
19+
@wrapt.decorator
20+
async def wrapper(wrapped, instance, args, kwargs):
21+
func_name = name
22+
if not func_name:
23+
func_name = wrapped.__name__
24+
25+
result = await self.record_subsegment_async(
26+
wrapped, instance, args, kwargs,
27+
name=func_name,
28+
namespace='local',
29+
meta_processor=None,
30+
)
31+
32+
return result
33+
34+
return wrapper
35+
36+
async def record_subsegment_async(self, wrapped, instance, args, kwargs, name,
37+
namespace, meta_processor):
38+
39+
subsegment = self.begin_subsegment(name, namespace)
40+
41+
exception = None
42+
stack = None
43+
return_value = None
44+
45+
try:
46+
return_value = await wrapped(*args, **kwargs)
47+
return return_value
48+
except Exception as e:
49+
exception = e
50+
stack = traceback.extract_stack(limit=self._max_trace_back)
51+
raise
52+
finally:
53+
end_time = time.time()
54+
if callable(meta_processor):
55+
meta_processor(
56+
wrapped=wrapped,
57+
instance=instance,
58+
args=args,
59+
kwargs=kwargs,
60+
return_value=return_value,
61+
exception=exception,
62+
subsegment=subsegment,
63+
stack=stack,
64+
)
65+
elif exception:
66+
if subsegment:
67+
subsegment.add_exception(exception, stack)
68+
69+
self.end_subsegment(end_time)

aws_xray_sdk/core/patcher.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
log = logging.getLogger(__name__)
55

66
SUPPORTED_MODULES = (
7+
'aiobotocore',
78
'botocore',
89
'requests',
910
'sqlite3',
@@ -23,10 +24,14 @@ def patch(modules_to_patch, raise_errors=True):
2324

2425

2526
def _patch_module(module_to_patch, raise_errors=True):
26-
# boto3 depends on botocore and patch botocore is sufficient
27+
# boto3 depends on botocore and patching botocore is sufficient
2728
if module_to_patch == 'boto3':
2829
module_to_patch = 'botocore'
2930

31+
# aioboto3 depends on aiobotocore and patching aiobotocore is sufficient
32+
if module_to_patch == 'aioboto3':
33+
module_to_patch = 'aiobotocore'
34+
3035
if module_to_patch not in SUPPORTED_MODULES:
3136
raise Exception('module %s is currently not supported for patching'
3237
% module_to_patch)

aws_xray_sdk/core/recorder.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ class AWSXRayRecorder(object):
3030
A global AWS X-Ray recorder that will begin/end segments/subsegments
3131
and send them to the X-Ray daemon. This recorder is initialized during
3232
loading time so you can use::
33+
3334
from aws_xray_sdk.core import xray_recorder
35+
3436
in your module to access it
3537
"""
3638
def __init__(self):
@@ -312,15 +314,16 @@ def record_subsegment(self, wrapped, instance, args, kwargs, name,
312314

313315
subsegment = self.begin_subsegment(name, namespace)
314316

317+
exception = None
318+
stack = None
319+
return_value = None
320+
315321
try:
316322
return_value = wrapped(*args, **kwargs)
317-
exception = None
318-
stack = None
319323
return return_value
320324
except Exception as e:
321325
exception = e
322326
stack = traceback.extract_stack(limit=self._max_trace_back)
323-
return_value = None
324327
raise
325328
finally:
326329
end_time = time.time()

aws_xray_sdk/core/sampling/default_sampler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
import json
33
from random import Random
44

5+
from pkg_resources import resource_filename
56
from .sampling_rule import SamplingRule
67
from ..exceptions.exceptions import InvalidSamplingManifestError
78

8-
__location__ = os.path.realpath(
9-
os.path.join(os.getcwd(), os.path.dirname(__file__)))
109

11-
with open(os.path.join(__location__, 'default_sampling_rule.json')) as f:
10+
with open(resource_filename(__name__, 'default_sampling_rule.json')) as f:
1211
default_sampling_rule = json.load(f)
1312

1413

aws_xray_sdk/core/utils/compat.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33

44
PY2 = sys.version_info < (3,)
5+
PY35 = sys.version_info >= (3, 5)
56

67
if PY2:
78
annotation_value_types = (int, long, float, bool, str) # noqa: F821
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .patch import patch
2+
3+
__all__ = ['patch']

aws_xray_sdk/ext/aiobotocore/patch.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import aiobotocore.client
2+
import wrapt
3+
4+
from aws_xray_sdk.core import xray_recorder
5+
from aws_xray_sdk.ext.boto_utils import inject_header, aws_meta_processor
6+
7+
8+
def patch():
9+
"""
10+
Patch aiobotocore client so it generates subsegments
11+
when calling AWS services.
12+
"""
13+
if hasattr(aiobotocore.client, '_xray_enabled'):
14+
return
15+
setattr(aiobotocore.client, '_xray_enabled', True)
16+
17+
wrapt.wrap_function_wrapper(
18+
'aiobotocore.client',
19+
'AioBaseClient._make_api_call',
20+
_xray_traced_aiobotocore,
21+
)
22+
23+
wrapt.wrap_function_wrapper(
24+
'aiobotocore.endpoint',
25+
'AioEndpoint._encode_headers',
26+
inject_header,
27+
)
28+
29+
30+
async def _xray_traced_aiobotocore(wrapped, instance, args, kwargs):
31+
service = instance._service_model.metadata["endpointPrefix"]
32+
result = await xray_recorder.record_subsegment_async(
33+
wrapped, instance, args, kwargs,
34+
name=service,
35+
namespace='aws',
36+
meta_processor=aws_meta_processor,
37+
)
38+
39+
return result

aws_xray_sdk/ext/boto_utils.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
from __future__ import absolute_import
2+
# Need absolute import as botocore is also in the current folder for py27
3+
import json
4+
5+
from pkg_resources import resource_filename
6+
from botocore.exceptions import ClientError
7+
8+
from aws_xray_sdk.core import xray_recorder
9+
from aws_xray_sdk.core.models import http
10+
11+
from aws_xray_sdk.ext.util import inject_trace_header, to_snake_case
12+
13+
14+
with open(resource_filename(__name__, 'resources/aws_para_whitelist.json'), 'r') as data_file:
15+
whitelist = json.load(data_file)
16+
17+
18+
def inject_header(wrapped, instance, args, kwargs):
19+
headers = args[0]
20+
inject_trace_header(headers, xray_recorder.current_subsegment())
21+
return wrapped(*args, **kwargs)
22+
23+
24+
def aws_meta_processor(wrapped, instance, args, kwargs,
25+
return_value, exception, subsegment, stack):
26+
region = instance.meta.region_name
27+
28+
if 'operation_name' in kwargs:
29+
operation_name = kwargs['operation_name']
30+
else:
31+
operation_name = args[0]
32+
33+
aws_meta = {
34+
'operation': operation_name,
35+
'region': region,
36+
}
37+
38+
if return_value:
39+
resp_meta = return_value.get('ResponseMetadata')
40+
if resp_meta:
41+
aws_meta['request_id'] = resp_meta.get('RequestId')
42+
subsegment.put_http_meta(http.STATUS,
43+
resp_meta.get('HTTPStatusCode'))
44+
# for service like S3 that returns special request id in response headers
45+
if 'HTTPHeaders' in resp_meta and resp_meta['HTTPHeaders'].get('x-amz-id-2'):
46+
aws_meta['id_2'] = resp_meta['HTTPHeaders']['x-amz-id-2']
47+
48+
elif exception:
49+
_aws_error_handler(exception, stack, subsegment, aws_meta)
50+
51+
_extract_whitelisted_params(subsegment.name, operation_name,
52+
aws_meta, args, kwargs, return_value)
53+
54+
subsegment.set_aws(aws_meta)
55+
56+
57+
def _aws_error_handler(exception, stack, subsegment, aws_meta):
58+
59+
if not exception or not isinstance(exception, ClientError):
60+
return
61+
62+
response_metadata = exception.response.get('ResponseMetadata')
63+
64+
if not response_metadata:
65+
return
66+
67+
aws_meta['request_id'] = response_metadata.get('RequestId')
68+
69+
status_code = response_metadata.get('HTTPStatusCode')
70+
71+
subsegment.put_http_meta(http.STATUS, status_code)
72+
if status_code == 429:
73+
subsegment.add_throttle_flag()
74+
if status_code / 100 == 4:
75+
subsegment.add_error_flag()
76+
77+
subsegment.add_exception(exception, stack, True)
78+
79+
80+
def _extract_whitelisted_params(service, operation,
81+
aws_meta, args, kwargs, response):
82+
83+
# check if service is whitelisted
84+
if service not in whitelist['services']:
85+
return
86+
operations = whitelist['services'][service]['operations']
87+
88+
# check if operation is whitelisted
89+
if operation not in operations:
90+
return
91+
params = operations[operation]
92+
93+
# record whitelisted request/response parameters
94+
if 'request_parameters' in params:
95+
_record_params(params['request_parameters'], args[1], aws_meta)
96+
97+
if 'request_descriptors' in params:
98+
_record_special_params(params['request_descriptors'],
99+
args[1], aws_meta)
100+
101+
if 'response_parameters' in params and response:
102+
_record_params(params['response_parameters'], response, aws_meta)
103+
104+
if 'response_descriptors' in params and response:
105+
_record_special_params(params['response_descriptors'],
106+
response, aws_meta)
107+
108+
109+
def _record_params(whitelisted, actual, aws_meta):
110+
111+
for key in whitelisted:
112+
if key in actual:
113+
snake_key = to_snake_case(key)
114+
aws_meta[snake_key] = actual[key]
115+
116+
117+
def _record_special_params(whitelisted, actual, aws_meta):
118+
119+
for key in whitelisted:
120+
if key in actual:
121+
_process_descriptor(whitelisted[key], actual[key], aws_meta)
122+
123+
124+
def _process_descriptor(descriptor, value, aws_meta):
125+
126+
# "get_count" = true
127+
if 'get_count' in descriptor and descriptor['get_count']:
128+
value = len(value)
129+
130+
# "get_keys" = true
131+
if 'get_keys' in descriptor and descriptor['get_keys']:
132+
value = value.keys()
133+
134+
aws_meta[descriptor['rename_to']] = value

0 commit comments

Comments
 (0)