Skip to content

Commit 260fa86

Browse files
authored
feat(idempotency): support for any synchronous function (#625)
1 parent d65f2a7 commit 260fa86

File tree

10 files changed

+507
-261
lines changed

10 files changed

+507
-261
lines changed

aws_lambda_powertools/shared/constants.py

+3
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,8 @@
1616
XRAY_TRACE_ID_ENV: str = "_X_AMZN_TRACE_ID"
1717
LAMBDA_TASK_ROOT_ENV: str = "LAMBDA_TASK_ROOT"
1818

19+
20+
LAMBDA_FUNCTION_NAME_ENV: str = "AWS_LAMBDA_FUNCTION_NAME"
21+
1922
XRAY_SDK_MODULE: str = "aws_xray_sdk"
2023
XRAY_SDK_CORE_MODULE: str = "aws_xray_sdk.core"

aws_lambda_powertools/shared/types.py

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from typing import Any, Callable, TypeVar
2+
3+
AnyCallableT = TypeVar("AnyCallableT", bound=Callable[..., Any]) # noqa: VNE001

aws_lambda_powertools/tracing/tracer.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
import logging
66
import numbers
77
import os
8-
from typing import Any, Awaitable, Callable, Dict, Optional, Sequence, TypeVar, Union, cast, overload
8+
from typing import Any, Callable, Dict, Optional, Sequence, Union, cast, overload
99

1010
from ..shared import constants
1111
from ..shared.functions import resolve_env_var_choice, resolve_truthy_env_var_choice
1212
from ..shared.lazy_import import LazyLoader
13+
from ..shared.types import AnyCallableT
1314
from .base import BaseProvider, BaseSegment
1415

1516
is_cold_start = True
@@ -18,9 +19,6 @@
1819
aws_xray_sdk = LazyLoader(constants.XRAY_SDK_MODULE, globals(), constants.XRAY_SDK_MODULE)
1920
aws_xray_sdk.core = LazyLoader(constants.XRAY_SDK_CORE_MODULE, globals(), constants.XRAY_SDK_CORE_MODULE)
2021

21-
AnyCallableT = TypeVar("AnyCallableT", bound=Callable[..., Any]) # noqa: VNE001
22-
AnyAwaitableT = TypeVar("AnyAwaitableT", bound=Awaitable)
23-
2422

2523
class Tracer:
2624
"""Tracer using AWS-XRay to provide decorators with known defaults for Lambda functions

aws_lambda_powertools/utilities/idempotency/__init__.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@
55
from aws_lambda_powertools.utilities.idempotency.persistence.base import BasePersistenceLayer
66
from aws_lambda_powertools.utilities.idempotency.persistence.dynamodb import DynamoDBPersistenceLayer
77

8-
from .idempotency import IdempotencyConfig, idempotent
8+
from .idempotency import IdempotencyConfig, idempotent, idempotent_function
99

10-
__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent", "IdempotencyConfig")
10+
__all__ = ("DynamoDBPersistenceLayer", "BasePersistenceLayer", "idempotent", "idempotent_function", "IdempotencyConfig")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
import logging
2+
from typing import Any, Callable, Dict, Optional, Tuple
3+
4+
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
5+
from aws_lambda_powertools.utilities.idempotency.exceptions import (
6+
IdempotencyAlreadyInProgressError,
7+
IdempotencyInconsistentStateError,
8+
IdempotencyItemAlreadyExistsError,
9+
IdempotencyItemNotFoundError,
10+
IdempotencyKeyError,
11+
IdempotencyPersistenceLayerError,
12+
IdempotencyValidationError,
13+
)
14+
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
15+
STATUS_CONSTANTS,
16+
BasePersistenceLayer,
17+
DataRecord,
18+
)
19+
20+
MAX_RETRIES = 2
21+
logger = logging.getLogger(__name__)
22+
23+
24+
class IdempotencyHandler:
25+
"""
26+
Base class to orchestrate calls to persistence layer.
27+
"""
28+
29+
def __init__(
30+
self,
31+
function: Callable,
32+
function_payload: Any,
33+
config: IdempotencyConfig,
34+
persistence_store: BasePersistenceLayer,
35+
function_args: Optional[Tuple] = None,
36+
function_kwargs: Optional[Dict] = None,
37+
):
38+
"""
39+
Initialize the IdempotencyHandler
40+
41+
Parameters
42+
----------
43+
function_payload: Any
44+
JSON Serializable payload to be hashed
45+
config: IdempotencyConfig
46+
Idempotency Configuration
47+
persistence_store : BasePersistenceLayer
48+
Instance of persistence layer to store idempotency records
49+
function_args: Optional[Tuple]
50+
Function arguments
51+
function_kwargs: Optional[Dict]
52+
Function keyword arguments
53+
"""
54+
self.function = function
55+
self.data = function_payload
56+
self.fn_args = function_args
57+
self.fn_kwargs = function_kwargs
58+
59+
persistence_store.configure(config)
60+
self.persistence_store = persistence_store
61+
62+
def handle(self) -> Any:
63+
"""
64+
Main entry point for handling idempotent execution of a function.
65+
66+
Returns
67+
-------
68+
Any
69+
Function response
70+
71+
"""
72+
# IdempotencyInconsistentStateError can happen under rare but expected cases
73+
# when persistent state changes in the small time between put & get requests.
74+
# In most cases we can retry successfully on this exception.
75+
for i in range(MAX_RETRIES + 1): # pragma: no cover
76+
try:
77+
return self._process_idempotency()
78+
except IdempotencyInconsistentStateError:
79+
if i == MAX_RETRIES:
80+
raise # Bubble up when exceeded max tries
81+
82+
def _process_idempotency(self):
83+
try:
84+
# We call save_inprogress first as an optimization for the most common case where no idempotent record
85+
# already exists. If it succeeds, there's no need to call get_record.
86+
self.persistence_store.save_inprogress(data=self.data)
87+
except IdempotencyKeyError:
88+
raise
89+
except IdempotencyItemAlreadyExistsError:
90+
# Now we know the item already exists, we can retrieve it
91+
record = self._get_idempotency_record()
92+
return self._handle_for_status(record)
93+
except Exception as exc:
94+
raise IdempotencyPersistenceLayerError("Failed to save in progress record to idempotency store") from exc
95+
96+
return self._get_function_response()
97+
98+
def _get_idempotency_record(self) -> DataRecord:
99+
"""
100+
Retrieve the idempotency record from the persistence layer.
101+
102+
Raises
103+
----------
104+
IdempotencyInconsistentStateError
105+
106+
"""
107+
try:
108+
data_record = self.persistence_store.get_record(data=self.data)
109+
except IdempotencyItemNotFoundError:
110+
# This code path will only be triggered if the record is removed between save_inprogress and get_record.
111+
logger.debug(
112+
f"An existing idempotency record was deleted before we could fetch it. Proceeding with {self.function}"
113+
)
114+
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")
115+
116+
# Allow this exception to bubble up
117+
except IdempotencyValidationError:
118+
raise
119+
120+
# Wrap remaining unhandled exceptions with IdempotencyPersistenceLayerError to ease exception handling for
121+
# clients
122+
except Exception as exc:
123+
raise IdempotencyPersistenceLayerError("Failed to get record from idempotency store") from exc
124+
125+
return data_record
126+
127+
def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]]:
128+
"""
129+
Take appropriate action based on data_record's status
130+
131+
Parameters
132+
----------
133+
data_record: DataRecord
134+
135+
Returns
136+
-------
137+
Optional[Dict[Any, Any]
138+
Function's response previously used for this idempotency key, if it has successfully executed already.
139+
140+
Raises
141+
------
142+
AlreadyInProgressError
143+
A function execution is already in progress
144+
IdempotencyInconsistentStateError
145+
The persistence store reports inconsistent states across different requests. Retryable.
146+
"""
147+
# This code path will only be triggered if the record becomes expired between the save_inprogress call and here
148+
if data_record.status == STATUS_CONSTANTS["EXPIRED"]:
149+
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")
150+
151+
if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
152+
raise IdempotencyAlreadyInProgressError(
153+
f"Execution already in progress with idempotency key: "
154+
f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"
155+
)
156+
157+
return data_record.response_json_as_dict()
158+
159+
def _get_function_response(self):
160+
try:
161+
response = self.function(*self.fn_args, **self.fn_kwargs)
162+
except Exception as handler_exception:
163+
# We need these nested blocks to preserve function's exception in case the persistence store operation
164+
# also raises an exception
165+
try:
166+
self.persistence_store.delete_record(data=self.data, exception=handler_exception)
167+
except Exception as delete_exception:
168+
raise IdempotencyPersistenceLayerError(
169+
"Failed to delete record from idempotency store"
170+
) from delete_exception
171+
raise
172+
173+
else:
174+
try:
175+
self.persistence_store.save_success(data=self.data, result=response)
176+
except Exception as save_exception:
177+
raise IdempotencyPersistenceLayerError(
178+
"Failed to update record state to success in idempotency store"
179+
) from save_exception
180+
181+
return response

0 commit comments

Comments
 (0)