Skip to content

Commit 03f7dcd

Browse files
Tom McCarthy and michaelbrewer authored
feat: Idempotency helper utility (#245)
* feat: initial commit for idempotency utility * fix: ensure region is configured in botocore for tests * chore: ignore security warning for md5 usage * chore: add debug logging * feat: add local caching for idempotency lookups * feat: replace simple dict cache with LRU * feat: remove idempotent exception handling * feat: remove unused logic to create ddb table - will handle in documentation instead * fix: remove redundant code from table creation logic * chore: move tests to own dir * chore: remove redundant code for exception handling * feat: add payload validation logic and functionality to use different hash functions from hashlib * feat: optimization to reduce number of database calls, reorganize persistence layer modules * chore: type corrections * chore: add more logging statements * fix: Use variable for ddb attribute name * chore: clarify docstring for abstract method * feat: Refactor to cover corner cases where state changes between calls to db * chore: correct stubbed ddb responses for test case * docs: add first of a few seq diagrams to support documentation * feat: use boto3 session for constructing clients to allow customization of credentials * chore: move cache dict implementation to shared dir * chore: refactor with improvements for readability, variable names, and exception handling * chore: remove dead code, rename variable for clarity, change args to kwargs in function call * chore: improve test coverage, refactor fixtures * chore: skip tests using pytest-mock's spy for python < 3.8 due to issues with lib * chore: update test fixtures to use jmespath * docs: first draft of docs for idempotency util * fix: Allow event_key_jmespath to be left empty to use entire event as payload * docs: add section for compatibility with other utils * chore: improvements to func tests * chore: add unit tests for lru cache * feat: add support for decimals in json serializer * chore: Add docstring for LRU cache dict * chore: Remove unused status constants * chore: 
Rename method for clarity * chore: Correct example in docstring * fix: make data attribute of data record optional in get_record so we don't throw the wrong error for INPROGRESS * docs: clarify behaviour for concurrent executions and DDB behaviour for large items * Update aws_lambda_powertools/shared/cache_dict.py Co-authored-by: Michael Brewer <[email protected]> * Update aws_lambda_powertools/shared/cache_dict.py Co-authored-by: Michael Brewer <[email protected]> * Update aws_lambda_powertools/utilities/idempotency/persistence/base.py Co-authored-by: Michael Brewer <[email protected]> * chore: add test for invalid status on data record * Update aws_lambda_powertools/utilities/idempotency/persistence/base.py Co-authored-by: Michael Brewer <[email protected]> Co-authored-by: Michael Brewer <[email protected]>
1 parent b5ada70 commit 03f7dcd

21 files changed

+2183
-1
lines changed
+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from collections import OrderedDict
2+
3+
4+
class LRUDict(OrderedDict):
    """
    Cache implementation based on ordered dict with a maximum number of items. The least recently used item is
    evicted first. Currently used by idempotency utility.

    Parameters
    ----------
    max_items: int
        Maximum number of items the cache may hold; inserting beyond this evicts the oldest entry (default 1024)
    *args, **kwargs
        Forwarded unchanged to ``OrderedDict``
    """

    def __init__(self, max_items=1024, *args, **kwargs):
        # Assigned before delegating so the limit already exists should the parent initialiser store any
        # initial items through __setitem__.
        self.max_items = max_items
        super().__init__(*args, **kwargs)

    def __getitem__(self, key):
        """
        Return the value stored under ``key`` and mark the key as most recently used.

        Raises
        ------
        KeyError
            If ``key`` is not in the cache
        """
        value = super().__getitem__(key)
        self.move_to_end(key)
        return value

    def __setitem__(self, key, value):
        """
        Store ``value`` under ``key`` as the most recently used entry, evicting the oldest entry when the cache
        grows beyond ``max_items``.
        """
        if key in self:
            self.move_to_end(key)
        super().__setitem__(key, value)
        if len(self) > self.max_items:
            # Iteration order is oldest-first, so the first key is the least recently used one
            oldest = next(iter(self))
            del self[oldest]

    def get(self, key, *args, **kwargs):
        """
        Return the value stored under ``key`` (or the supplied default when absent) and, when the key is present,
        mark it as most recently used.
        """
        item = super().get(key, *args, **kwargs)
        # Decide on key presence rather than on the truthiness of the value: falsy cached values such as
        # 0, "", {} or None must be refreshed by a read just like any other value.
        if key in self:
            self.move_to_end(key)
        return item
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import decimal
2+
import json
3+
import math
4+
5+
6+
class Encoder(json.JSONEncoder):
    """
    JSON encoder extended to serialize ``decimal.Decimal`` values, similar to the serializer used by Lambda
    internally.
    """

    def default(self, obj):
        """
        Convert objects the standard encoder cannot handle.

        Parameters
        ----------
        obj
            Object the base encoder could not serialize on its own

        Returns
        -------
        float or str
            ``math.nan`` for a NaN Decimal, otherwise the string form of the Decimal

        Raises
        ------
        TypeError
            Raised by the base implementation for any non-Decimal object
        """
        # Anything that is not a Decimal is left to the base class, which raises TypeError
        if not isinstance(obj, decimal.Decimal):
            return super().default(obj)
        return math.nan if obj.is_nan() else str(obj)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
"""
2+
Utility for adding idempotency to lambda functions
3+
"""
4+
5+
from aws_lambda_powertools.utilities.idempotency.persistence.base import BasePersistenceLayer
6+
from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import DynamoDBPersistenceLayer
7+
8+
from .idempotency import idempotent
9+
10+
__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
"""
2+
Idempotency errors
3+
"""
4+
5+
6+
class IdempotencyError(Exception):
    """
    Common base class for all errors raised by the idempotency utility, so callers can handle any of them with a
    single ``except IdempotencyError`` clause
    """


class IdempotencyItemAlreadyExistsError(IdempotencyError):
    """
    Item attempting to be inserted into persistence store already exists and is not expired
    """


class IdempotencyItemNotFoundError(IdempotencyError):
    """
    Item does not exist in persistence store
    """


class IdempotencyAlreadyInProgressError(IdempotencyError):
    """
    Execution with idempotency key is already in progress
    """


class IdempotencyInvalidStatusError(IdempotencyError):
    """
    An invalid status was provided
    """


class IdempotencyValidationError(IdempotencyError):
    """
    Payload does not match stored idempotency record
    """


class IdempotencyInconsistentStateError(IdempotencyError):
    """
    State is inconsistent across multiple requests to persistence store
    """


class IdempotencyPersistenceLayerError(IdempotencyError):
    """
    Unrecoverable error from the data store
    """
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
"""
2+
Primary interface for idempotent Lambda functions utility
3+
"""
4+
import logging
5+
from typing import Any, Callable, Dict, Optional
6+
7+
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
8+
from aws_lambda_powertools.utilities.idempotency.exceptions import (
9+
IdempotencyAlreadyInProgressError,
10+
IdempotencyInconsistentStateError,
11+
IdempotencyItemAlreadyExistsError,
12+
IdempotencyItemNotFoundError,
13+
IdempotencyPersistenceLayerError,
14+
IdempotencyValidationError,
15+
)
16+
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
17+
STATUS_CONSTANTS,
18+
BasePersistenceLayer,
19+
DataRecord,
20+
)
21+
from aws_lambda_powertools.utilities.typing import LambdaContext
22+
23+
logger = logging.getLogger(__name__)
24+
25+
26+
@lambda_handler_decorator
def idempotent(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    persistence_store: BasePersistenceLayer,
) -> Any:
    """
    Middleware that runs a Lambda handler idempotently

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data

    Returns
    -------
    Any
        Whatever ``IdempotencyHandler.handle`` returns for this event

    Raises
    ------
    IdempotencyInconsistentStateError
        When the persistence store still reports inconsistent state after all retries

    Examples
    --------
    **Processes Lambda's event in an idempotent manner**
        >>> from aws_lambda_powertools.utilities.idempotency import idempotent, DynamoDBPersistenceLayer
        >>>
        >>> persistence_store = DynamoDBPersistenceLayer(event_key_jmespath="body", table_name="idempotency_store")
        >>>
        >>> @idempotent(persistence_store=persistence_store)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}
    """
    # IdempotencyInconsistentStateError is rare but expected: persistent state can change in the short window
    # between the put and get requests. Trying again usually succeeds, so allow two retries after the first attempt.
    max_handler_retries = 2
    idempotency_handler = IdempotencyHandler(handler, event, context, persistence_store)

    for attempt in range(max_handler_retries + 1):
        try:
            return idempotency_handler.handle()
        except IdempotencyInconsistentStateError:
            if attempt == max_handler_retries:
                # Retries exhausted - let the exception bubble up to the caller
                raise
