-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathbase.py
296 lines (247 loc) · 11.2 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
"""
Base for Idempotency utility
!!! abstract "Usage Documentation"
[`Idempotency`](../../utilities/idempotency.md)
"""
from __future__ import annotations
import datetime
import logging
from copy import deepcopy
from typing import TYPE_CHECKING, Any, Callable
from aws_lambda_powertools.utilities.idempotency.exceptions import (
IdempotencyAlreadyInProgressError,
IdempotencyInconsistentStateError,
IdempotencyItemAlreadyExistsError,
IdempotencyItemNotFoundError,
IdempotencyKeyError,
IdempotencyPersistenceLayerError,
IdempotencyValidationError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
STATUS_CONSTANTS,
DataRecord,
)
from aws_lambda_powertools.utilities.idempotency.serialization.no_op import (
NoOpSerializer,
)
if TYPE_CHECKING:
from aws_lambda_powertools.utilities.idempotency.config import (
IdempotencyConfig,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
BasePersistenceLayer,
)
from aws_lambda_powertools.utilities.idempotency.serialization.base import (
BaseIdempotencySerializer,
)
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)

# Extra attempts IdempotencyHandler.handle makes after an IdempotencyInconsistentStateError
# (total attempts = MAX_RETRIES + 1).
MAX_RETRIES = 2
def _prepare_data(data: Any) -> Any:
    """Prepare data for json serialization.

    Dataclass instances, objects exposing a callable ``model_dump()`` (Pydantic models) and
    objects exposing a callable ``dict()`` are converted to a dict. Otherwise the object's
    ``raw_event`` attribute is returned when present, or the data as-is.

    Parameters
    ----------
    data: Any
        Payload to prepare.

    Returns
    -------
    Any
        A dict for the supported model types, otherwise ``data.raw_event`` or ``data`` unchanged.
    """
    import dataclasses

    # Convert from dataclass instances. ``is_dataclass`` (and the ``__dataclass_fields__``
    # attribute) is also present on the dataclass *type* itself, which ``dataclasses.asdict``
    # rejects with a TypeError, so classes are explicitly excluded here.
    if dataclasses.is_dataclass(data) and not isinstance(data, type):
        return dataclasses.asdict(data)

    # Convert from Pydantic model
    if callable(getattr(data, "model_dump", None)):
        return data.model_dump()

    # Convert from event source data class / any object exposing a callable ``dict()``
    if callable(getattr(data, "dict", None)):
        return data.dict()

    # Return raw event when available, otherwise the data unchanged
    return getattr(data, "raw_event", data)
