forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjmespath_utils.py
55 lines (43 loc) · 1.89 KB
/
jmespath_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import base64
import gzip
import json
from typing import Any, Dict, Optional, Union
import jmespath
from jmespath.exceptions import LexerError
from aws_lambda_powertools.utilities.validation import InvalidEnvelopeExpressionError
from aws_lambda_powertools.utilities.validation.base import logger
class PowertoolsFunctions(jmespath.functions.Functions):
@jmespath.functions.signature({"types": ["string"]})
def _func_powertools_json(self, value):
return json.loads(value)
@jmespath.functions.signature({"types": ["string"]})
def _func_powertools_base64(self, value):
return base64.b64decode(value).decode()
@jmespath.functions.signature({"types": ["string"]})
def _func_powertools_base64_gzip(self, value):
encoded = base64.b64decode(value)
uncompressed = gzip.decompress(encoded)
return uncompressed.decode()
def unwrap_event_from_envelope(data: Union[Dict, str], envelope: str, jmespath_options: Optional[Dict]) -> Any:
"""Searches data using JMESPath expression
Parameters
----------
data : Dict
Data set to be filtered
envelope : str
JMESPath expression to filter data against
jmespath_options : Dict
Alternative JMESPath options to be included when filtering expr
Returns
-------
Any
Data found using JMESPath expression given in envelope
"""
if not jmespath_options:
jmespath_options = {"custom_functions": PowertoolsFunctions()}
try:
logger.debug(f"Envelope detected: {envelope}. JMESPath options: {jmespath_options}")
return jmespath.search(envelope, data, options=jmespath.Options(**jmespath_options))
except (LexerError, TypeError, UnicodeError) as e:
message = f"Failed to unwrap event from envelope using expression. Error: {e} Exp: {envelope}, Data: {data}" # noqa: B306, E501
raise InvalidEnvelopeExpressionError(message)