Skip to content

Commit f1a8832

Browse files
Tom McCarthyMichael Brewer
Tom McCarthy
and
Michael Brewer
authored
fix: correct behaviour to avoid caching "INPROGRESS" records (#295)
* fix: correct behaviour to avoid caching "INPROGRESS" records * docs: add beta flag to utility for initial release(s) * chore: Change STATUS_CONSTANTS to MappingProxyType * chore: Fix docstrings * chore: readability improvements * chore: move cache conditionals inside of cache methods * chore: add test for unhandled types Co-authored-by: Michael Brewer <[email protected]>
1 parent 03f7dcd commit f1a8832

File tree

4 files changed

+123
-65
lines changed

4 files changed

+123
-65
lines changed

Diff for: aws_lambda_powertools/utilities/idempotency/persistence/base.py

+39-27
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import json
88
import logging
99
from abc import ABC, abstractmethod
10+
from types import MappingProxyType
1011
from typing import Any, Dict
1112

1213
import jmespath
@@ -21,7 +22,7 @@
2122

2223
logger = logging.getLogger(__name__)
2324

24-
STATUS_CONSTANTS = {"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"}
25+
STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})
2526

2627

2728
class DataRecord:
@@ -81,8 +82,7 @@ def status(self) -> str:
8182
"""
8283
if self.is_expired:
8384
return STATUS_CONSTANTS["EXPIRED"]
84-
85-
if self._status in STATUS_CONSTANTS.values():
85+
elif self._status in STATUS_CONSTANTS.values():
8686
return self._status
8787
else:
8888
raise IdempotencyInvalidStatusError(self._status)
@@ -214,14 +214,14 @@ def _validate_payload(self, lambda_event: Dict[str, Any], data_record: DataRecor
214214
DataRecord instance
215215
216216
Raises
217-
______
217+
----------
218218
IdempotencyValidationError
219219
Event payload doesn't match the stored record for the given idempotency key
220220
221221
"""
222222
if self.payload_validation_enabled:
223223
lambda_payload_hash = self._get_hashed_payload(lambda_event)
224-
if not data_record.payload_hash == lambda_payload_hash:
224+
if data_record.payload_hash != lambda_payload_hash:
225225
raise IdempotencyValidationError("Payload does not match stored record for this event key")
226226

227227
def _get_expiry_timestamp(self) -> int:
@@ -238,9 +238,30 @@ def _get_expiry_timestamp(self) -> int:
238238
return int((now + period).timestamp())
239239

240240
def _save_to_cache(self, data_record: DataRecord):
241+
"""
242+
Save data_record to local cache except when status is "INPROGRESS"
243+
244+
NOTE: We can't cache "INPROGRESS" records as we have no way to reflect updates that can happen outside of the
245+
execution environment
246+
247+
Parameters
248+
----------
249+
data_record: DataRecord
250+
DataRecord instance
251+
252+
Returns
253+
-------
254+
255+
"""
256+
if not self.use_local_cache:
257+
return
258+
if data_record.status == STATUS_CONSTANTS["INPROGRESS"]:
259+
return
241260
self._cache[data_record.idempotency_key] = data_record
242261

243262
def _retrieve_from_cache(self, idempotency_key: str):
263+
if not self.use_local_cache:
264+
return
244265
cached_record = self._cache.get(idempotency_key)
245266
if cached_record:
246267
if not cached_record.is_expired:
@@ -249,11 +270,13 @@ def _retrieve_from_cache(self, idempotency_key: str):
249270
self._delete_from_cache(idempotency_key)
250271

251272
def _delete_from_cache(self, idempotency_key: str):
273+
if not self.use_local_cache:
274+
return
252275
del self._cache[idempotency_key]
253276

254277
def save_success(self, event: Dict[str, Any], result: dict) -> None:
255278
"""
256-
Save record of function's execution completing succesfully
279+
Save record of function's execution completing successfully
257280
258281
Parameters
259282
----------
@@ -277,8 +300,7 @@ def save_success(self, event: Dict[str, Any], result: dict) -> None:
277300
)
278301
self._update_record(data_record=data_record)
279302

280-
if self.use_local_cache:
281-
self._save_to_cache(data_record)
303+
self._save_to_cache(data_record)
282304

283305
def save_inprogress(self, event: Dict[str, Any]) -> None:
284306
"""
@@ -298,18 +320,11 @@ def save_inprogress(self, event: Dict[str, Any]) -> None:
298320

299321
logger.debug(f"Saving in progress record for idempotency key: {data_record.idempotency_key}")
300322

