-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathidempotency.py
101 lines (84 loc) · 3.28 KB
/
idempotency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""
Primary interface for idempotent Lambda functions utility
"""
import logging
from typing import Any, Callable, Dict
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.utilities.idempotency.persistence.base import STATUS_CONSTANTS, BasePersistenceLayer
from ..typing import LambdaContext
from .exceptions import AlreadyInProgressError, ItemAlreadyExistsError, ItemNotFoundError
logger = logging.getLogger(__name__)
def default_error_callback():
raise
@lambda_handler_decorator
def idempotent(
handler: Callable[[Any, LambdaContext], Any],
event: Dict[str, Any],
context: LambdaContext,
persistence_store: BasePersistenceLayer,
) -> Any:
"""
Middleware to handle idempotency
Parameters
----------
handler: Callable
Lambda's handler
event: Dict
Lambda's Event
context: Dict
Lambda's Context
persistence_store: BasePersistenceLayer
Instance of BasePersistenceLayer to store data
Examples
--------
**Processes Lambda's event in an idempotent manner**
>>> from aws_lambda_powertools.utilities.idempotency import idempotent, DynamoDBPersistenceLayer
>>>
>>> persistence_store = DynamoDBPersistenceLayer(event_key="body", table_name="idempotency_store")
>>>
>>> @idempotent(persistence_store=persistence_store)
>>> def handler(event, context):
>>> return {"StatusCode": 200}
"""
try:
# We call save_inprogress first as an optimization for the most common case where no idempotent record already
# exists. If it succeeds, there's no need to call get_record.
persistence_store.save_inprogress(event=event)
except ItemAlreadyExistsError:
try:
event_record = persistence_store.get_record(event)
except ItemNotFoundError:
return _call_lambda(handler=handler, persistence_store=persistence_store, event=event, context=context)
if event_record.status == STATUS_CONSTANTS["EXPIRED"]:
return _call_lambda(handler=handler, persistence_store=persistence_store, event=event, context=context)
if event_record.status == STATUS_CONSTANTS["INPROGRESS"]:
raise AlreadyInProgressError(
f"Execution already in progress with idempotency key: "
f"{persistence_store.event_key}={event_record.idempotency_key}"
)
if event_record.status == STATUS_CONSTANTS["COMPLETED"]:
return event_record.response_json_as_dict()
return _call_lambda(handler=handler, persistence_store=persistence_store, event=event, context=context)
def _call_lambda(
handler: Callable, persistence_store: BasePersistenceLayer, event: Dict[str, Any], context: LambdaContext
) -> Any:
"""
Parameters
----------
handler: Callable
Lambda handler
persistence_store: BasePersistenceLayer
Instance of persistence layer
event
Lambda event
context
Lambda context
"""
try:
handler_response = handler(event, context)
except Exception as ex:
persistence_store.save_error(event=event, exception=ex)
raise
else:
persistence_store.save_success(event=event, result=handler_response)
return handler_response