-
Notifications
You must be signed in to change notification settings - Fork 420
/
Copy pathbase.py
287 lines (244 loc) · 10.5 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
from __future__ import annotations
import functools
import logging
import warnings
from numbers import Number
from typing import Any, Callable, Mapping, Optional, Sequence, Union, overload
from jsonpath_ng.ext import parse
from aws_lambda_powertools.utilities.data_masking.exceptions import (
DataMaskingFieldNotFoundError,
DataMaskingUnsupportedTypeError,
)
from aws_lambda_powertools.utilities.data_masking.provider import BaseProvider
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class DataMasking:
    """
    Orchestrate erasing, encrypting, and decrypting data through a provider.

    Note: This utility is currently in a Non-General Availability (Non-GA) phase and may have limitations.
    Please DON'T USE THIS utility in production environments.
    Keep in mind that when we transition to General Availability (GA), there might be breaking changes introduced.

    Every operation is delegated to ``self.provider``; when no provider is
    given, a ``BaseProvider`` instance is used.

    Example:
        ```
        from aws_lambda_powertools.utilities.data_masking.base import DataMasking

        def lambda_handler(event, context):
            masker = DataMasking()

            data = {
                "project": "powertools",
                "sensitive": "password"
            }

            erased = masker.erase(data, fields=["sensitive"])

            return erased
        ```
    """

    def __init__(
        self,
        provider: Optional[BaseProvider] = None,
        raise_on_missing_field: bool = True,
    ):
        """
        Parameters
        ----------
        provider : Optional[BaseProvider]
            Provider that implements the actions. Defaults to ``BaseProvider()``.
        raise_on_missing_field : bool
            When True (default), a field expression that matches nothing raises
            ``DataMaskingFieldNotFoundError``; when False, a warning is emitted instead.
        """
        self.provider = provider or BaseProvider()
        # NOTE: we depend on Provider to not confuse customers in passing the same 2 serializers in 2 places
        self.json_serializer = self.provider.json_serializer
        self.json_deserializer = self.provider.json_deserializer
        self.raise_on_missing_field = raise_on_missing_field

    def encrypt(
        self,
        data: dict | Mapping | Sequence | Number,
        provider_options: dict | None = None,
        **encryption_context: str,
    ) -> str:
        """
        Apply the provider's ``encrypt`` action to the entire ``data``.

        Parameters
        ----------
        data : dict | Mapping | Sequence | Number
            The data to encrypt.
        provider_options : dict | None
            Provider specific keyword arguments to propagate; ``None`` is replaced by ``{}``.
        **encryption_context : str
            Encryption context forwarded to the provider.

        Returns
        -------
        str
            Whatever the provider's ``encrypt`` returns.
        """
        return self._apply_action(
            data=data,
            fields=None,
            action=self.provider.encrypt,
            provider_options=provider_options or {},
            **encryption_context,
        )

    def decrypt(
        self,
        data,
        provider_options: dict | None = None,
        **encryption_context: str,
    ) -> Any:
        """
        Apply the provider's ``decrypt`` action to the entire ``data``.

        Parameters
        ----------
        data
            The data to decrypt.
        provider_options : dict | None
            Provider specific keyword arguments to propagate; ``None`` is replaced by ``{}``.
        **encryption_context : str
            Encryption context forwarded to the provider.

        Returns
        -------
        Any
            Whatever the provider's ``decrypt`` returns.
        """
        return self._apply_action(
            data=data,
            fields=None,
            action=self.provider.decrypt,
            provider_options=provider_options or {},
            **encryption_context,
        )

    @overload
    def erase(self, data, fields: None) -> str: ...

    @overload
    def erase(self, data: list, fields: list[str]) -> list[str]: ...

    @overload
    def erase(self, data: tuple, fields: list[str]) -> tuple[str]: ...

    @overload
    def erase(self, data: dict, fields: list[str]) -> dict: ...

    def erase(self, data: Sequence | Mapping, fields: list[str] | None = None) -> str | list[str] | tuple[str] | dict:
        """
        Apply the provider's ``erase`` action to ``data``.

        Parameters
        ----------
        data : Sequence | Mapping
            The data to erase.
        fields : list[str] | None
            Field expressions to erase. When ``None`` (default), the action is
            applied to the entire ``data``.

        Returns
        -------
        str | list[str] | tuple[str] | dict
            The result of the erase action.
        """
        return self._apply_action(data=data, fields=fields, action=self.provider.erase)

    def _apply_action(
        self,
        data,
        fields: list[str] | None,
        action: Callable,
        provider_options: dict | None = None,
        **encryption_context: str,
    ):
        """
        Helper method to determine whether to apply a given action to the entire input data
        or to specific fields if the 'fields' argument is specified.

        Parameters
        ----------
        data : str | dict
            The input data to process.
        fields : Optional[List[str]]
            A list of fields to apply the action to. If 'None', the action is applied to the entire 'data'.
        action : Callable
            The action to apply to the data. It should be a callable that performs an operation on the data
            and returns the modified value.
        provider_options : dict
            Provider specific keyword arguments to propagate; used as an escape hatch.
        encryption_context: str
            Encryption context to use in encrypt and decrypt operations.

        Returns
        -------
        any
            The modified data after applying the action.
        """
        if fields is not None:
            # Lazy %-style arguments: the message is only formatted if DEBUG is enabled.
            logger.debug("Running action %s with fields %s", action.__name__, fields)
            return self._apply_action_to_fields(
                data=data,
                fields=fields,
                action=action,
                provider_options=provider_options,
                **encryption_context,
            )

        logger.debug("Running action %s with the entire data", action.__name__)
        return action(data=data, provider_options=provider_options, **encryption_context)

    def _apply_action_to_fields(
        self,
        data: Union[dict, str],
        fields: list,
        action: Callable,
        provider_options: dict | None = None,
        **encryption_context: str,
    ) -> Union[dict, str]:
        """
        This method takes the input data, which can be either a dictionary or a JSON string,
        and erases, encrypts, or decrypts the specified fields.

        Parameters
        ----------
        data : Union[dict, str]
            The input data to process. It can be either a dictionary or a JSON string.
        fields : List
            A list of field expressions to apply the action to. Each expression is
            handed to ``jsonpath_ng.ext.parse``.
        action : Callable
            The action to apply to the fields. It should be a callable that takes the current
            value of the field as the first argument and any additional arguments that might be required
            for the action. It performs an operation on the current value using the provided arguments and
            returns the modified value.
        provider_options : dict
            Optional dictionary representing additional options for the action.
        **encryption_context: str
            Additional keyword arguments forwarded to the action.

        Returns
        -------
        dict | str
            The normalized (deserialized) data after applying the action to the
            specified fields.

        Raises
        -------
        ValueError
            If 'fields' is empty.
        DataMaskingUnsupportedTypeError
            If the 'data' parameter is neither a dict nor a str.
        DataMaskingFieldNotFoundError
            If an expression matches nothing and ``raise_on_missing_field`` is True.

        Example
        -------
        ```python
        >>> data = {'a': {'b': {'c': 1}}, 'x': {'y': 2}}
        >>> fields = ['a.b.c', 'x.y']
        # The function will transform the value at 'a.b.c' (1) and 'x.y' (2)
        # and store the result as:
        new_dict = {'a': {'b': {'c': '*****'}}, 'x': {'y': '*****'}}
        ```
        """
        data_parsed: dict = self._normalize_data_to_parse(fields, data)

        # For in-place updates, json_parse accepts a callback function that
        # receives 3 positional args: field_value, fields, field_name.
        # The partial pre-populates the known options (action, provider opts, enc ctx).
        # It does not depend on the field being processed, so it is built once here
        # and handed to every update below.
        update_callback = functools.partial(
            self._call_action,
            action=action,
            provider_options=provider_options,
            **encryption_context,
        )

        # Iterate over each field expression to be parsed.
        for field_parse in fields:
            # Compile the field expression.
            json_parse = parse(field_parse)
            # Find the corresponding keys in the normalized data using the parsed expression.
            result_parse = json_parse.find(data_parsed)

            if not result_parse:
                # NOTE(review): both messages below embed the full parsed payload,
                # which may include the very values being masked - confirm this is intended.
                if self.raise_on_missing_field:
                    # If the data for the field is not found, raise an exception.
                    raise DataMaskingFieldNotFoundError(f"Field or expression {field_parse} not found in {data_parsed}")
                else:
                    # If the data for the field is not found, only warn.
                    warnings.warn(f"Field or expression {field_parse} not found in {data_parsed}", stacklevel=2)

            # Update every match in place through the shared callback.
            json_parse.update(data_parsed, update_callback)

        return data_parsed

    @staticmethod
    def _call_action(
        field_value: Any,
        fields: dict[str, Any],
        field_name: str,
        action: Callable,
        provider_options: dict | None = None,
        **encryption_context,
    ) -> Any:
        """
        Apply a specified action to a field value and update the fields dictionary.

        Params:
        --------
        - field_value: Current value of the field being processed.
        - fields: Dictionary representing the fields being processed (mutable).
        - field_name: Name of the field being processed.
        - action: Callable (function or method) to be applied to the field_value.
        - provider_options: Optional dictionary representing additional options for the action.
        - **encryption_context: Additional keyword arguments forwarded to the action.

        Returns:
        - fields[field_name]: Returns the processed field value
        """
        fields[field_name] = action(field_value, provider_options=provider_options, **encryption_context)
        return fields[field_name]

    def _normalize_data_to_parse(self, fields: list, data: str | dict) -> dict:
        """
        Turn ``data`` into a structure that field expressions can traverse.

        Parameters
        ----------
        fields : list
            The requested field expressions; only checked for emptiness here.
        data : str | dict
            A JSON string, or a dictionary.

        Returns
        -------
        dict
            The result of ``self.json_deserializer``.

        Raises
        ------
        ValueError
            If ``fields`` is empty.
        DataMaskingUnsupportedTypeError
            If ``data`` is neither a ``str`` nor a ``dict``.
        """
        if not fields:
            raise ValueError("No fields specified.")

        if isinstance(data, str):
            # Parse JSON string as dictionary
            data_parsed = self.json_deserializer(data)
        elif isinstance(data, dict):
            # Convert the data to a JSON string in case it contains non-string keys (e.g., ints)
            # Parse the JSON string back into a dictionary
            data_parsed = self.json_deserializer(self.json_serializer(data))
        else:
            raise DataMaskingUnsupportedTypeError(
                f"Unsupported data type. Expected a traversable type (dict or str), but got {type(data)}.",
            )

        return data_parsed