class IdempotencyHandler:
    """
    Base class to orchestrate calls to persistence layer.

    Each attempt first tries to save an in-progress record for the payload. If a record already
    exists, the outcome is derived from that record's status; otherwise the wrapped function is
    executed and its result is saved (or the record is deleted if the function raised).
    """

    def __init__(
        self,
        function: Callable,
        function_payload: Any,
        config: IdempotencyConfig,
        persistence_store: BasePersistenceLayer,
        output_serializer: BaseIdempotencySerializer | None = None,
        key_prefix: str | None = None,
        function_args: tuple | None = None,
        function_kwargs: dict | None = None,
    ):
        """
        Initialize the IdempotencyHandler

        Parameters
        ----------
        function: Callable
            The function whose execution is made idempotent
        function_payload: Any
            JSON Serializable payload to be hashed
        config: IdempotencyConfig
            Idempotency Configuration
        persistence_store : BasePersistenceLayer
            Instance of persistence layer to store idempotency records
        output_serializer: BaseIdempotencySerializer | None
            Serializer to transform the data to and from a dictionary.
            If not supplied, no serialization is done via the NoOpSerializer
        key_prefix: str | None
            Custom prefix for idempotency key: key_prefix#hash
        function_args: tuple | None
            Positional arguments for ``function``; ``None`` means no positional arguments
        function_kwargs: dict | None
            Keyword arguments for ``function``; ``None`` means no keyword arguments
        """
        self.function = function
        self.output_serializer = output_serializer or NoOpSerializer()
        # Deep copy so the payload used by the persistence layer is isolated from later
        # mutation of the caller's object.
        self.data = deepcopy(_prepare_data(function_payload))
        self.fn_args = function_args
        self.fn_kwargs = function_kwargs
        self.config = config
        self.key_prefix = key_prefix

        persistence_store.configure(
            config=config,
            function_name=f"{self.function.__module__}.{self.function.__qualname__}",
            key_prefix=self.key_prefix,
        )
        self.persistence_store = persistence_store

    def handle(self) -> Any:
        """
        Main entry point for handling idempotent execution of a function.

        Returns
        -------
        Any
            Function response

        Raises
        ------
        IdempotencyInconsistentStateError
            If the persistence state is still inconsistent after MAX_RETRIES retries
        """
        # IdempotencyInconsistentStateError can happen under rare but expected cases
        # when persistent state changes in the small time between put & get requests.
        # In most cases we can retry successfully on this exception.
        for i in range(MAX_RETRIES + 1):  # pragma: no cover
            try:
                return self._process_idempotency()
            except IdempotencyInconsistentStateError:
                if i == MAX_RETRIES:
                    raise  # Bubble up when exceeded max tries

    def _process_idempotency(self):
        """
        Run a single attempt of the idempotency workflow.

        Returns
        -------
        Any
            The stored (possibly hook-processed) response of an existing record, or the response
            of a fresh function execution.

        Raises
        ------
        IdempotencyKeyError, IdempotencyValidationError
            Propagated unchanged from the persistence layer
        IdempotencyPersistenceLayerError
            Any other failure while saving the in-progress record
        """
        try:
            # We call save_inprogress first as an optimization for the most common case where no idempotent record
            # already exists. If it succeeds, there's no need to call get_record.
            self.persistence_store.save_inprogress(
                data=self.data,
                remaining_time_in_millis=self._get_remaining_time_in_millis(),
            )
        except (IdempotencyKeyError, IdempotencyValidationError):
            raise
        except IdempotencyItemAlreadyExistsError as exc:
            # Attempt to retrieve the existing record, either from the exception ReturnValuesOnConditionCheckFailure
            # or perform a GET operation if the information is not available.
            # We give preference to ReturnValuesOnConditionCheckFailure because it is a faster and more cost-effective
            # way of retrieving the existing record after a failed conditional write operation.
            record = exc.old_data_record or self._get_idempotency_record()

            # If a record is found, handle it for status; otherwise fall through and execute the function
            if record:
                return self._handle_for_status(record)
        except Exception as exc:
            raise IdempotencyPersistenceLayerError(
                "Failed to save in progress record to idempotency store",
                exc,
            ) from exc

        return self._get_function_response()

    def _get_remaining_time_in_millis(self) -> int | None:
        """
        Tries to determine the remaining time available for the current lambda invocation.

        This only works if the idempotent handler decorator is used, since we need to access the lambda context.
        However, this could be improved if we start storing the lambda context globally during the invocation. One
        way to do this is to register the lambda context when configuring the IdempotencyConfig object.

        Returns
        -------
        int | None
            Remaining time in millis, or None if the remaining time cannot be determined.
        """
        if self.config.lambda_context is not None:
            return self.config.lambda_context.get_remaining_time_in_millis()

        return None

    def _get_idempotency_record(self) -> DataRecord | None:
        """
        Retrieve the idempotency record from the persistence layer.

        Raises
        ------
        IdempotencyInconsistentStateError
            The record disappeared between save_inprogress and get_record (retryable)
        IdempotencyValidationError
            Propagated unchanged from the persistence layer
        IdempotencyPersistenceLayerError
            Any other failure while reading the record
        """
        try:
            data_record = self.persistence_store.get_record(data=self.data)
        except IdempotencyItemNotFoundError:
            # This code path will only be triggered if the record is removed between save_inprogress and get_record.
            logger.debug(
                "An existing idempotency record was deleted before we could fetch it. Proceeding with %s",
                self.function,
            )
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

        # Allow this exception to bubble up
        except IdempotencyValidationError:
            raise

        # Wrap remaining unhandled exceptions with IdempotencyPersistenceLayerError to ease exception handling for
        # clients
        except Exception as exc:
            raise IdempotencyPersistenceLayerError("Failed to get record from idempotency store", exc) from exc

        return data_record

    def _handle_for_status(self, data_record: DataRecord) -> Any | None:
        """
        Take appropriate action based on data_record's status

        Parameters
        ----------
        data_record: DataRecord
            Existing idempotency record for the current payload

        Returns
        -------
        Any | None
            Function's response previously used for this idempotency key, if it has successfully executed already.
            In case an output serializer is configured, the response is deserialized.
            If a response hook is configured, the hook's return value is returned instead.

        Raises
        ------
        IdempotencyAlreadyInProgressError
            A function execution is already in progress
        IdempotencyInconsistentStateError
            The persistence store reports inconsistent states across different requests. Retryable.
        """
        # This code path will only be triggered if the record becomes expired between the save_inprogress call and here
        if data_record.status == STATUS_CONSTANTS["EXPIRED"]:
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

        if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            # The in-progress expiry is compared against "now" in epoch milliseconds; an in-progress record that
            # is already past it is reported as inconsistent so that handle() retries.
            if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int(
                datetime.datetime.now().timestamp() * 1000,
            ):
                raise IdempotencyInconsistentStateError(
                    "item should have been expired in-progress because it already time-outed.",
                )

            inprogress_error_message = (
                f"Execution already in progress with idempotency key: "
                f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"
            )
            if data_record.sort_key is not None:
                inprogress_error_message += f" and sort key: {data_record.sort_key}"
            raise IdempotencyAlreadyInProgressError(inprogress_error_message)

        response_dict = data_record.response_json_as_dict()
        # Compare with None explicitly: falsy-but-valid stored responses (0, False, "", [], {}) must be
        # replayed as stored rather than collapsed to None.
        serialized_response = self.output_serializer.from_dict(response_dict) if response_dict is not None else None

        if self.config.response_hook:
            logger.debug("Response hook configured, invoking function")
            return self.config.response_hook(serialized_response, data_record)

        return serialized_response

    def _get_function_response(self):
        """
        Execute the wrapped function and record the outcome in the persistence store.

        Returns
        -------
        Any
            The function's (unserialized) response

        Raises
        ------
        Exception
            Whatever the wrapped function raised, after its in-progress record has been deleted
        IdempotencyPersistenceLayerError
            Deleting the record after a failure, or saving the successful result, failed
        """
        # function_args / function_kwargs default to None in __init__; unpacking None would raise TypeError.
        args = self.fn_args if self.fn_args is not None else ()
        kwargs = self.fn_kwargs if self.fn_kwargs is not None else {}

        try:
            response = self.function(*args, **kwargs)
        except Exception as handler_exception:
            # We need these nested blocks to preserve function's exception in case the persistence store operation
            # also raises an exception
            try:
                self.persistence_store.delete_record(data=self.data, exception=handler_exception)
            except Exception as delete_exception:
                raise IdempotencyPersistenceLayerError(
                    "Failed to delete record from idempotency store",
                    delete_exception,
                ) from delete_exception
            raise
        else:
            try:
                # Only None means "no response"; other falsy values are serialized and stored so that
                # replays return the same value as the original execution.
                serialized_response: Any = self.output_serializer.to_dict(response) if response is not None else None
                self.persistence_store.save_success(data=self.data, result=serialized_response)
            except Exception as save_exception:
                raise IdempotencyPersistenceLayerError(
                    "Failed to update record state to success in idempotency store",
                    save_exception,
                ) from save_exception

        return response