Skip to content

Commit 631370d

Browse files
committed
chore(idempotency): remove param expires_in_progress
1 parent cd90fc1 commit 631370d

File tree

8 files changed

+172
-267
lines changed

8 files changed

+172
-267
lines changed

aws_lambda_powertools/utilities/idempotency/base.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,12 +190,8 @@ def _handle_for_status(self, data_record: DataRecord) -> Optional[Dict[Any, Any]
190190
raise IdempotencyInconsistentStateError("save_inprogress and get_record return inconsistent results.")
191191

192192
if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
193-
# This code path will only be triggered if the expires_in_progress option is enabled, and the item
194-
# became expored between the save_inprogress call an dhere
195-
if (
196-
self.config.expires_in_progress
197-
and data_record.in_progress_expiry_timestamp is not None
198-
and data_record.in_progress_expiry_timestamp < int(datetime.datetime.now().timestamp())
193+
if data_record.in_progress_expiry_timestamp is not None and data_record.in_progress_expiry_timestamp < int(
194+
datetime.datetime.now().timestamp()
199195
):
200196
raise IdempotencyInconsistentStateError(
201197
"item should have been expired in-progress because it already time-outed."

aws_lambda_powertools/utilities/idempotency/config.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ def __init__(
99
jmespath_options: Optional[Dict] = None,
1010
raise_on_no_idempotency_key: bool = False,
1111
expires_after_seconds: int = 60 * 60, # 1 hour default
12-
expires_in_progress: bool = False,
1312
use_local_cache: bool = False,
1413
local_cache_max_items: int = 256,
1514
hash_function: str = "md5",
@@ -27,8 +26,6 @@ def __init__(
2726
Raise exception if no idempotency key was found in the request, by default False
2827
expires_after_seconds: int
2928
The number of seconds to wait before a record is expired
30-
expires_in_progress: bool, optional
31-
Whether to expire units of work that timed-out during invocation, by default False
3229
use_local_cache: bool, optional
3330
Whether to locally cache idempotency results, by default False
3431
local_cache_max_items: int, optional
@@ -41,7 +38,6 @@ def __init__(
4138
self.jmespath_options = jmespath_options
4239
self.raise_on_no_idempotency_key = raise_on_no_idempotency_key
4340
self.expires_after_seconds = expires_after_seconds
44-
self.expires_in_progress = expires_in_progress
4541
self.use_local_cache = use_local_cache
4642
self.local_cache_max_items = local_cache_max_items
4743
self.hash_function = hash_function

aws_lambda_powertools/utilities/idempotency/persistence/base.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ def __init__(self):
125125
self.validation_key_jmespath = None
126126
self.raise_on_no_idempotency_key = False
127127
self.expires_after_seconds: int = 60 * 60 # 1 hour default
128-
self.expires_in_progress: bool = False
129128
self.use_local_cache = False
130129
self.hash_function = None
131130

@@ -158,7 +157,6 @@ def configure(self, config: IdempotencyConfig, function_name: Optional[str] = No
158157
self.payload_validation_enabled = True
159158
self.raise_on_no_idempotency_key = config.raise_on_no_idempotency_key
160159
self.expires_after_seconds = config.expires_after_seconds
161-
self.expires_in_progress = config.expires_in_progress
162160
self.use_local_cache = config.use_local_cache
163161
if self.use_local_cache:
164162
self._cache = LRUDict(max_items=config.local_cache_max_items)
@@ -353,19 +351,18 @@ def save_inprogress(self, data: Dict[str, Any], remaining_time_in_millis: Option
353351
payload_hash=self._get_hashed_payload(data=data),
354352
)
355353

356-
if self.expires_in_progress:
357-
if remaining_time_in_millis:
358-
now = datetime.datetime.now()
359-
period = datetime.timedelta(milliseconds=remaining_time_in_millis)
354+
if remaining_time_in_millis:
355+
now = datetime.datetime.now()
356+
period = datetime.timedelta(milliseconds=remaining_time_in_millis)
360357

361-
# It's very important to use math.ceil here. Otherwise, we might return an integer that will be smaller
362-
# than the current time in milliseconds, due to rounding. This will create a scenario where the record
363-
# looks already expired in the store, but the invocation is still running.
364-
timestamp = ceil((now + period).timestamp())
358+
# It's very important to use math.ceil here. Otherwise, we might return an integer that will be smaller
359+
# than the current time in milliseconds, due to rounding. This will create a scenario where the record
360+
# looks already expired in the store, but the invocation is still running.
361+
timestamp = ceil((now + period).timestamp())
365362

366-
data_record.in_progress_expiry_timestamp = timestamp
367-
else:
368-
warnings.warn("Expires in progress is enabled but we couldn't determine the remaining time left")
363+
data_record.in_progress_expiry_timestamp = timestamp
364+
else:
365+
warnings.warn("Expires in progress is enabled but we couldn't determine the remaining time left")
369366

370367
logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")
371368

aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -168,64 +168,48 @@ def _put_record(self, data_record: DataRecord) -> None:
168168
try:
169169
logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")
170170

171-
condition_expression = "attribute_not_exists(#id) OR #now < :now"
172-
expression_attribute_names = {
173-
"#id": self.key_attr,
174-
"#now": self.expiry_attr,
175-
"#status": self.status_attr,
176-
}
177-
expression_attribute_values: Dict[str, Any] = {":now": int(now.timestamp())}
178-
179-
# When we want to expire_in_progress invocations, we check if the in_progress timestamp exists
180-
# and we are past that timestamp. We also make sure the status is INPROGRESS because we don't
181-
# want to repeat COMPLETE invocations.
182-
#
183-
# We put this in an if block because customers might want to disable the feature,
184-
# reverting to the old behavior that relies just on the idempotency key.
185-
if self.expires_in_progress:
186-
condition_expression += (
187-
" OR ("
188-
"attribute_exists(#in_progress_expiry) AND "
189-
"#in_progress_expiry < :now AND #status = :inprogress"
190-
")"
191-
)
192-
expression_attribute_names["#in_progress_expiry"] = self.in_progress_expiry_attr
193-
expression_attribute_values[":inprogress"] = STATUS_CONSTANTS["INPROGRESS"]
194-
195171
self.table.put_item(
196172
Item=item,
197-
ConditionExpression=condition_expression,
198-
ExpressionAttributeNames=expression_attribute_names,
199-
ExpressionAttributeValues=expression_attribute_values,
173+
ConditionExpression=(
174+
"attribute_not_exists(#id) OR #now < :now OR "
175+
"(attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now AND #status = :inprogress)"
176+
),
177+
ExpressionAttributeNames={
178+
"#id": self.key_attr,
179+
"#now": self.expiry_attr,
180+
"#in_progress_expiry": self.in_progress_expiry_attr,
181+
"#status": self.status_attr,
182+
},
183+
ExpressionAttributeValues={":now": int(now.timestamp()), ":inprogress": STATUS_CONSTANTS["INPROGRESS"]},
200184
)
201185
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
202186
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
203187
raise IdempotencyItemAlreadyExistsError
204188

205189
def _update_record(self, data_record: DataRecord):
206190
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
207-
update_expression = "SET #response_data = :response_data, #expiry = :expiry, #status = :status"
191+
update_expression = (
192+
"SET #response_data = :response_data, #expiry = :expiry, "
193+
"#status = :status, #in_progress_expiry = :in_progress_expiry"
194+
)
208195
expression_attr_values = {
209196
":expiry": data_record.expiry_timestamp,
210197
":response_data": data_record.response_data,
211198
":status": data_record.status,
199+
":in_progress_expiry": data_record.in_progress_expiry_timestamp,
212200
}
213201
expression_attr_names = {
214202
"#response_data": self.data_attr,
215203
"#expiry": self.expiry_attr,
216204
"#status": self.status_attr,
205+
"#in_progress_expiry": self.in_progress_expiry_attr,
217206
}
218207

219208
if self.payload_validation_enabled:
220209
update_expression += ", #validation_key = :validation_key"
221210
expression_attr_values[":validation_key"] = data_record.payload_hash
222211
expression_attr_names["#validation_key"] = self.validation_key_attr
223212

224-
if self.expires_in_progress:
225-
update_expression += ", #in_progress_expiry = :in_progress_expiry"
226-
expression_attr_values[":in_progress_expiry"] = data_record.in_progress_expiry_timestamp
227-
expression_attr_names["#in_progress_expiry"] = self.in_progress_expiry_attr
228-
229213
kwargs = {
230214
"Key": self._get_key(data_record.idempotency_key),
231215
"UpdateExpression": update_expression,

docs/utilities/idempotency.md

Lines changed: 55 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -370,12 +370,12 @@ sequenceDiagram
370370
Lambda->>Persistence Layer: Update record with Lambda handler results
371371
deactivate Persistence Layer
372372
Persistence Layer-->>Persistence Layer: Update record with result
373-
Lambda--xClient: Response not received by client
373+
Lambda-->>Client: Response sent to client
374374
else retried request:
375375
Client->>Lambda: Invoke (event)
376376
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
377377
Persistence Layer-->>Lambda: Already exists in persistence layer. Return result
378-
Lambda-->>Client: Response sent to client
378+
Lambda-->>Client: Response sent to client
379379
end
380380
```
381381
<i>Idempotent sequence</i>
@@ -386,6 +386,59 @@ The client was successful in receiving the result after the retry. Since the Lam
386386
???+ note
387387
Bear in mind that the entire Lambda handler is treated as a single idempotent operation. If your Lambda handler can cause multiple side effects, consider splitting it into separate functions.
388388

389+
#### Lambda timeouts
390+
391+
In cases where the [Lambda invocation
392+
expires](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-verify-invocation-timeouts/),
393+
Powertools doesn't have the chance to set the idempotency record to `EXPIRED`.
394+
This means that the record would normally have been locked until `expire_seconds` have
395+
passsed.
396+
397+
However, when Powertools has access to the Lambda invocation context, we are able to calculate the remaining
398+
available time for the invocation, and save it on the idempotency record. This way, if a second invocation happens
399+
after this timestamp, and the record is still marked `INPROGRESS`, we execute the inovcation again as if it was
400+
already expired. This means that if an invocation expired during execution, it will be quickly executed again on the
401+
next retry.
402+
403+
???+ info "Info: Calculating the remaining available time"
404+
For now this only works with the `idempotent` decorator. At the moment we
405+
don't have access to the Lambda context when using the
406+
`idempotent_function` so enabling this option is a no-op in that scenario.
407+
408+
<center>
409+
```mermaid
410+
sequenceDiagram
411+
participant Client
412+
participant Lambda
413+
participant Persistence Layer
414+
alt initial request:
415+
Client->>Lambda: Invoke (event)
416+
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
417+
activate Persistence Layer
418+
Note right of Persistence Layer: Locked during this time. Prevents multiple<br/>Lambda invocations with the same<br/>payload running concurrently.
419+
Lambda--xLambda: Run Lambda handler (event).<br/>Time out
420+
Lambda-->>Client: Return error response
421+
deactivate Persistence Layer
422+
else retried before the Lambda timeout:
423+
Client->>Lambda: Invoke (event)
424+
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
425+
Persistence Layer-->>Lambda: Already exists in persistence layer. Still in-progress
426+
Lambda--xClient: Return Invocation Already In Progress error
427+
else retried after the Lambda timeout:
428+
Client->>Lambda: Invoke (event)
429+
Lambda->>Persistence Layer: Get or set (id=event.search(payload))
430+
activate Persistence Layer
431+
Note right of Persistence Layer: Locked during this time. Prevents multiple<br/>Lambda invocations with the same<br/>payload running concurrently.
432+
Lambda-->>Lambda: Run Lambda handler (event)
433+
Lambda->>Persistence Layer: Update record with Lambda handler results
434+
deactivate Persistence Layer
435+
Persistence Layer-->>Persistence Layer: Update record with result
436+
Lambda-->>Client: Response sent to client
437+
end
438+
```
439+
<i>Idempotent sequence for Lambda timeouts</i>
440+
</center>
441+
389442
### Handling exceptions
390443

391444
If you are using the `idempotent` decorator on your Lambda handler, any unhandled exceptions that are raised during the code execution will cause **the record in the persistence layer to be deleted**.
@@ -483,7 +536,6 @@ Parameter | Default | Description
483536
**payload_validation_jmespath** | `""` | JMESPath expression to validate whether certain parameters have changed in the event while the event payload
484537
**raise_on_no_idempotency_key** | `False` | Raise exception if no idempotency key was found in the request
485538
**expires_after_seconds** | 3600 | The number of seconds to wait before a record is expired
486-
**expires_in_progress** | `False`| Enables expiry of invocations that time out during execution
487539
**use_local_cache** | `False` | Whether to locally cache idempotency results
488540
**local_cache_max_items** | 256 | Max number of items to store in local cache
489541
**hash_function** | `md5` | Function to use for calculating hashes, as provided by [hashlib](https://docs.python.org/3/library/hashlib.html) in the standard library.
@@ -897,58 +949,6 @@ class DynamoDBPersistenceLayer(BasePersistenceLayer):
897949

898950
For example, the `_put_record` method needs to raise an exception if a non-expired record already exists in the data store with a matching key.
899951

900-
### Expiring in-progress invocations
901-
902-
The default behavior when a Lambda invocation times out is for the event to stay locked until `expire_seconds` have passed. Powertools
903-
has no way of knowing if it's safe to retry the operation in this scenario, so we assume the safest approach: to not
904-
retry the operation.
905-
906-
However, certain types of invocation have less strict requirements, and can benefit from faster expiry of invocations. Ideally, we
907-
can make an in-progress invocation expire as soon as the [Lambda invocation expires](https://aws.amazon.com/premiumsupport/knowledge-center/lambda-verify-invocation-timeouts/).
908-
909-
When using this option, powertools will calculate the remaining available time for the invocation, and save it on the idempotency record.
910-
This way, if a second invocation happens after this timestamp, and the record is stil marked `INPROGRESS`, we execute the invocation again
911-
as if it was already expired. This means that if an invocation expired during execution, it will be quickly executed again on the next try.
912-
913-
This setting introduces no change on the regular behavior where if an invocation succeeds, the results are cached for `expire_seconds` seconds.
914-
915-
???+ warning "Warning"
916-
Consider whenever you really want this behavior. Powertools can't make any garantee on which state your application was
917-
when it time outed. Ensure that your business logic can be retried at any stage.
918-
919-
???+ info "Info: Calculating the remaining available time"
920-
For now this only works with the `idempotent` decorator. At the moment we don't have access to the Lambda context when using
921-
the `idempotent_function` so enabling this option is a no-op in that scenario.
922-
923-
To activate this behaviour, enable the `expires_in_progress` option on the configuration:
924-
925-
=== "app.py"
926-
927-
```python hl_lines="8"
928-
from aws_lambda_powertools.utilities.idempotency import (
929-
DynamoDBPersistenceLayer, IdempotencyConfig, idempotent
930-
)
931-
932-
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
933-
934-
config = IdempotencyConfig(
935-
expires_in_progress=True,
936-
)
937-
938-
@idempotent(persistence_store=persistence_layer, config=config)
939-
def handler(event, context):
940-
payment = create_subscription_payment(
941-
user=event['user'],
942-
product=event['product_id']
943-
)
944-
...
945-
return {
946-
"payment_id": payment.id,
947-
"message": "success",
948-
"statusCode": 200,
949-
}
950-
```
951-
952952
## Compatibility with other utilities
953953

954954
### Validation utility

0 commit comments

Comments
 (0)