Skip to content

Commit 160feae

Browse files
feat(idempotency): handle lambda timeout scenarios for INPROGRESS records (#1387)
Co-authored-by: heitorlessa <[email protected]>
1 parent 02e8b60 commit 160feae

File tree

12 files changed

+561
-121
lines changed

12 files changed

+561
-121
lines changed

Diff for: aws_lambda_powertools/utilities/idempotency/base.py

+31-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import logging
23
from copy import deepcopy
34
from typing import Any, Callable, Dict, Optional, Tuple
@@ -73,6 +74,7 @@ def __init__(
7374
self.data = deepcopy(_prepare_data(function_payload))
7475
self.fn_args = function_args
7576
self.fn_kwargs = function_kwargs
77+
self.config = config
7678

7779
persistence_store.configure(config, self.function.__name__)
7880
self.persistence_store = persistence_store
@@ -101,7 +103,9 @@ def _process_idempotency(self):
101103
try:
102104
# We call save_inprogress first as an optimization for the most common case where no idempotent record
103105
# already exists. If it succeeds, there's no need to call get_record.
104-
self.persistence_store.save_inprogress(data=self.data)
106+
self.persistence_store.save_inprogress(
107+
data=self.data, remaining_time_in_millis=self._get_remaining_time_in_millis()
108+
)
105109
except IdempotencyKeyError:
106110
raise
107111
except IdempotencyItemAlreadyExistsError:
@@ -113,6 +117,25 @@ def _process_idempotency(self):
113117

114118
return self._get_function_response()
115119

120+
def _get_remaining_time_in_millis(self) -> Optional[int]:
121+
"""
122+
Tries to determine the remaining time available for the current lambda invocation.
123+
124+
This only works if the idempotent handler decorator is used, since we need to access the lambda context.
125+
However, this could be improved if we start storing the lambda context globally during the invocation. One
126+
way to do this is to register the lambda context when configuring the IdempotencyConfig object.
127+
128+
Returns
129+
-------
130+
Optional[int]
131+
Remaining time in millis, or None if the remaining time cannot be determined.
132+
"""
133+
134+
if self.config.lambda_context is not None:
135+
return self.config.lambda_context.get_remaining_time_in_millis()
136+
137+
return None
138+
116139
def _get_idempotency_record(self) -> DataRecord:
117140
"""
118141
Retrieve the idempotency record from the persistence layer.
@@ -167,6 +190,13 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]
167190
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")
168191

169192
if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
193+
if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int(
194+
datetime.datetime.now().timestamp() * 1000
195+
):
196+
raise IdempotencyInconsistentStateError(
197+
"item should have been expired in-progress because it already time-outed."
198+
)
199+
170200
raise IdempotencyAlreadyInProgressError(
171201
f"Execution already in progress with idempotency key: "
172202
f"{self.persistence_store.event_key_jmespath}={data_record.idempotency_key}"

Diff for: aws_lambda_powertools/utilities/idempotency/config.py

+10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import Dict, Optional
22

3+
from aws_lambda_powertools.utilities.typing import LambdaContext
4+
35

46
class IdempotencyConfig:
57
def __init__(
@@ -12,6 +14,7 @@ def __init__(
1214
use_local_cache: bool = False,
1315
local_cache_max_items: int = 256,
1416
hash_function: str = "md5",
17+
lambda_context: Optional[LambdaContext] = None,
1518
):
1619
"""
1720
Initialize the base persistence layer
@@ -32,6 +35,8 @@ def __init__(
3235
Max number of items to store in local cache, by default 1024
3336
hash_function: str, optional
3437
Function to use for calculating hashes, by default md5.
38+
lambda_context: LambdaContext, optional
39+
Lambda Context containing information about the invocation, function and execution environment.
3540
"""
3641
self.event_key_jmespath = event_key_jmespath
3742
self.payload_validation_jmespath = payload_validation_jmespath
@@ -41,3 +46,8 @@ def __init__(
4146
self.use_local_cache = use_local_cache
4247
self.local_cache_max_items = local_cache_max_items
4348
self.hash_function = hash_function
49+
self.lambda_context: Optional[LambdaContext] = lambda_context
50+
51+
def register_lambda_context(self, lambda_context: LambdaContext):
52+
"""Captures the Lambda context, to calculate the remaining time before the invocation times out"""
53+
self.lambda_context = lambda_context

Diff for: aws_lambda_powertools/utilities/idempotency/idempotency.py

+2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ def idempotent(
6262
return handler(event, context)
6363

6464
config = config or IdempotencyConfig()
65+
config.register_lambda_context(context)
66+
6567
args = event, context
6668
idempotency_handler = IdempotencyHandler(
6769
function=handler,

Diff for: aws_lambda_powertools/utilities/idempotency/persistence/base.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ def __init__(
4040
idempotency_key,
4141
status: str = "",
4242
expiry_timestamp: Optional[int] = None,
43+
in_progress_expiry_timestamp: Optional[int] = None,
4344
response_data: Optional[str] = "",
4445
payload_hash: Optional[str] = None,
4546
) -> None:
@@ -53,6 +54,8 @@ def __init__(
5354
status of the idempotent record
5455
expiry_timestamp: int, optional
5556
time before the record should expire, in seconds
57+
in_progress_expiry_timestamp: int, optional
58+
time before the record should expire while in the INPROGRESS state, in seconds
5659
payload_hash: str, optional
5760
hashed representation of payload
5861
response_data: str, optional
@@ -61,6 +64,7 @@ def __init__(
6164
self.idempotency_key = idempotency_key
6265
self.payload_hash = payload_hash
6366
self.expiry_timestamp = expiry_timestamp
67+
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
6468
self._status = status
6569
self.response_data = response_data
6670

@@ -328,14 +332,16 @@ def save_success(self, data: Dict[str, Any], result: dict) -> None:
328332

329333
self._save_to_cache(data_record=data_record)
330334

331-
def save_inprogress(self, data: Dict[str, Any]) -> None:
335+
def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Optional[int] = None) -> None:
332336
"""
333337
Save record of function's execution being in progress
334338
335339
Parameters
336340
----------
337341
data: Dict[str, Any]
338342
Payload
343+
remaining_time_in_millis: Optional[int]
344+
If expiry of in-progress invocations is enabled, this will contain the remaining time available in millis
339345
"""
340346
data_record = DataRecord(
341347
idempotency_key=self._get_hashed_idempotency_key(data=data),
@@ -344,6 +350,18 @@ def save_inprogress(self, data: Dict[str, Any]) -> None:
344350
payload_hash=self._get_hashed_payload(data=data),
345351
)
346352

353+
if remaining_time_in_millis:
354+
now = datetime.datetime.now()
355+
period = datetime.timedelta(milliseconds=remaining_time_in_millis)
356+
timestamp = (now + period).timestamp()
357+
358+
data_record.in_progress_expiry_timestamp = int(timestamp * 1000)
359+
else:
360+
warnings.warn(
361+
"Couldn't determine the remaining time left. "
362+
"Did you call register_lambda_context on IdempotencyConfig?"
363+
)
364+
347365
logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")
348366

349367
if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):

Diff for: aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py

+53-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
IdempotencyItemAlreadyExistsError,
1313
IdempotencyItemNotFoundError,
1414
)
15-
from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord
15+
from aws_lambda_powertools.utilities.idempotency.persistence.base import STATUS_CONSTANTS, DataRecord
1616

1717
logger = logging.getLogger(__name__)
1818

@@ -25,6 +25,7 @@ def __init__(
2525
static_pk_value: Optional[str] = None,
2626
sort_key_attr: Optional[str] = None,
2727
expiry_attr: str = "expiration",
28+
in_progress_expiry_attr: str = "in_progress_expiration",
2829
status_attr: str = "status",
2930
data_attr: str = "data",
3031
validation_key_attr: str = "validation",
@@ -47,6 +48,8 @@ def __init__(
4748
DynamoDB attribute name for the sort key
4849
expiry_attr: str, optional
4950
DynamoDB attribute name for expiry timestamp, by default "expiration"
51+
in_progress_expiry_attr: str, optional
52+
DynamoDB attribute name for in-progress expiry timestamp, by default "in_progress_expiration"
5053
status_attr: str, optional
5154
DynamoDB attribute name for status, by default "status"
5255
data_attr: str, optional
@@ -85,6 +88,7 @@ def __init__(
8588
self.static_pk_value = static_pk_value
8689
self.sort_key_attr = sort_key_attr
8790
self.expiry_attr = expiry_attr
91+
self.in_progress_expiry_attr = in_progress_expiry_attr
8892
self.status_attr = status_attr
8993
self.data_attr = data_attr
9094
self.validation_key_attr = validation_key_attr
@@ -133,6 +137,7 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
133137
idempotency_key=item[self.key_attr],
134138
status=item[self.status_attr],
135139
expiry_timestamp=item[self.expiry_attr],
140+
in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr),
136141
response_data=item.get(self.data_attr),
137142
payload_hash=item.get(self.validation_key_attr),
138143
)
@@ -153,33 +158,75 @@ def _put_record(self, data_record: DataRecord) -> None:
153158
self.status_attr: data_record.status,
154159
}
155160

161+
if data_record.in_progress_expiry_timestamp is not None:
162+
item[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp
163+
156164
if self.payload_validation_enabled:
157165
item[self.validation_key_attr] = data_record.payload_hash
158166

159167
now = datetime.datetime.now()
160168
try:
161169
logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")
170+
171+
# | LOCKED | RETRY if status = "INPROGRESS" | RETRY
172+
# |----------------|-------------------------------------------------------|-------------> .... (time)
173+
# | Lambda Idempotency Record
174+
# | Timeout Timeout
175+
# | (in_progress_expiry) (expiry)
176+
177+
# Conditions to successfully save a record:
178+
179+
# The idempotency key does not exist:
180+
# - first time that this invocation key is used
181+
# - previous invocation with the same key was deleted due to TTL
182+
idempotency_key_not_exist = "attribute_not_exists(#id)"
183+
184+
# The idempotency record exists but it's expired:
185+
idempotency_expiry_expired = "#expiry < :now"
186+
187+
# The status of the record is "INPROGRESS", there is an in-progress expiry timestamp, but it's expired
188+
inprogress_expiry_expired = " AND ".join(
189+
[
190+
"#status = :inprogress",
191+
"attribute_exists(#in_progress_expiry)",
192+
"#in_progress_expiry < :now_in_millis",
193+
]
194+
)
195+
196+
condition_expression = (
197+
f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})"
198+
)
199+
162200
self.table.put_item(
163201
Item=item,
164-
ConditionExpression="attribute_not_exists(#id) OR #now < :now",
165-
ExpressionAttributeNames={"#id": self.key_attr, "#now": self.expiry_attr},
166-
ExpressionAttributeValues={":now": int(now.timestamp())},
202+
ConditionExpression=condition_expression,
203+
ExpressionAttributeNames={
204+
"#id": self.key_attr,
205+
"#expiry": self.expiry_attr,
206+
"#in_progress_expiry": self.in_progress_expiry_attr,
207+
"#status": self.status_attr,
208+
},
209+
ExpressionAttributeValues={
210+
":now": int(now.timestamp()),
211+
":now_in_millis": int(now.timestamp() * 1000),
212+
":inprogress": STATUS_CONSTANTS["INPROGRESS"],
213+
},
167214
)
168215
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
169216
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
170217
raise IdempotencyItemAlreadyExistsError
171218

172219
def _update_record(self, data_record: DataRecord):
173220
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
174-
update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
221+
update_expression = "SET #response_data = :response_data, #expiry = :expiry, " "#status = :status"
175222
expression_attr_values = {
176223
":expiry": data_record.expiry_timestamp,
177224
":response_data": data_record.response_data,
178225
":status": data_record.status,
179226
}
180227
expression_attr_names = {
181-
"#response_data": self.data_attr,
182228
"#expiry": self.expiry_attr,
229+
"#response_data": self.data_attr,
183230
"#status": self.status_attr,
184231
}
185232

Diff for: docs/media/idempotent_sequence.png

-72.9 KB
Binary file not shown.

Diff for: docs/media/idempotent_sequence_exception.png

-45.6 KB
Binary file not shown.

0 commit comments

Comments
 (0)