301-
if self.use_local_cache:
302-
cached_record = self._retrieve_from_cache(idempotency_key=data_record.idempotency_key)
303-
if cached_record:
304-
raise IdempotencyItemAlreadyExistsError
323+
if self._retrieve_from_cache(idempotency_key=data_record.idempotency_key):
324+
raise IdempotencyItemAlreadyExistsError
305325

306326
self._put_record(data_record)
307327

308-
# This has to come after _put_record. If _put_record call raises ItemAlreadyExists we shouldn't populate the
309-
# cache with an "INPROGRESS" record as we don't know the status in the data store at this point.
310-
if self.use_local_cache:
311-
self._save_to_cache(data_record)
312-
313328
def delete_record(self, event: Dict[str, Any], exception: Exception):
314329
"""
315330
Delete record from the persistence store
@@ -329,8 +344,7 @@ def delete_record(self, event: Dict[str, Any], exception: Exception):
329344
)
330345
self._delete_record(data_record)
331346

332-
if self.use_local_cache:
333-
self._delete_from_cache(data_record.idempotency_key)
347+
self._delete_from_cache(data_record.idempotency_key)
334348

335349
def get_record(self, event: Dict[str, Any]) -> DataRecord:
336350
"""
@@ -356,17 +370,15 @@ def get_record(self, event: Dict[str, Any]) -> DataRecord:
356370

357371
idempotency_key = self._get_hashed_idempotency_key(event)
358372

359-
if self.use_local_cache:
360-
cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
361-
if cached_record:
362-
logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
363-
self._validate_payload(event, cached_record)
364-
return cached_record
373+
cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
374+
if cached_record:
375+
logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
376+
self._validate_payload(event, cached_record)
377+
return cached_record
365378

366379
record = self._get_record(idempotency_key)
367380

368-
if self.use_local_cache:
369-
self._save_to_cache(data_record=record)
381+
self._save_to_cache(data_record=record)
370382

371383
self._validate_payload(event, record)
372384
return record

Diff for: docs/utilities/idempotency.md

+30-27
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ title: Idempotency
33
description: Utility
44
---
55

