Skip to content

Commit e8a1d4f

Browse files
authored
Merge pull request #6 from terrycain/aiobotocore
Add support for aiobotocore
2 parents 8295724 + 3ec21fb commit e8a1d4f

28 files changed

+515
-149
lines changed

MANIFEST.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
include aws_xray_sdk/ext/botocore/*.json
1+
include aws_xray_sdk/ext/resources/*.json
22
include aws_xray_sdk/core/sampling/*.json
33
include README.md
44
include LICENSE

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,17 @@ def myfunc():
9393
myfunc()
9494
```
9595

96+
```python
97+
from aws_xray_sdk.core import xray_recorder
98+
99+
@xray_recorder.capture_async('subsegment_name')
100+
async def myfunc():
101+
# Do something here
102+
103+
async def main():
104+
await myfunc()
105+
```
106+
96107
**Trace AWS Lambda functions**
97108

98109
```python
@@ -149,6 +160,22 @@ xray_recorder.configure(service='fallback_name', dynamic_naming='*mysite.com*')
149160
XRayMiddleware(app, xray_recorder)
150161
```
151162

163+
**Add aiohttp middleware**
164+
```python
165+
from aiohttp import web
166+
167+
from aws_xray_sdk.ext.aiohttp.middleware import middleware
168+
from aws_xray_sdk.core import xray_recorder
169+
from aws_xray_sdk.core.async_context import AsyncContext
170+
171+
xray_recorder.configure(service='fallback_name', context=AsyncContext())
172+
173+
app = web.Application(middlewares=[middleware])
174+
app.router.add_get("/", handler)
175+
176+
web.run_app(app)
177+
```
178+
152179
## License
153180

154181
The AWS X-Ray SDK for Python is licensed under the Apache 2.0 License. See LICENSE and NOTICE.txt for more information.

aws_xray_sdk/core/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
from .recorder import AWSXRayRecorder
22
from .patcher import patch_all, patch
3+
from .utils.compat import PY35
34

45

5-
xray_recorder = AWSXRayRecorder()
6+
if not PY35:
7+
xray_recorder = AWSXRayRecorder()
8+
else:
9+
from .async_recorder import AsyncAWSXRayRecorder
10+
xray_recorder = AsyncAWSXRayRecorder()
611

712
__all__ = [
813
'patch',

aws_xray_sdk/core/async_recorder.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import time
2+
import traceback
3+
4+
import wrapt
5+
6+
from aws_xray_sdk.core.recorder import AWSXRayRecorder
7+
8+
9+
class AsyncAWSXRayRecorder(AWSXRayRecorder):
10+
def capture_async(self, name=None):
11+
"""
12+
A decorator that records enclosed function in a subsegment.
13+
It only works with asynchronous functions.
14+
15+
params str name: The name of the subsegment. If not specified
16+
the function name will be used.
17+
"""
18+
19+
@wrapt.decorator
20+
async def wrapper(wrapped, instance, args, kwargs):
21+
func_name = name
22+
if not func_name:
23+
func_name = wrapped.__name__
24+
25+
result = await self.record_subsegment_async(
26+
wrapped, instance, args, kwargs,
27+
name=func_name,
28+
namespace='local',
29+
meta_processor=None,
30+
)
31+
32+
return result
33+
34+
return wrapper
35+
36+
async def record_subsegment_async(self, wrapped, instance, args, kwargs, name,
37+
namespace, meta_processor):
38+
39+
subsegment = self.begin_subsegment(name, namespace)
40+
41+
exception = None
42+
stack = None
43+
return_value = None
44+
45+
try:
46+
return_value = await wrapped(*args, **kwargs)
47+
return return_value
48+
except Exception as e:
49+
exception = e
50+
stack = traceback.extract_stack(limit=self._max_trace_back)
51+
raise
52+
finally:
53+
end_time = time.time()
54+
if callable(meta_processor):
55+
meta_processor(
56+
wrapped=wrapped,
57+
instance=instance,
58+
args=args,
59+
kwargs=kwargs,
60+
return_value=return_value,
61+
exception=exception,
62+
subsegment=subsegment,
63+
stack=stack,
64+
)
65+
elif exception:
66+
if subsegment:
67+
subsegment.add_exception(exception, stack)
68+
69+
self.end_subsegment(end_time)

aws_xray_sdk/core/patcher.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
log = logging.getLogger(__name__)
55

66
SUPPORTED_MODULES = (
7+
'aiobotocore',
78
'botocore',
89
'requests',
910
'sqlite3',
@@ -23,10 +24,14 @@ def patch(modules_to_patch, raise_errors=True):
2324

2425

2526
def _patch_module(module_to_patch, raise_errors=True):
26-
# boto3 depends on botocore and patch botocore is sufficient
27+
# boto3 depends on botocore and patching botocore is sufficient
2728
if module_to_patch == 'boto3':
2829
module_to_patch = 'botocore'
2930

31+
# aioboto3 depends on aiobotocore and patching aiobotocore is sufficient
32+
if module_to_patch == 'aioboto3':
33+
module_to_patch = 'aiobotocore'
34+
3035
if module_to_patch not in SUPPORTED_MODULES:
3136
raise Exception('module %s is currently not supported for patching'
3237
% module_to_patch)

aws_xray_sdk/core/recorder.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ class AWSXRayRecorder(object):
3030
A global AWS X-Ray recorder that will begin/end segments/subsegments
3131
and send them to the X-Ray daemon. This recorder is initialized during
3232
loading time so you can use::
33+
3334
from aws_xray_sdk.core import xray_recorder
35+
3436
in your module to access it
3537
"""
3638
def __init__(self):
@@ -312,15 +314,16 @@ def record_subsegment(self, wrapped, instance, args, kwargs, name,
312314

313315
subsegment = self.begin_subsegment(name, namespace)
314316

317+
exception = None
318+
stack = None
319+
return_value = None
320+
315321
try:
316322
return_value = wrapped(*args, **kwargs)
317-
exception = None
318-
stack = None
319323
return return_value
320324
except Exception as e:
321325
exception = e
322326
stack = traceback.extract_stack(limit=self._max_trace_back)
323-
return_value = None
324327
raise
325328
finally:
326329
end_time = time.time()

aws_xray_sdk/core/sampling/default_sampler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
import json
33
from random import Random
44

5+
from pkg_resources import resource_filename
56
from .sampling_rule import SamplingRule
67
from ..exceptions.exceptions import InvalidSamplingManifestError
78

8-
__location__ = os.path.realpath(
9-
os.path.join(os.getcwd(), os.path.dirname(__file__)))
109

11-
with open(os.path.join(__location__, 'default_sampling_rule.json')) as f:
10+
with open(resource_filename(__name__, 'default_sampling_rule.json')) as f:
1211
default_sampling_rule = json.load(f)
1312

1413

aws_xray_sdk/core/utils/compat.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33

44
PY2 = sys.version_info < (3,)
5+
PY35 = sys.version_info >= (3, 5)
56

67
if PY2:
78
annotation_value_types = (int, long, float, bool, str) # noqa: F821
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .patch import patch
2+
3+
__all__ = ['patch']

aws_xray_sdk/ext/aiobotocore/patch.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import aiobotocore.client
2+
import wrapt
3+
4+
from aws_xray_sdk.core import xray_recorder
5+
from aws_xray_sdk.ext.boto_utils import inject_header, aws_meta_processor
6+
7+
8+
def patch():
9+
"""
10+
Patch aiobotocore client so it generates subsegments
11+
when calling AWS services.
12+
"""
13+
if hasattr(aiobotocore.client, '_xray_enabled'):
14+
return
15+
setattr(aiobotocore.client, '_xray_enabled', True)
16+
17+
wrapt.wrap_function_wrapper(
18+
'aiobotocore.client',
19+
'AioBaseClient._make_api_call',
20+
_xray_traced_aiobotocore,
21+
)
22+
23+
wrapt.wrap_function_wrapper(
24+
'aiobotocore.endpoint',
25+
'AioEndpoint._encode_headers',
26+
inject_header,
27+
)
28+
29+
30+
async def _xray_traced_aiobotocore(wrapped, instance, args, kwargs):
31+
service = instance._service_model.metadata["endpointPrefix"]
32+
result = await xray_recorder.record_subsegment_async(
33+
wrapped, instance, args, kwargs,
34+
name=service,
35+
namespace='aws',
36+
meta_processor=aws_meta_processor,
37+
)
38+
39+
return result

aws_xray_sdk/ext/boto_utils.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
from __future__ import absolute_import
2+
# Need absolute import as botocore is also in the current folder for py27
3+
import json
4+
5+
from pkg_resources import resource_filename
6+
from botocore.exceptions import ClientError
7+
8+
from aws_xray_sdk.core import xray_recorder
9+
from aws_xray_sdk.core.models import http
10+
11+
from aws_xray_sdk.ext.util import inject_trace_header, to_snake_case
12+
13+
14+
with open(resource_filename(__name__, 'resources/aws_para_whitelist.json'), 'r') as data_file:
15+
whitelist = json.load(data_file)
16+
17+
18+
def inject_header(wrapped, instance, args, kwargs):
19+
headers = args[0]
20+
inject_trace_header(headers, xray_recorder.current_subsegment())
21+
return wrapped(*args, **kwargs)
22+
23+
24+
def aws_meta_processor(wrapped, instance, args, kwargs,
25+
return_value, exception, subsegment, stack):
26+
region = instance.meta.region_name
27+
28+
if 'operation_name' in kwargs:
29+
operation_name = kwargs['operation_name']
30+
else:
31+
operation_name = args[0]
32+
33+
aws_meta = {
34+
'operation': operation_name,
35+
'region': region,
36+
}
37+
38+
if return_value:
39+
resp_meta = return_value.get('ResponseMetadata')
40+
if resp_meta:
41+
aws_meta['request_id'] = resp_meta.get('RequestId')
42+
subsegment.put_http_meta(http.STATUS,
43+
resp_meta.get('HTTPStatusCode'))
44+
# for service like S3 that returns special request id in response headers
45+
if 'HTTPHeaders' in resp_meta and resp_meta['HTTPHeaders'].get('x-amz-id-2'):
46+
aws_meta['id_2'] = resp_meta['HTTPHeaders']['x-amz-id-2']
47+
48+
elif exception:
49+
_aws_error_handler(exception, stack, subsegment, aws_meta)
50+
51+
_extract_whitelisted_params(subsegment.name, operation_name,
52+
aws_meta, args, kwargs, return_value)
53+
54+
subsegment.set_aws(aws_meta)
55+
56+
57+
def _aws_error_handler(exception, stack, subsegment, aws_meta):
58+
59+
if not exception or not isinstance(exception, ClientError):
60+
return
61+
62+
response_metadata = exception.response.get('ResponseMetadata')
63+
64+
if not response_metadata:
65+
return
66+
67+
aws_meta['request_id'] = response_metadata.get('RequestId')
68+
69+
status_code = response_metadata.get('HTTPStatusCode')
70+
71+
subsegment.put_http_meta(http.STATUS, status_code)
72+
if status_code == 429:
73+
subsegment.add_throttle_flag()
74+
if status_code / 100 == 4:
75+
subsegment.add_error_flag()
76+
77+
subsegment.add_exception(exception, stack, True)
78+
79+
80+
def _extract_whitelisted_params(service, operation,
81+
aws_meta, args, kwargs, response):
82+
83+
# check if service is whitelisted
84+
if service not in whitelist['services']:
85+
return
86+
operations = whitelist['services'][service]['operations']
87+
88+
# check if operation is whitelisted
89+
if operation not in operations:
90+
return
91+
params = operations[operation]
92+
93+
# record whitelisted request/response parameters
94+
if 'request_parameters' in params:
95+
_record_params(params['request_parameters'], args[1], aws_meta)
96+
97+
if 'request_descriptors' in params:
98+
_record_special_params(params['request_descriptors'],
99+
args[1], aws_meta)
100+
101+
if 'response_parameters' in params and response:
102+
_record_params(params['response_parameters'], response, aws_meta)
103+
104+
if 'response_descriptors' in params and response:
105+
_record_special_params(params['response_descriptors'],
106+
response, aws_meta)
107+
108+
109+
def _record_params(whitelisted, actual, aws_meta):
110+
111+
for key in whitelisted:
112+
if key in actual:
113+
snake_key = to_snake_case(key)
114+
aws_meta[snake_key] = actual[key]
115+
116+
117+
def _record_special_params(whitelisted, actual, aws_meta):
118+
119+
for key in whitelisted:
120+
if key in actual:
121+
_process_descriptor(whitelisted[key], actual[key], aws_meta)
122+
123+
124+
def _process_descriptor(descriptor, value, aws_meta):
125+
126+
# "get_count" = true
127+
if 'get_count' in descriptor and descriptor['get_count']:
128+
value = len(value)
129+
130+
# "get_keys" = true
131+
if 'get_keys' in descriptor and descriptor['get_keys']:
132+
value = value.keys()
133+
134+
aws_meta[descriptor['rename_to']] = value

0 commit comments

Comments
 (0)