73+
74+
75+
class IdempotencyHandler:
    """
    Orchestrates the persistence layer calls made around one invocation of the lambda handler.
    """

    def __init__(
        self,
        lambda_handler: Callable[[Any, LambdaContext], Any],
        event: Dict[str, Any],
        context: LambdaContext,
        persistence_store: BasePersistenceLayer,
    ):
        """
        Initialize the IdempotencyHandler

        Parameters
        ----------
        lambda_handler : Callable[[Any, LambdaContext], Any]
            Lambda function handler
        event : Dict[str, Any]
            Event payload lambda handler will be called with
        context : LambdaContext
            Context object which will be passed to lambda handler
        persistence_store : BasePersistenceLayer
            Instance of persistence layer to store idempotency records
        """
        self.lambda_handler = lambda_handler
        self.event = event
        self.context = context
        self.persistence_store = persistence_store
        # Not read by any method of this class; kept as a public attribute
        self.max_handler_retries = 2

    def handle(self) -> Any:
        """
        Entry point: run the lambda handler idempotently for the stored event.

        Returns
        -------
        Any
            lambda handler response, or the response saved for an earlier execution with the same key

        """
        try:
            # Optimistic path: most events have no idempotency record yet, so try to create the in-progress
            # record straight away. When that succeeds no get_record call is needed.
            self.persistence_store.save_inprogress(event=self.event)
        except IdempotencyItemAlreadyExistsError:
            # A record is already stored for this event - fetch it and act on its status
            return self._handle_for_status(self._get_idempotency_record())

        return self._call_lambda_handler()

    def _get_idempotency_record(self) -> DataRecord:
        """
        Fetch the idempotency record for the event from the persistence layer.

        Returns
        -------
        DataRecord
            Record returned by the persistence store

        Raises
        ------
        IdempotencyInconsistentStateError
            The record could not be found although save_inprogress reported that it exists
        IdempotencyValidationError
            Re-raised unchanged from the persistence layer
        IdempotencyPersistenceLayerError
            Any other exception raised by the persistence layer

        """
        try:
            return self.persistence_store.get_record(self.event)
        except IdempotencyItemNotFoundError:
            # Only reachable when the record is removed between save_inprogress and get_record
            logger.debug(
                "An existing idempotency record was deleted before we could retrieve it. "
                "Proceeding with lambda handler"
            )
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")
        except IdempotencyValidationError:
            # Validation failures are passed through to the caller untouched
            raise
        except Exception as exc:
            # Everything else is wrapped so that clients only need to handle a single exception type
            raise IdempotencyPersistenceLayerError("Failed to get record from idempotency store") from exc

    def _handle_for_status(self, event_record: DataRecord) -> Optional[Dict[Any, Any]]:
        """
        Decide what to do based on the status of event_record

        Parameters
        ----------
        event_record: DataRecord

        Returns
        -------
        Optional[Dict[Any, Any]]
            Lambda response previously used for this idempotency key, if it has successfully executed already.

        Raises
        ------
        IdempotencyAlreadyInProgressError
            A lambda execution is already in progress
        IdempotencyInconsistentStateError
            The persistence store reports inconsistent states across different requests. Retryable.
        """
        # Only reachable when the record expires between the save_inprogress call and this check
        if event_record.status == STATUS_CONSTANTS["EXPIRED"]:
            raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")

        if event_record.status == STATUS_CONSTANTS["INPROGRESS"]:
            key_path = self.persistence_store.event_key_jmespath
            message = f"Execution already in progress with idempotency key: {key_path}={event_record.idempotency_key}"
            raise IdempotencyAlreadyInProgressError(message)

        return event_record.response_json_as_dict()

    def _call_lambda_handler(self) -> Any:
        """
        Invoke the lambda handler and update the persistence store according to the outcome

        Returns
        -------
        Any
            lambda handler response

        Raises
        ------
        IdempotencyPersistenceLayerError
            Deleting the record after a handler failure, or saving the successful result, failed

        """
        try:
            response = self.lambda_handler(self.event, self.context)
        except Exception as handler_error:
            # The nested try keeps the handler's exception available even if the persistence store operation
            # raises as well
            try:
                self.persistence_store.delete_record(event=self.event, exception=handler_error)
            except Exception as delete_error:
                raise IdempotencyPersistenceLayerError(
                    "Failed to delete record from idempotency store"
                ) from delete_error
            # Record removed - re-raise the handler's own exception
            raise

        try:
            self.persistence_store.save_success(event=self.event, result=response)
        except Exception as save_error:
            raise IdempotencyPersistenceLayerError(
                "Failed to update record state to success in idempotency store"
            ) from save_error

        return response

aws_lambda_powertools/utilities/idempotency/persistence/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)