-
Notifications
You must be signed in to change notification settings - Fork 421
/
Copy pathidempotency.py
209 lines (178 loc) · 7.28 KB
/
idempotency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""
Primary interface for idempotent Lambda functions utility
"""
from __future__ import annotations
import functools
import logging
import os
import warnings
from inspect import isclass
from typing import TYPE_CHECKING, Any, Callable, cast
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.shared.functions import strtobool
from aws_lambda_powertools.shared.types import AnyCallableT
from aws_lambda_powertools.utilities.idempotency.base import IdempotencyHandler
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.serialization.base import (
BaseIdempotencyModelSerializer,
BaseIdempotencySerializer,
)
if TYPE_CHECKING:
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
BasePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.warnings import PowertoolsUserWarning
logger = logging.getLogger(__name__)
@lambda_handler_decorator
def idempotent(
    handler: Callable[[Any, LambdaContext], Any],
    event: dict[str, Any],
    context: LambdaContext,
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig | None = None,
    key_prefix: str | None = None,
    **kwargs,
) -> Any:
    """
    Decorator that runs a Lambda handler through the idempotency handler

    The whole ``event`` is used as the idempotency payload and the call is
    delegated to ``IdempotencyHandler.handle()``. If the environment variable
    named by ``constants.IDEMPOTENCY_DISABLED_ENV`` is truthy (as parsed by
    ``strtobool``), a ``PowertoolsUserWarning`` is emitted and the handler is
    invoked directly, bypassing the persistence store.

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: dict
        Lambda's Event
    context: LambdaContext
        Lambda's Context
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data
    config: IdempotencyConfig | None
        Configuration; a default ``IdempotencyConfig()`` is created when omitted
    key_prefix: str | None
        Custom prefix for idempotency key: key_prefix#hash
    **kwargs
        Extra keyword arguments forwarded to ``handler``

    Returns
    -------
    Any
        The handler's return value when idempotency is disabled, otherwise
        whatever ``IdempotencyHandler.handle()`` returns.

    Examples
    --------
    **Processes Lambda's event in an idempotent manner**

        >>> from aws_lambda_powertools.utilities.idempotency import (
        >>>    idempotent, DynamoDBPersistenceLayer, IdempotencyConfig
        >>> )
        >>>
        >>> idem_config=IdempotencyConfig(event_key_jmespath="body")
        >>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")
        >>>
        >>> @idempotent(config=idem_config, persistence_store=persistence_layer)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}
    """
    # Kill switch: when the disable flag is truthy, warn and call the handler directly.
    # The warning is emitted every time this path is taken; no environment check is made here.
    idempotency_disabled = strtobool(os.getenv(constants.IDEMPOTENCY_DISABLED_ENV, "false"))
    if idempotency_disabled:
        warnings.warn(
            "Disabling idempotency is intended for development environments only "
            "and should not be used in production.",
            PowertoolsUserWarning,
            stacklevel=2,
        )
        return handler(event, context, **kwargs)

    # Fall back to a default configuration, then attach the current Lambda context to it.
    if not config:
        config = IdempotencyConfig()
    config.register_lambda_context(context)

    return IdempotencyHandler(
        function=handler,
        function_payload=event,
        config=config,
        persistence_store=persistence_store,
        key_prefix=key_prefix,
        function_args=(event, context),
        function_kwargs=kwargs,
    ).handle()
def idempotent_function(
    function: AnyCallableT | None = None,
    *,
    data_keyword_argument: str,
    persistence_store: BasePersistenceLayer,
    config: IdempotencyConfig | None = None,
    output_serializer: BaseIdempotencySerializer | type[BaseIdempotencyModelSerializer] | None = None,
    key_prefix: str | None = None,
    **kwargs: Any,
) -> Any:
    """
    Decorator to make any function idempotent

    Works both as ``@idempotent_function(...)`` (``function`` is not supplied
    yet, so a partial carrying the options is returned) and when called with
    the function directly. The value passed for ``data_keyword_argument`` at
    call time is used as the idempotency payload.

    Parameters
    ----------
    function: Callable
        Function to be decorated
    data_keyword_argument: str
        Keyword parameter name in function's signature that we should hash as idempotency key, e.g. "order"
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data
    config: IdempotencyConfig | None
        Configuration; a default ``IdempotencyConfig()`` is created when omitted
    output_serializer: BaseIdempotencySerializer | type[BaseIdempotencyModelSerializer] | None
        Serializer to transform the data to and from a dictionary.
        If not supplied, no serialization is done via the NoOpSerializer.
        When a class inheriting from BaseIdempotencyModelSerializer is given,
        it is instantiated from the decorated function's ``return`` annotation.
    key_prefix: str | None
        Custom prefix for idempotency key: key_prefix#hash
    **kwargs: Any
        Extra options; only carried over to the partial when ``function`` is
        not supplied yet.

    Returns
    -------
    Any
        A partial of this decorator when ``function`` is falsy, otherwise the
        wrapped function.

    Raises
    ------
    RuntimeError
        Raised by the wrapped function when it is called without
        ``data_keyword_argument`` as a keyword argument (and idempotency is
        not disabled).

    Examples
    --------
    **Processes an order in an idempotent manner**

        from aws_lambda_powertools.utilities.idempotency import (
           idempotent_function, DynamoDBPersistenceLayer, IdempotencyConfig
        )

        idem_config=IdempotencyConfig(event_key_jmespath="order_id")
        persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")

        @idempotent_function(data_keyword_argument="order", config=idem_config, persistence_store=persistence_layer)
        def process_order(customer_id: str, order: dict, **kwargs):
            return {"StatusCode": 200}
    """
    if not function:
        # Called with options only: defer until the function to decorate is supplied.
        deferred_decorator = functools.partial(
            idempotent_function,
            data_keyword_argument=data_keyword_argument,
            persistence_store=persistence_store,
            config=config,
            output_serializer=output_serializer,
            key_prefix=key_prefix,
            **kwargs,
        )
        return cast(AnyCallableT, deferred_decorator)

    if isclass(output_serializer) and issubclass(output_serializer, BaseIdempotencyModelSerializer):
        # A serializer class (not an instance) was given: build the instance
        # from the decorated function's return annotation, if it has one.
        return_annotation = function.__annotations__.get("return")
        output_serializer = output_serializer.instantiate(return_annotation)

    if not config:
        config = IdempotencyConfig()

    @functools.wraps(function)
    def decorate(*call_args, **call_kwargs):
        # Kill switch: when the disable flag is truthy, warn and call the function directly.
        # The warning is emitted every time this path is taken; no environment check is made here.
        idempotency_disabled = strtobool(os.getenv(constants.IDEMPOTENCY_DISABLED_ENV, "false"))
        if idempotency_disabled:
            warnings.warn(
                "Disabling idempotency is intended for development environments only "
                "and should not be used in production.",
                PowertoolsUserWarning,
                stacklevel=2,
            )
            return function(*call_args, **call_kwargs)

        # The payload can only be located by name, so it must arrive as a keyword argument.
        if data_keyword_argument not in call_kwargs:
            raise RuntimeError(
                f"Unable to extract '{data_keyword_argument}' from keyword arguments."
                f" Ensure this exists in your function's signature as well as the caller used it as a keyword argument",
            )

        return IdempotencyHandler(
            function=function,
            function_payload=call_kwargs[data_keyword_argument],
            config=config,
            persistence_store=persistence_store,
            output_serializer=output_serializer,
            key_prefix=key_prefix,
            function_args=call_args,
            function_kwargs=call_kwargs,
        ).handle()

    return cast(AnyCallableT, decorate)