Skip to content

Commit cca99ba

Browse files
mploski, Release bot, Michal Ploski, leandrodamascena
authored
fix(idempotency): make idempotent_function decorator thread safe (#1899)
* update changelog with latest changes
* Add low level dynamodb client
* Fix black and mypy warnings
* Fix test error + add missing pytest groupings
* Simplify parallel function handler
* Rename e2e tests
* fix(idempotency): updating documentation

---------

Co-authored-by: Release bot <[email protected]>
Co-authored-by: Michal Ploski <[email protected]>
Co-authored-by: Leandro Damascena <[email protected]>
1 parent 66a39e5 commit cca99ba

File tree

18 files changed

+195
-129
lines changed

18 files changed

+195
-129
lines changed

Diff for: aws_lambda_powertools/event_handler/api_gateway.py

-1
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,6 @@ def route(
836836
# Override _compile_regex to exclude trailing slashes for route resolution
837837
@staticmethod
838838
def _compile_regex(rule: str, base_regex: str = _ROUTE_REGEX):
839-
840839
return super(APIGatewayRestResolver, APIGatewayRestResolver)._compile_regex(rule, "^{}/*$")
841840

842841

Diff for: aws_lambda_powertools/logging/utils.py

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ def copy_config_to_registered_loggers(
1212
exclude: Optional[Set[str]] = None,
1313
include: Optional[Set[str]] = None,
1414
) -> None:
15-
1615
"""Copies source Logger level and handler to all registered loggers for consistent formatting.
1716
1817
Parameters

Diff for: aws_lambda_powertools/utilities/feature_flags/schema.py

-1
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,6 @@ def __init__(self, rule: Dict[str, Any], rule_name: str, logger: Optional[Union[
272272
self.logger = logger or logging.getLogger(__name__)
273273

274274
def validate(self):
275-
276275
if not self.conditions or not isinstance(self.conditions, list):
277276
self.logger.debug(f"Condition is empty or invalid for rule={self.rule_name}")
278277
raise SchemaValidationError(f"Invalid condition, rule={self.rule_name}")

Diff for: aws_lambda_powertools/utilities/idempotency/persistence/base.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class DataRecord:
3737

3838
def __init__(
3939
self,
40-
idempotency_key,
40+
idempotency_key: str,
4141
status: str = "",
4242
expiry_timestamp: Optional[int] = None,
4343
in_progress_expiry_timestamp: Optional[int] = None,

Diff for: aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py

+46-50
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
from typing import Any, Dict, Optional
55

66
import boto3
7+
from boto3.dynamodb.types import TypeDeserializer
78
from botocore.config import Config
9+
from botocore.exceptions import ClientError
810

911
from aws_lambda_powertools.shared import constants
1012
from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer
@@ -79,13 +81,14 @@ def __init__(
7981

8082
self._boto_config = boto_config or Config()
8183
self._boto3_session = boto3_session or boto3.session.Session()
84+
self._client = self._boto3_session.client("dynamodb", config=self._boto_config)
85+
8286
if sort_key_attr == key_attr:
8387
raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!")
8488

8589
if static_pk_value is None:
8690
static_pk_value = f"idempotency#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}"
8791

88-
self._table = None
8992
self.table_name = table_name
9093
self.key_attr = key_attr
9194
self.static_pk_value = static_pk_value
@@ -95,31 +98,15 @@ def __init__(
9598
self.status_attr = status_attr
9699
self.data_attr = data_attr
97100
self.validation_key_attr = validation_key_attr
98-
super(DynamoDBPersistenceLayer, self).__init__()
99101

100-
@property
101-
def table(self):
102-
"""
103-
Caching property to store boto3 dynamodb Table resource
102+
self._deserializer = TypeDeserializer()
104103

105-
"""
106-
if self._table:
107-
return self._table
108-
ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
109-
self._table = ddb_resource.Table(self.table_name)
110-
return self._table
111-
112-
@table.setter
113-
def table(self, table):
114-
"""
115-
Allow table instance variable to be set directly, primarily for use in tests
116-
"""
117-
self._table = table
104+
super(DynamoDBPersistenceLayer, self).__init__()
118105

119106
def _get_key(self, idempotency_key: str) -> dict:
120107
if self.sort_key_attr:
121-
return {self.key_attr: self.static_pk_value, self.sort_key_attr: idempotency_key}
122-
return {self.key_attr: idempotency_key}
108+
return {self.key_attr: {"S": self.static_pk_value}, self.sort_key_attr: {"S": idempotency_key}}
109+
return {self.key_attr: {"S": idempotency_key}}
123110

124111
def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
125112
"""
@@ -136,36 +123,39 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
136123
representation of item
137124
138125
"""
126+
data = self._deserializer.deserialize({"M": item})
139127
return DataRecord(
140-
idempotency_key=item[self.key_attr],
141-
status=item[self.status_attr],
142-
expiry_timestamp=item[self.expiry_attr],
143-
in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr),
144-
response_data=item.get(self.data_attr),
145-
payload_hash=item.get(self.validation_key_attr),
128+
idempotency_key=data[self.key_attr],
129+
status=data[self.status_attr],
130+
expiry_timestamp=data[self.expiry_attr],
131+
in_progress_expiry_timestamp=data.get(self.in_progress_expiry_attr),
132+
response_data=data.get(self.data_attr),
133+
payload_hash=data.get(self.validation_key_attr),
146134
)
147135

148136
def _get_record(self, idempotency_key) -> DataRecord:
149-
response = self.table.get_item(Key=self._get_key(idempotency_key), ConsistentRead=True)
150-
137+
response = self._client.get_item(
138+
TableName=self.table_name, Key=self._get_key(idempotency_key), ConsistentRead=True
139+
)
151140
try:
152141
item = response["Item"]
153-
except KeyError:
154-
raise IdempotencyItemNotFoundError
142+
except KeyError as exc:
143+
raise IdempotencyItemNotFoundError from exc
155144
return self._item_to_data_record(item)
156145

157146
def _put_record(self, data_record: DataRecord) -> None:
158147
item = {
159148
**self._get_key(data_record.idempotency_key),
160-
self.expiry_attr: data_record.expiry_timestamp,
161-
self.status_attr: data_record.status,
149+
self.key_attr: {"S": data_record.idempotency_key},
150+
self.expiry_attr: {"N": str(data_record.expiry_timestamp)},
151+
self.status_attr: {"S": data_record.status},
162152
}
163153

164154
if data_record.in_progress_expiry_timestamp is not None:
165-
item[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp
155+
item[self.in_progress_expiry_attr] = {"N": str(data_record.in_progress_expiry_timestamp)}
166156

167-
if self.payload_validation_enabled:
168-
item[self.validation_key_attr] = data_record.payload_hash
157+
if self.payload_validation_enabled and data_record.payload_hash:
158+
item[self.validation_key_attr] = {"S": data_record.payload_hash}
169159

170160
now = datetime.datetime.now()
171161
try:
@@ -199,8 +189,8 @@ def _put_record(self, data_record: DataRecord) -> None:
199189
condition_expression = (
200190
f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})"
201191
)
202-
203-
self.table.put_item(
192+
self._client.put_item(
193+
TableName=self.table_name,
204194
Item=item,
205195
ConditionExpression=condition_expression,
206196
ExpressionAttributeNames={
@@ -210,22 +200,28 @@ def _put_record(self, data_record: DataRecord) -> None:
210200
"#status": self.status_attr,
211201
},
212202
ExpressionAttributeValues={
213-
":now": int(now.timestamp()),
214-
":now_in_millis": int(now.timestamp() * 1000),
215-
":inprogress": STATUS_CONSTANTS["INPROGRESS"],
203+
":now": {"N": str(int(now.timestamp()))},
204+
":now_in_millis": {"N": str(int(now.timestamp() * 1000))},
205+
":inprogress": {"S": STATUS_CONSTANTS["INPROGRESS"]},
216206
},
217207
)
218-
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
219-
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
220-
raise IdempotencyItemAlreadyExistsError
208+
except ClientError as exc:
209+
error_code = exc.response.get("Error", {}).get("Code")
210+
if error_code == "ConditionalCheckFailedException":
211+
logger.debug(
212+
f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}"
213+
)
214+
raise IdempotencyItemAlreadyExistsError from exc
215+
else:
216+
raise
221217

222218
def _update_record(self, data_record: DataRecord):
223219
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
224220
update_expression = "SET #response_data = :response_data, #expiry = :expiry, " "#status = :status"
225221
expression_attr_values = {
226-
":expiry": data_record.expiry_timestamp,
227-
":response_data": data_record.response_data,
228-
":status": data_record.status,
222+
":expiry": {"N": str(data_record.expiry_timestamp)},
223+
":response_data": {"S": data_record.response_data},
224+
":status": {"S": data_record.status},
229225
}
230226
expression_attr_names = {
231227
"#expiry": self.expiry_attr,
@@ -235,7 +231,7 @@ def _update_record(self, data_record: DataRecord):
235231

236232
if self.payload_validation_enabled:
237233
update_expression += ", #validation_key = :validation_key"
238-
expression_attr_values[":validation_key"] = data_record.payload_hash
234+
expression_attr_values[":validation_key"] = {"S": data_record.payload_hash}
239235
expression_attr_names["#validation_key"] = self.validation_key_attr
240236

241237
kwargs = {
@@ -245,8 +241,8 @@ def _update_record(self, data_record: DataRecord):
245241
"ExpressionAttributeNames": expression_attr_names,
246242
}
247243

248-
self.table.update_item(**kwargs)
244+
self._client.update_item(TableName=self.table_name, **kwargs)
249245

250246
def _delete_record(self, data_record: DataRecord) -> None:
251247
logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
252-
self.table.delete_item(Key=self._get_key(data_record.idempotency_key))
248+
self._client.delete_item(TableName=self.table_name, Key={**self._get_key(data_record.idempotency_key)})

Diff for: docs/utilities/idempotency.md

+1-5
Original file line numberDiff line numberDiff line change
@@ -132,13 +132,9 @@ When using `idempotent_function`, you must tell us which keyword parameter in yo
132132

133133
!!! info "We support JSON serializable data, [Python Dataclasses](https://docs.python.org/3.7/library/dataclasses.html){target="_blank"}, [Parser/Pydantic Models](parser.md){target="_blank"}, and our [Event Source Data Classes](./data_classes.md){target="_blank"}."
134134

135-
???+ warning "Limitations"
135+
???+ warning "Limitation"
136136
Make sure to call your decorated function using keyword arguments.
137137

138-
Decorated functions with `idempotent_function` are not thread-safe, if the caller uses threading, not the function computation itself.
139-
140-
DynamoDB Persistency layer uses a Resource client [which is not thread-safe](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?highlight=multithreading#multithreading-or-multiprocessing-with-resources){target="_blank"}.
141-
142138
=== "dataclass_sample.py"
143139

144140
```python hl_lines="3-4 23 33"

Diff for: mypy.ini

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ ignore_missing_imports = True
2929
[mypy-boto3.dynamodb.conditions]
3030
ignore_missing_imports = True
3131

32+
[mypy-boto3.dynamodb.types]
33+
ignore_missing_imports = True
34+
3235
[mypy-botocore.config]
3336
ignore_missing_imports = True
3437

@@ -58,3 +61,5 @@ ignore_missing_imports = True
5861

5962
[mypy-ijson]
6063
ignore_missing_imports = True
64+
65+
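Diff for: (new file; path missing from this capture — presumably tests/e2e/idempotency/handlers/function_thread_safety_handler.py, to be confirmed against the commit)

+29
Original file line numberDiff line numberDiff line change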
@@ -0,0 +1,29 @@
1+
import os
2+
import time
3+
from concurrent.futures import ThreadPoolExecutor, as_completed
4+
from threading import current_thread
5+
6+
from aws_lambda_powertools.utilities.idempotency import (
7+
DynamoDBPersistenceLayer,
8+
idempotent_function,
9+
)
10+
11+
TABLE_NAME = os.getenv("IdempotencyTable", "")
12+
persistence_layer = DynamoDBPersistenceLayer(table_name=TABLE_NAME)
13+
threads_count = 2
14+
15+
16+
@idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record")
17+
def record_handler(record):
18+
time_now = time.time()
19+
return {"thread_name": current_thread().name, "time": str(time_now)}
20+
21+
22+
def lambda_handler(event, context):
23+
with ThreadPoolExecutor(max_workers=threads_count) as executor:
24+
futures = [executor.submit(record_handler, **{"record": event}) for _ in range(threads_count)]
25+
26+
return [
27+
{"state": future._state, "exception": future.exception(), "output": future.result()}
28+
for future in as_completed(futures)
29+
]

Diff for: tests/e2e/idempotency/handlers/parallel_execution_handler.py

-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
@idempotent(persistence_store=persistence_layer)
1414
def lambda_handler(event, context):
15-
1615
time.sleep(5)
1716

1817
return event

Diff for: tests/e2e/idempotency/handlers/ttl_cache_expiration_handler.py

-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
@idempotent(config=config, persistence_store=persistence_layer)
1616
def lambda_handler(event, context):
17-
1817
time_now = time.time()
1918

2019
return {"time": str(time_now)}

Diff for: tests/e2e/idempotency/handlers/ttl_cache_timeout_handler.py

-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
@idempotent(config=config, persistence_store=persistence_layer)
1616
def lambda_handler(event, context):
17-
1817
sleep_time: int = event.get("sleep") or 0
1918
time.sleep(sleep_time)
2019

Diff for: tests/e2e/idempotency/infrastructure.py

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def create_resources(self):
1515
table.grant_read_write_data(functions["TtlCacheExpirationHandler"])
1616
table.grant_read_write_data(functions["TtlCacheTimeoutHandler"])
1717
table.grant_read_write_data(functions["ParallelExecutionHandler"])
18+
table.grant_read_write_data(functions["FunctionThreadSafetyHandler"])
1819

1920
def _create_dynamodb_table(self) -> Table:
2021
table = dynamodb.Table(

Diff for: tests/e2e/idempotency/test_idempotency_dynamodb.py

+34
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ def parallel_execution_handler_fn_arn(infrastructure: dict) -> str:
2222
return infrastructure.get("ParallelExecutionHandlerArn", "")
2323

2424

25+
@pytest.fixture
26+
def function_thread_safety_handler_fn_arn(infrastructure: dict) -> str:
27+
return infrastructure.get("FunctionThreadSafetyHandlerArn", "")
28+
29+
2530
@pytest.fixture
2631
def idempotency_table_name(infrastructure: dict) -> str:
2732
return infrastructure.get("DynamoDBTable", "")
@@ -97,3 +102,32 @@ def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str):
97102
# THEN
98103
assert "Execution already in progress with idempotency key" in error_idempotency_execution_response
99104
assert "Task timed out after" in timeout_execution_response
105+
106+
107+
@pytest.mark.xdist_group(name="idempotency")
108+
def test_idempotent_function_thread_safety(function_thread_safety_handler_fn_arn: str):
109+
# GIVEN
110+
payload = json.dumps({"message": "Lambda Powertools - Idempotent function thread safety check"})
111+
112+
# WHEN
113+
# first execution
114+
first_execution, _ = data_fetcher.get_lambda_response(
115+
lambda_arn=function_thread_safety_handler_fn_arn, payload=payload
116+
)
117+
first_execution_response = first_execution["Payload"].read().decode("utf-8")
118+
119+
# the second execution should return the same response as the first execution
120+
second_execution, _ = data_fetcher.get_lambda_response(
121+
lambda_arn=function_thread_safety_handler_fn_arn, payload=payload
122+
)
123+
second_execution_response = second_execution["Payload"].read().decode("utf-8")
124+
125+
# THEN
126+
# Function threads finished without exception AND
127+
# first and second execution is the same
128+
for function_thread in json.loads(first_execution_response):
129+
assert function_thread["state"] == "FINISHED"
130+
assert function_thread["exception"] is None
131+
assert function_thread["output"] is not None
132+
133+
assert first_execution_response == second_execution_response

Diff for: tests/e2e/parameters/infrastructure.py

-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ def create_resources(self):
3434
)
3535

3636
def _create_app_config(self, function: Function):
37-
3837
service_name = build_service_name()
3938

4039
cfn_application = appconfig.CfnApplication(
@@ -82,7 +81,6 @@ def _create_app_config_freeform(
8281
function: Function,
8382
service_name: str,
8483
):
85-
8684
cfn_configuration_profile = appconfig.CfnConfigurationProfile(
8785
self.stack,
8886
"appconfig-profile",

Diff for: tests/functional/feature_flags/test_feature_flags.py

+2
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ def test_flags_conditions_rule_match_multiple_actions_multiple_rules_multiple_co
315315

316316
# check a case where the feature exists but the rule doesn't match so we revert to the default value of the feature
317317

318+
318319
# Check IN/NOT_IN/KEY_IN_VALUE/KEY_NOT_IN_VALUE/VALUE_IN_KEY/VALUE_NOT_IN_KEY conditions
319320
def test_flags_match_rule_with_in_action(mocker, config):
320321
expected_value = True
@@ -775,6 +776,7 @@ def test_get_configuration_with_envelope_and_raw(mocker, config):
775776
## Inequality test cases
776777
##
777778

779+
778780
# Test not equals
779781
def test_flags_not_equal_no_match(mocker, config):
780782
expected_value = False

0 commit comments

Comments
 (0)