6-
This utility provides a simple solution to convert your Lambda functions into idempotent operations which are safe to
7-
retry.
6+
!!! attention
7+
**This utility is currently in beta**. Please open an [issue in GitHub](https://github.com/awslabs/aws-lambda-powertools-python/issues/new/choose) for any bugs or feature requests.
8+
9+
The idempotency utility provides a simple solution to convert your Lambda functions into idempotent operations which
10+
are safe to retry.
811

912
## Terminology
1013

@@ -31,31 +34,31 @@ storage layer, so you'll need to create a table first.
3134
> Example using AWS Serverless Application Model (SAM)
3235
3336
=== "template.yml"
34-
```yaml
35-
Resources:
36-
HelloWorldFunction:
37-
Type: AWS::Serverless::Function
38-
Properties:
39-
Runtime: python3.8
40-
...
41-
Policies:
42-
- DynamoDBCrudPolicy:
43-
TableName: !Ref IdempotencyTable
44-
IdempotencyTable:
45-
Type: AWS::DynamoDB::Table
46-
Properties:
47-
AttributeDefinitions:
48-
- AttributeName: id
49-
AttributeType: S
50-
BillingMode: PAY_PER_REQUEST
51-
KeySchema:
52-
- AttributeName: id
53-
KeyType: HASH
54-
TableName: "IdempotencyTable"
55-
TimeToLiveSpecification:
56-
AttributeName: expiration
57-
Enabled: true
58-
```
37+
```yaml
38+
Resources:
39+
HelloWorldFunction:
40+
Type: AWS::Serverless::Function
41+
Properties:
42+
Runtime: python3.8
43+
...
44+
Policies:
45+
- DynamoDBCrudPolicy:
46+
TableName: !Ref IdempotencyTable
47+
IdempotencyTable:
48+
Type: AWS::DynamoDB::Table
49+
Properties:
50+
AttributeDefinitions:
51+
- AttributeName: id
52+
AttributeType: S
53+
BillingMode: PAY_PER_REQUEST
54+
KeySchema:
55+
- AttributeName: id
56+
KeyType: HASH
57+
TableName: "IdempotencyTable"
58+
TimeToLiveSpecification:
59+
AttributeName: expiration
60+
Enabled: true
61+
```
5962

6063
!!! warning
6164
When using this utility with DynamoDB, your lambda responses must always be smaller than 400kb. Larger items cannot

Diff for: tests/functional/idempotency/test_idempotency.py

+44-11
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,12 @@ def test_idempotent_lambda_in_progress_with_cache(
133133

134134
stubber.add_client_error("put_item", "ConditionalCheckFailedException")
135135
stubber.add_response("get_item", ddb_response, expected_params)
136+
137+
stubber.add_client_error("put_item", "ConditionalCheckFailedException")
138+
stubber.add_response("get_item", copy.deepcopy(ddb_response), copy.deepcopy(expected_params))
139+
140+
stubber.add_client_error("put_item", "ConditionalCheckFailedException")
141+
stubber.add_response("get_item", copy.deepcopy(ddb_response), copy.deepcopy(expected_params))
136142
stubber.activate()
137143

138144
@idempotent(persistence_store=persistence_store)
@@ -151,11 +157,8 @@ def lambda_handler(event, context):
151157
assert retrieve_from_cache_spy.call_count == 2 * loops
152158
retrieve_from_cache_spy.assert_called_with(idempotency_key=hashed_idempotency_key)
153159

154-
assert save_to_cache_spy.call_count == 1
155-
first_call_args_data_record = save_to_cache_spy.call_args_list[0].kwargs["data_record"]
156-
assert first_call_args_data_record.idempotency_key == hashed_idempotency_key
157-
assert first_call_args_data_record.status == "INPROGRESS"
158-
assert persistence_store._cache.get(hashed_idempotency_key)
160+
save_to_cache_spy.assert_called()
161+
assert persistence_store._cache.get(hashed_idempotency_key) is None
159162

160163
stubber.assert_no_pending_responses()
161164
stubber.deactivate()
@@ -223,12 +226,10 @@ def lambda_handler(event, context):
223226

224227
lambda_handler(lambda_apigw_event, {})
225228

226-
assert retrieve_from_cache_spy.call_count == 1
227-
assert save_to_cache_spy.call_count == 2
228-
first_call_args, second_call_args = save_to_cache_spy.call_args_list
229-
assert first_call_args.args[0].status == "INPROGRESS"
230-
assert second_call_args.args[0].status == "COMPLETED"
231-
assert persistence_store._cache.get(hashed_idempotency_key)
229+
retrieve_from_cache_spy.assert_called_once()
230+
save_to_cache_spy.assert_called_once()
231+
assert save_to_cache_spy.call_args[0][0].status == "COMPLETED"
232+
assert persistence_store._cache.get(hashed_idempotency_key).status == "COMPLETED"
232233

233234
# This lambda call should not call AWS API
234235
lambda_handler(lambda_apigw_event, {})
@@ -594,3 +595,35 @@ def test_data_record_invalid_status_value():
594595
_ = data_record.status
595596

596597
assert e.value.args[0] == "UNSUPPORTED_STATUS"
598+
599+
600+
@pytest.mark.parametrize("persistence_store", [{"use_local_cache": True}], indirect=True)
601+
def test_in_progress_never_saved_to_cache(persistence_store):
602+
# GIVEN a data record with status "INPROGRESS"
603+
# and persistence_store has use_local_cache = True
604+
data_record = DataRecord("key", status="INPROGRESS")
605+
606+
# WHEN saving to local cache
607+
persistence_store._save_to_cache(data_record)
608+
609+
# THEN don't save to local cache
610+
assert persistence_store._cache.get("key") is None
611+
612+
613+
@pytest.mark.parametrize("persistence_store", [{"use_local_cache": False}], indirect=True)
614+
def test_user_local_disabled(persistence_store):
615+
# GIVEN a persistence_store with use_local_cache = False
616+
617+
# WHEN calling any local cache options
618+
data_record = DataRecord("key", status="COMPLETED")
619+
try:
620+
persistence_store._save_to_cache(data_record)
621+
cache_value = persistence_store._retrieve_from_cache("key")
622+
assert cache_value is None
623+
persistence_store._delete_from_cache("key")
624+
except AttributeError as e:
625+
pytest.fail(f"AttributeError should not be raised: {e}")
626+
627+
# THEN raise AttributeError
628+
# AND don't have a _cache attribute
629+
assert not hasattr("persistence_store", "_cache")

Diff for: tests/unit/test_json_encoder.py

+10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import decimal
22
import json
33

4+
import pytest
5+
46
from aws_lambda_powertools.shared.json_encoder import Encoder
57

68

@@ -12,3 +14,11 @@ def test_jsonencode_decimal():
1214
def test_jsonencode_decimal_nan():
1315
result = json.dumps({"val": decimal.Decimal("NaN")}, cls=Encoder)
1416
assert result == '{"val": NaN}'
17+
18+
19+
def test_jsonencode_calls_default():
20+
class CustomClass:
21+
pass
22+
23+
with pytest.raises(TypeError):
24+
json.dumps({"val": CustomClass()}, cls=Encoder)

0 commit comments

Comments
 (0)