forked from aws-powertools/powertools-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathidempotency.py
152 lines (127 loc) · 4.75 KB
/
idempotency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
Primary interface for idempotent Lambda functions utility
"""
import functools
import logging
import os
from typing import Any, Callable, Dict, Optional, cast
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
from aws_lambda_powertools.shared import constants
from aws_lambda_powertools.shared.types import AnyCallableT
from aws_lambda_powertools.utilities.idempotency.base import IdempotencyHandler
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
from aws_lambda_powertools.utilities.idempotency.persistence.base import BasePersistenceLayer
from aws_lambda_powertools.utilities.typing import LambdaContext
# Module-level logger, named after this module's import path
logger = logging.getLogger(__name__)
@lambda_handler_decorator
def idempotent(
    handler: Callable[[Any, LambdaContext], Any],
    event: Dict[str, Any],
    context: LambdaContext,
    persistence_store: BasePersistenceLayer,
    config: Optional[IdempotencyConfig] = None,
    **kwargs,
) -> Any:
    """
    Decorator to handle idempotency of a Lambda handler

    Parameters
    ----------
    handler: Callable
        Lambda's handler
    event: Dict
        Lambda's Event, used as the payload handed to the idempotency handler
    context: LambdaContext
        Lambda's Context
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data
    config: IdempotencyConfig
        Configuration; a default ``IdempotencyConfig()`` is used when omitted
    kwargs
        Additional keyword arguments, forwarded to the handler unchanged

    Returns
    -------
    Any
        The handler's own return value when idempotency is disabled through the
        environment, otherwise whatever ``IdempotencyHandler.handle()`` returns

    Examples
    --------
    **Processes Lambda's event in an idempotent manner**

        >>> from aws_lambda_powertools.utilities.idempotency import (
        >>>    idempotent, DynamoDBPersistenceLayer, IdempotencyConfig
        >>> )
        >>>
        >>> idem_config=IdempotencyConfig(event_key_jmespath="body")
        >>> persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")
        >>>
        >>> @idempotent(config=idem_config, persistence_store=persistence_layer)
        >>> def handler(event, context):
        >>>     return {"StatusCode": 200}
    """

    # NOTE: any non-empty value of the variable disables idempotency, because the
    # raw string returned by os.getenv is tested for truthiness (so "0" and
    # "false" disable it as well).
    if os.getenv(constants.IDEMPOTENCY_DISABLED_ENV):
        # Forward kwargs here too, so the handler is invoked with the same
        # arguments whether or not idempotency is enabled.
        return handler(event, context, **kwargs)

    config = config or IdempotencyConfig()
    args = event, context
    idempotency_handler = IdempotencyHandler(
        function=handler,
        function_payload=event,
        config=config,
        persistence_store=persistence_store,
        function_args=args,
        function_kwargs=kwargs,
    )

    return idempotency_handler.handle()
def idempotent_function(
    function: Optional[AnyCallableT] = None,
    *,
    data_keyword_argument: str,
    persistence_store: BasePersistenceLayer,
    config: Optional[IdempotencyConfig] = None,
) -> Any:
    """
    Decorator to handle idempotency of any function

    Works both as ``@idempotent_function(...)`` (``function`` is None on the first
    call, so a partial bound to the given options is returned and later applied to
    the function) and as a direct call with the function as first argument.

    Parameters
    ----------
    function: Callable
        Function to be decorated
    data_keyword_argument: str
        Keyword parameter name in function's signature that we should hash as idempotency key, e.g. "order"
    persistence_store: BasePersistenceLayer
        Instance of BasePersistenceLayer to store data
    config: IdempotencyConfig
        Configuration; a default ``IdempotencyConfig()`` is used when omitted

    Returns
    -------
    Any
        The wrapped function, or a partial of this decorator awaiting the function
        when ``function`` was not supplied

    Raises
    ------
    RuntimeError
        Raised by the wrapped function at call time when ``data_keyword_argument``
        was not passed as a keyword argument

    Examples
    --------
    **Processes an order in an idempotent manner**

        from aws_lambda_powertools.utilities.idempotency import (
           idempotent_function, DynamoDBPersistenceLayer, IdempotencyConfig
        )

        idem_config=IdempotencyConfig(event_key_jmespath="order_id")
        persistence_layer = DynamoDBPersistenceLayer(table_name="idempotency_store")

        @idempotent_function(data_keyword_argument="order", config=idem_config, persistence_store=persistence_layer)
        def process_order(customer_id: str, order: dict, **kwargs):
            return {"StatusCode": 200}
    """

    if function is None:
        # Called with options only: defer until the function itself is provided
        return cast(
            AnyCallableT,
            functools.partial(
                idempotent_function,
                data_keyword_argument=data_keyword_argument,
                persistence_store=persistence_store,
                config=config,
            ),
        )

    config = config or IdempotencyConfig()

    @functools.wraps(function)
    def decorate(*args, **kwargs):
        # NOTE: any non-empty value of the variable disables idempotency, because
        # the raw string returned by os.getenv is tested for truthiness.
        if os.getenv(constants.IDEMPOTENCY_DISABLED_ENV):
            return function(*args, **kwargs)

        # Membership test rather than `.get(...) is None`: a keyword explicitly
        # passed with the value None is present and must not be reported as
        # missing. Only a genuinely absent keyword raises.
        if data_keyword_argument not in kwargs:
            raise RuntimeError(
                f"Unable to extract '{data_keyword_argument}' from keyword arguments."
                f" Ensure this exists in your function's signature as well as the caller used it as a keyword argument"
            )

        payload = kwargs[data_keyword_argument]

        idempotency_handler = IdempotencyHandler(
            function=function,
            function_payload=payload,
            config=config,
            persistence_store=persistence_store,
            function_args=args,
            function_kwargs=kwargs,
        )

        return idempotency_handler.handle()

    return cast(AnyCallableT, decorate)