Skip to content

Commit f833233

Browse files
dastraDan StrawleandrodamascenaCavalcante Damascena
authored
feat(idempotency): leverage new DynamoDB Failed conditional writes behavior with ReturnValuesOnConditionCheckFailure (#3446)
* feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency to return a copy of the item on failure and avoid a subsequent get #3327 * feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency to return a copy of the item on failure and avoid a subsequent get #3327 * feat: add ReturnValuesOnConditionCheckFailure to DynamoDB idempotency to return a copy of the item on failure and avoid a subsequent get #3327. Changes after PR comments * Improving code readability * Reverting function * Adding comments about some logic decisions * Use DynamoDBPersistenceLayer passed in for test_idempotent_lambda_expired_during_request Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: Dan Straw <[email protected]> * Adding docs * wording * Adressing Ruben's feedback * Adressing Ruben's feedback --------- Signed-off-by: Dan Straw <[email protected]> Co-authored-by: Dan Straw <[email protected]> Co-authored-by: Leandro Damascena <[email protected]> Co-authored-by: Cavalcante Damascena <[email protected]>
1 parent 79e248c commit f833233

File tree

12 files changed

+295
-153
lines changed

12 files changed

+295
-153
lines changed

aws_lambda_powertools/utilities/idempotency/base.py

+13-6
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
IdempotencyValidationError,
1515
)
1616
from aws_lambda_powertools.utilities.idempotency.persistence.base import (
17-
STATUS_CONSTANTS,
1817
BasePersistenceLayer,
18+
)
19+
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
20+
STATUS_CONSTANTS,
1921
DataRecord,
2022
)
2123
from aws_lambda_powertools.utilities.idempotency.serialization.base import (
@@ -118,12 +120,17 @@ def _process_idempotency(self):
118120
data=self.data,
119121
remaining_time_in_millis=self._get_remaining_time_in_millis(),
120122
)
121-
except IdempotencyKeyError:
123+
except (IdempotencyKeyError, IdempotencyValidationError):
122124
raise
123-
except IdempotencyItemAlreadyExistsError:
124-
# Now we know the item already exists, we can retrieve it
125-
record = self._get_idempotency_record()
126-
if record is not None:
125+
except IdempotencyItemAlreadyExistsError as exc:
126+
# Attempt to retrieve the existing record, either from the exception ReturnValuesOnConditionCheckFailure
127+
# or perform a GET operation if the information is not available.
128+
# We give preference to ReturnValuesOnConditionCheckFailure because it is a faster and more cost-effective
129+
# way of retrieving the existing record after a failed conditional write operation.
130+
record = exc.old_data_record or self._get_idempotency_record()
131+
132+
# If a record is found, handle it for status
133+
if record:
127134
return self._handle_for_status(record)
128135
except Exception as exc:
129136
raise IdempotencyPersistenceLayerError(

aws_lambda_powertools/utilities/idempotency/exceptions.py

+14
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
from typing import Optional, Union
77

8+
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import DataRecord
9+
810

911
class BaseError(Exception):
1012
"""
@@ -30,6 +32,18 @@ class IdempotencyItemAlreadyExistsError(BaseError):
3032
Item attempting to be inserted into persistence store already exists and is not expired
3133
"""
3234

35+
def __init__(self, *args: Optional[Union[str, Exception]], old_data_record: Optional[DataRecord] = None):
36+
self.old_data_record = old_data_record
37+
super().__init__(*args)
38+
39+
def __str__(self):
40+
"""
41+
Return all arguments formatted or original message
42+
"""
43+
old_data_record = f" from [{(str(self.old_data_record))}]" if self.old_data_record else ""
44+
message = super().__str__()
45+
return f"{message}{old_data_record}"
46+
3347

3448
class IdempotencyItemNotFoundError(BaseError):
3549
"""

aws_lambda_powertools/utilities/idempotency/persistence/base.py

+21-91
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
import os
99
import warnings
1010
from abc import ABC, abstractmethod
11-
from types import MappingProxyType
12-
from typing import Any, Dict, Optional
11+
from typing import Any, Dict, Optional, Union
1312

1413
import jmespath
1514

@@ -18,95 +17,18 @@
1817
from aws_lambda_powertools.shared.json_encoder import Encoder
1918
from aws_lambda_powertools.utilities.idempotency.config import IdempotencyConfig
2019
from aws_lambda_powertools.utilities.idempotency.exceptions import (
21-
IdempotencyInvalidStatusError,
2220
IdempotencyItemAlreadyExistsError,
2321
IdempotencyKeyError,
2422
IdempotencyValidationError,
2523
)
24+
from aws_lambda_powertools.utilities.idempotency.persistence.datarecord import (
25+
STATUS_CONSTANTS,
26+
DataRecord,
27+
)
2628
from aws_lambda_powertools.utilities.jmespath_utils import PowertoolsFunctions
2729

2830
logger = logging.getLogger(__name__)
2931

30-
STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})
31-
32-
33-
class DataRecord:
34-
"""
35-
Data Class for idempotency records.
36-
"""
37-
38-
def __init__(
39-
self,
40-
idempotency_key: str,
41-
status: str = "",
42-
expiry_timestamp: Optional[int] = None,
43-
in_progress_expiry_timestamp: Optional[int] = None,
44-
response_data: str = "",
45-
payload_hash: str = "",
46-
) -> None:
47-
"""
48-
49-
Parameters
50-
----------
51-
idempotency_key: str
52-
hashed representation of the idempotent data
53-
status: str, optional
54-
status of the idempotent record
55-
expiry_timestamp: int, optional
56-
time before the record should expire, in seconds
57-
in_progress_expiry_timestamp: int, optional
58-
time before the record should expire while in the INPROGRESS state, in seconds
59-
payload_hash: str, optional
60-
hashed representation of payload
61-
response_data: str, optional
62-
response data from previous executions using the record
63-
"""
64-
self.idempotency_key = idempotency_key
65-
self.payload_hash = payload_hash
66-
self.expiry_timestamp = expiry_timestamp
67-
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
68-
self._status = status
69-
self.response_data = response_data
70-
71-
@property
72-
def is_expired(self) -> bool:
73-
"""
74-
Check if data record is expired
75-
76-
Returns
77-
-------
78-
bool
79-
Whether the record is currently expired or not
80-
"""
81-
return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)
82-
83-
@property
84-
def status(self) -> str:
85-
"""
86-
Get status of data record
87-
88-
Returns
89-
-------
90-
str
91-
"""
92-
if self.is_expired:
93-
return STATUS_CONSTANTS["EXPIRED"]
94-
elif self._status in STATUS_CONSTANTS.values():
95-
return self._status
96-
else:
97-
raise IdempotencyInvalidStatusError(self._status)
98-
99-
def response_json_as_dict(self) -> Optional[dict]:
100-
"""
101-
Get response data deserialized to python dict
102-
103-
Returns
104-
-------
105-
Optional[dict]
106-
previous response data deserialized
107-
"""
108-
return json.loads(self.response_data) if self.response_data else None
109-
11032

11133
class BasePersistenceLayer(ABC):
11234
"""
@@ -238,16 +160,20 @@ def _generate_hash(self, data: Any) -> str:
238160
hashed_data = self.hash_function(json.dumps(data, cls=Encoder, sort_keys=True).encode())
239161
return hashed_data.hexdigest()
240162

241-
def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> None:
163+
def _validate_payload(
164+
self,
165+
data_payload: Union[Dict[str, Any], DataRecord],
166+
stored_data_record: DataRecord,
167+
) -> None:
242168
"""
243169
Validate that the hashed payload matches data provided and stored data record
244170
245171
Parameters
246172
----------
247-
data: Dict[str, Any]
173+
data_payload: Union[Dict[str, Any], DataRecord]
248174
Payload
249-
data_record: DataRecord
250-
DataRecord instance
175+
stored_data_record: DataRecord
176+
DataRecord fetched from Dynamo or cache
251177
252178
Raises
253179
----------
@@ -256,8 +182,12 @@ def _validate_payload(self, data: Dict[str, Any], data_record: DataRecord) -> No
256182
257183
"""
258184
if self.payload_validation_enabled:
259-
data_hash = self._get_hashed_payload(data=data)
260-
if data_record.payload_hash != data_hash:
185+
if isinstance(data_payload, DataRecord):
186+
data_hash = data_payload.payload_hash
187+
else:
188+
data_hash = self._get_hashed_payload(data=data_payload)
189+
190+
if stored_data_record.payload_hash != data_hash:
261191
raise IdempotencyValidationError("Payload does not match stored record for this event key")
262192

263193
def _get_expiry_timestamp(self) -> int:
@@ -448,14 +378,14 @@ def get_record(self, data: Dict[str, Any]) -> Optional[DataRecord]:
448378
cached_record = self._retrieve_from_cache(idempotency_key=idempotency_key)
449379
if cached_record:
450380
logger.debug(f"Idempotency record found in cache with idempotency key: {idempotency_key}")
451-
self._validate_payload(data=data, data_record=cached_record)
381+
self._validate_payload(data_payload=data, stored_data_record=cached_record)
452382
return cached_record
453383

454384
record = self._get_record(idempotency_key=idempotency_key)
455385

456386
self._save_to_cache(data_record=record)
457387

458-
self._validate_payload(data=data, data_record=record)
388+
self._validate_payload(data_payload=data, stored_data_record=record)
459389
return record
460390

461391
@abstractmethod
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
"""
2+
Data Class for idempotency records.
3+
"""
4+
5+
import datetime
6+
import json
7+
import logging
8+
from types import MappingProxyType
9+
from typing import Optional
10+
11+
logger = logging.getLogger(__name__)
12+
13+
STATUS_CONSTANTS = MappingProxyType({"INPROGRESS": "INPROGRESS", "COMPLETED": "COMPLETED", "EXPIRED": "EXPIRED"})
14+
15+
16+
class DataRecord:
17+
"""
18+
Data Class for idempotency records.
19+
"""
20+
21+
def __init__(
22+
self,
23+
idempotency_key: str,
24+
status: str = "",
25+
expiry_timestamp: Optional[int] = None,
26+
in_progress_expiry_timestamp: Optional[int] = None,
27+
response_data: str = "",
28+
payload_hash: str = "",
29+
) -> None:
30+
"""
31+
32+
Parameters
33+
----------
34+
idempotency_key: str
35+
hashed representation of the idempotent data
36+
status: str, optional
37+
status of the idempotent record
38+
expiry_timestamp: int, optional
39+
time before the record should expire, in seconds
40+
in_progress_expiry_timestamp: int, optional
41+
time before the record should expire while in the INPROGRESS state, in seconds
42+
payload_hash: str, optional
43+
hashed representation of payload
44+
response_data: str, optional
45+
response data from previous executions using the record
46+
"""
47+
self.idempotency_key = idempotency_key
48+
self.payload_hash = payload_hash
49+
self.expiry_timestamp = expiry_timestamp
50+
self.in_progress_expiry_timestamp = in_progress_expiry_timestamp
51+
self._status = status
52+
self.response_data = response_data
53+
54+
@property
55+
def is_expired(self) -> bool:
56+
"""
57+
Check if data record is expired
58+
59+
Returns
60+
-------
61+
bool
62+
Whether the record is currently expired or not
63+
"""
64+
return bool(self.expiry_timestamp and int(datetime.datetime.now().timestamp()) > self.expiry_timestamp)
65+
66+
@property
67+
def status(self) -> str:
68+
"""
69+
Get status of data record
70+
71+
Returns
72+
-------
73+
str
74+
"""
75+
if self.is_expired:
76+
return STATUS_CONSTANTS["EXPIRED"]
77+
if self._status in STATUS_CONSTANTS.values():
78+
return self._status
79+
80+
from aws_lambda_powertools.utilities.idempotency.exceptions import IdempotencyInvalidStatusError
81+
82+
raise IdempotencyInvalidStatusError(self._status)
83+
84+
def response_json_as_dict(self) -> Optional[dict]:
85+
"""
86+
Get response data deserialized to python dict
87+
88+
Returns
89+
-------
90+
Optional[dict]
91+
previous response data deserialized
92+
"""
93+
return json.loads(self.response_data) if self.response_data else None

0 commit comments

Comments
 (0)