forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbase.py
69 lines (56 loc) · 2.5 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import logging
from typing import Any, Dict, Optional, Union
import fastjsonschema
import jmespath
from jmespath.exceptions import LexerError
from aws_lambda_powertools.shared.jmespath_functions import PowertoolsFunctions
from .exceptions import InvalidEnvelopeExpressionError, InvalidSchemaFormatError, SchemaValidationError
logger = logging.getLogger(__name__)
def validate_data_against_schema(data: Union[Dict, str], schema: Dict, formats: Optional[Dict] = None) -> None:
    """Validate data against a given JSON Schema

    Parameters
    ----------
    data : Union[Dict, str]
        Data set to be validated
    schema : Dict
        JSON Schema to validate against
    formats: Optional[Dict]
        Custom formats containing a key (e.g. int64) and a value expressed as regex or callback returning bool.
        A falsy value (None or an empty dict) is forwarded to fastjsonschema as an empty dict.

    Raises
    ------
    SchemaValidationError
        When schema validation fails against data set
    InvalidSchemaFormatError
        When JSON schema provided is invalid
    """
    # Normalise outside the try block: this cannot raise, and keeping the try body
    # down to the single validating call makes it clear which errors are translated.
    formats = formats or {}

    try:
        fastjsonschema.validate(definition=schema, data=data, formats=formats)
    except (TypeError, AttributeError, fastjsonschema.JsonSchemaDefinitionException) as e:
        # Handled before the broader JsonSchemaException clause below so that schema
        # definition problems are reported as InvalidSchemaFormatError.
        # `from e` keeps the underlying error as the explicit cause for debugging.
        raise InvalidSchemaFormatError(f"Schema received: {schema}, Formats: {formats}. Error: {e}") from e
    except fastjsonschema.JsonSchemaException as e:
        message = f"Failed schema validation. Error: {e.message}, Path: {e.path}, Data: {e.value}"  # noqa: B306, E501
        raise SchemaValidationError(message) from e
def unwrap_event_from_envelope(data: Union[Dict, str], envelope: str, jmespath_options: Optional[Dict] = None) -> Any:
    """Searches data using JMESPath expression

    Parameters
    ----------
    data : Union[Dict, str]
        Data set to be filtered
    envelope : str
        JMESPath expression to filter data against
    jmespath_options : Optional[Dict]
        Alternative JMESPath options to be included when filtering expr.
        When falsy (None or an empty dict), the Powertools custom functions are used.

    Returns
    -------
    Any
        Data found using JMESPath expression given in envelope

    Raises
    ------
    InvalidEnvelopeExpressionError
        When the search raises LexerError, TypeError or UnicodeError
    """
    if not jmespath_options:
        jmespath_options = {"custom_functions": PowertoolsFunctions()}

    try:
        # Lazy %-style arguments: the message is only rendered when DEBUG is enabled
        logger.debug("Envelope detected: %s. JMESPath options: %s", envelope, jmespath_options)
        return jmespath.search(envelope, data, options=jmespath.Options(**jmespath_options))
    except (LexerError, TypeError, UnicodeError) as e:
        message = f"Failed to unwrap event from envelope using expression. Error: {e} Exp: {envelope}, Data: {data}"  # noqa: E501
        # `from e` keeps the underlying JMESPath/type error as the explicit cause
        raise InvalidEnvelopeExpressionError(message) from e