Skip to content

Commit d687ce9

Browse files
author
Michal Ploski
committed
Add low level dynamodb client
1 parent 4af71c2 commit d687ce9

File tree

9 files changed

+187
-115
lines changed

9 files changed

+187
-115
lines changed

Diff for: aws_lambda_powertools/utilities/idempotency/persistence/base.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class DataRecord:
3737

3838
def __init__(
3939
self,
40-
idempotency_key,
40+
idempotency_key: str,
4141
status: str = "",
4242
expiry_timestamp: Optional[int] = None,
4343
in_progress_expiry_timestamp: Optional[int] = None,

Diff for: aws_lambda_powertools/utilities/idempotency/persistence/dynamodb.py

+45-49
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
from typing import Any, Dict, Optional
55

66
import boto3
7+
from boto3.dynamodb.types import TypeDeserializer
78
from botocore.config import Config
9+
from botocore.exceptions import ClientError
810

911
from aws_lambda_powertools.shared import constants
1012
from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer
@@ -79,13 +81,14 @@ def __init__(
7981

8082
self._boto_config = boto_config or Config()
8183
self._boto3_session = boto3_session or boto3.session.Session()
84+
self._client = self._boto3_session.client("dynamodb", config=self._boto_config)
85+
8286
if sort_key_attr == key_attr:
8387
raise ValueError(f"key_attr [{key_attr}] and sort_key_attr [{sort_key_attr}] cannot be the same!")
8488

8589
if static_pk_value is None:
8690
static_pk_value = f"idempotency#{os.getenv(constants.LAMBDA_FUNCTION_NAME_ENV, '')}"
8791

88-
self._table = None
8992
self.table_name = table_name
9093
self.key_attr = key_attr
9194
self.static_pk_value = static_pk_value
@@ -95,31 +98,15 @@ def __init__(
9598
self.status_attr = status_attr
9699
self.data_attr = data_attr
97100
self.validation_key_attr = validation_key_attr
98-
super(DynamoDBPersistenceLayer, self).__init__()
99101

100-
@property
101-
def table(self):
102-
"""
103-
Caching property to store boto3 dynamodb Table resource
102+
self._deserializer = TypeDeserializer()
104103

105-
"""
106-
if self._table:
107-
return self._table
108-
ddb_resource = self._boto3_session.resource("dynamodb", config=self._boto_config)
109-
self._table = ddb_resource.Table(self.table_name)
110-
return self._table
111-
112-
@table.setter
113-
def table(self, table):
114-
"""
115-
Allow table instance variable to be set directly, primarily for use in tests
116-
"""
117-
self._table = table
104+
super(DynamoDBPersistenceLayer, self).__init__()
118105

119106
def _get_key(self, idempotency_key: str) -> dict:
120107
if self.sort_key_attr:
121-
return {self.key_attr: self.static_pk_value, self.sort_key_attr: idempotency_key}
122-
return {self.key_attr: idempotency_key}
108+
return {self.key_attr: {"S": self.static_pk_value}, self.sort_key_attr: {"S": idempotency_key}}
109+
return {self.key_attr: {"S": idempotency_key}}
123110

124111
def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
125112
"""
@@ -136,36 +123,39 @@ def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
136123
representation of item
137124
138125
"""
126+
data = self._deserializer.deserialize({"M": item})
139127
return DataRecord(
140-
idempotency_key=item[self.key_attr],
141-
status=item[self.status_attr],
142-
expiry_timestamp=item[self.expiry_attr],
143-
in_progress_expiry_timestamp=item.get(self.in_progress_expiry_attr),
144-
response_data=item.get(self.data_attr),
145-
payload_hash=item.get(self.validation_key_attr),
128+
idempotency_key=data[self.key_attr],
129+
status=data[self.status_attr],
130+
expiry_timestamp=data[self.expiry_attr],
131+
in_progress_expiry_timestamp=data.get(self.in_progress_expiry_attr),
132+
response_data=data.get(self.data_attr),
133+
payload_hash=data.get(self.validation_key_attr),
146134
)
147135

148136
def _get_record(self, idempotency_key) -> DataRecord:
149-
response = self.table.get_item(Key=self._get_key(idempotency_key), ConsistentRead=True)
150-
137+
response = self._client.get_item(
138+
TableName=self.table_name, Key=self._get_key(idempotency_key), ConsistentRead=True
139+
)
151140
try:
152141
item = response["Item"]
153-
except KeyError:
154-
raise IdempotencyItemNotFoundError
142+
except KeyError as exc:
143+
raise IdempotencyItemNotFoundError from exc
155144
return self._item_to_data_record(item)
156145

157146
def _put_record(self, data_record: DataRecord) -> None:
158147
item = {
159148
**self._get_key(data_record.idempotency_key),
160-
self.expiry_attr: data_record.expiry_timestamp,
161-
self.status_attr: data_record.status,
149+
self.key_attr: {"S": data_record.idempotency_key},
150+
self.expiry_attr: {"N": str(data_record.expiry_timestamp)},
151+
self.status_attr: {"S": data_record.status},
162152
}
163153

164154
if data_record.in_progress_expiry_timestamp is not None:
165-
item[self.in_progress_expiry_attr] = data_record.in_progress_expiry_timestamp
155+
item[self.in_progress_expiry_attr] = {"N": str(data_record.in_progress_expiry_timestamp)}
166156

167157
if self.payload_validation_enabled:
168-
item[self.validation_key_attr] = data_record.payload_hash
158+
item[self.validation_key_attr] = {"S": data_record.payload_hash}
169159

170160
now = datetime.datetime.now()
171161
try:
@@ -199,8 +189,8 @@ def _put_record(self, data_record: DataRecord) -> None:
199189
condition_expression = (
200190
f"{idempotency_key_not_exist} OR {idempotency_expiry_expired} OR ({inprogress_expiry_expired})"
201191
)
202-
203-
self.table.put_item(
192+
self._client.put_item(
193+
TableName=self.table_name,
204194
Item=item,
205195
ConditionExpression=condition_expression,
206196
ExpressionAttributeNames={
@@ -210,22 +200,28 @@ def _put_record(self, data_record: DataRecord) -> None:
210200
"#status": self.status_attr,
211201
},
212202
ExpressionAttributeValues={
213-
":now": int(now.timestamp()),
214-
":now_in_millis": int(now.timestamp() * 1000),
215-
":inprogress": STATUS_CONSTANTS["INPROGRESS"],
203+
":now": {"N": str(int(now.timestamp()))},
204+
":now_in_millis": {"N": str(int(now.timestamp() * 1000))},
205+
":inprogress": {"S": STATUS_CONSTANTS["INPROGRESS"]},
216206
},
217207
)
218-
except self.table.meta.client.exceptions.ConditionalCheckFailedException:
219-
logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
220-
raise IdempotencyItemAlreadyExistsError
208+
except ClientError as exc:
209+
error_code = exc.response.get("Error", {}).get("Code")
210+
if error_code == "ConditionalCheckFailedException":
211+
logger.debug(
212+
f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}"
213+
)
214+
raise IdempotencyItemAlreadyExistsError from exc
215+
else:
216+
raise
221217

222218
def _update_record(self, data_record: DataRecord):
223219
logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
224220
update_expression = "SET #response_data = :response_data, #expiry = :expiry, " "#status = :status"
225221
expression_attr_values = {
226-
":expiry": data_record.expiry_timestamp,
227-
":response_data": data_record.response_data,
228-
":status": data_record.status,
222+
":expiry": {"N": str(data_record.expiry_timestamp)},
223+
":response_data": {"S": data_record.response_data},
224+
":status": {"S": data_record.status},
229225
}
230226
expression_attr_names = {
231227
"#expiry": self.expiry_attr,
@@ -235,7 +231,7 @@ def _update_record(self, data_record: DataRecord):
235231

236232
if self.payload_validation_enabled:
237233
update_expression += ", #validation_key = :validation_key"
238-
expression_attr_values[":validation_key"] = data_record.payload_hash
234+
expression_attr_values[":validation_key"] = {"S": data_record.payload_hash}
239235
expression_attr_names["#validation_key"] = self.validation_key_attr
240236

241237
kwargs = {
@@ -245,8 +241,8 @@ def _update_record(self, data_record: DataRecord):
245241
"ExpressionAttributeNames": expression_attr_names,
246242
}
247243

248-
self.table.update_item(**kwargs)
244+
self._client.update_item(TableName=self.table_name, **kwargs)
249245

250246
def _delete_record(self, data_record: DataRecord) -> None:
251247
logger.debug(f"Deleting record for idempotency key: {data_record.idempotency_key}")
252-
self.table.delete_item(Key=self._get_key(data_record.idempotency_key))
248+
self._client.delete_item(TableName=self.table_name, Key={**self._get_key(data_record.idempotency_key)})

Diff for: mypy.ini

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ ignore_missing_imports = True
2929
[mypy-boto3.dynamodb.conditions]
3030
ignore_missing_imports = True
3131

32+
[mypy-boto3.dynamodb.types]
33+
ignore_missing_imports = True
34+
3235
[mypy-botocore.config]
3336
ignore_missing_imports = True
3437

@@ -58,3 +61,5 @@ ignore_missing_imports = True
5861

5962
[mypy-ijson]
6063
ignore_missing_imports = True
64+
65+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import os
2+
import time
3+
from concurrent.futures import ThreadPoolExecutor, as_completed
4+
from threading import current_thread
5+
6+
from aws_lambda_powertools.utilities.idempotency import (
7+
DynamoDBPersistenceLayer,
8+
idempotent_function,
9+
)
10+
11+
TABLE_NAME = os.getenv("IdempotencyTable", "")
12+
persistence_layer = DynamoDBPersistenceLayer(table_name=TABLE_NAME)
13+
threads_count = 2
14+
15+
16+
@idempotent_function(persistence_store=persistence_layer, data_keyword_argument="record")
17+
def record_handler(record):
18+
time_now = time.time()
19+
return {"thread_name": current_thread().name, "time": str(time_now)}
20+
21+
22+
def lambda_handler(event, context):
23+
with ThreadPoolExecutor(max_workers=threads_count) as executor:
24+
futures = [executor.submit(record_handler, **{"record": event}) for _ in range(threads_count)]
25+
26+
output = []
27+
for future in as_completed(futures):
28+
output.append({"state": future._state, "exception": future.exception(), "output": future.result()})
29+
30+
return output

Diff for: tests/e2e/idempotency/infrastructure.py

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def create_resources(self):
1515
table.grant_read_write_data(functions["TtlCacheExpirationHandler"])
1616
table.grant_read_write_data(functions["TtlCacheTimeoutHandler"])
1717
table.grant_read_write_data(functions["ParallelExecutionHandler"])
18+
table.grant_read_write_data(functions["ParallelFunctionsHandler"])
1819

1920
def _create_dynamodb_table(self) -> Table:
2021
table = dynamodb.Table(

Diff for: tests/e2e/idempotency/test_idempotency_dynamodb.py

+31-3
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@ def parallel_execution_handler_fn_arn(infrastructure: dict) -> str:
2222
return infrastructure.get("ParallelExecutionHandlerArn", "")
2323

2424

25+
@pytest.fixture
26+
def parallel_functions_handler_fn_arn(infrastructure: dict) -> str:
27+
return infrastructure.get("ParallelFunctionsHandlerArn", "")
28+
29+
2530
@pytest.fixture
2631
def idempotency_table_name(infrastructure: dict) -> str:
2732
return infrastructure.get("DynamoDBTable", "")
2833

2934

30-
@pytest.mark.xdist_group(name="idempotency")
3135
def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn: str):
3236
# GIVEN
3337
payload = json.dumps({"message": "Lambda Powertools - TTL 5s"})
@@ -57,7 +61,6 @@ def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn:
5761
assert third_execution_response != second_execution_response
5862

5963

60-
@pytest.mark.xdist_group(name="idempotency")
6164
def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str):
6265
# GIVEN
6366
payload_timeout_execution = json.dumps({"sleep": 5, "message": "Lambda Powertools - TTL 1s"})
@@ -81,7 +84,6 @@ def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str):
8184
assert payload_working_execution == execution_working_response
8285

8386

84-
@pytest.mark.xdist_group(name="idempotency")
8587
def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str):
8688
# GIVEN
8789
arguments = json.dumps({"message": "Lambda Powertools - Parallel execution"})
@@ -97,3 +99,29 @@ def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str):
9799
# THEN
98100
assert "Execution already in progress with idempotency key" in error_idempotency_execution_response
99101
assert "Task timed out after" in timeout_execution_response
102+
103+
104+
def test_parallel_functions_execution_idempotency(parallel_functions_handler_fn_arn: str):
105+
# GIVEN
106+
payload = json.dumps({"message": "Lambda Powertools - Parallel functions execution"})
107+
108+
# WHEN
109+
# first execution
110+
first_execution, _ = data_fetcher.get_lambda_response(lambda_arn=parallel_functions_handler_fn_arn, payload=payload)
111+
first_execution_response = first_execution["Payload"].read().decode("utf-8")
112+
113+
# the second execution should return the same response as the first execution
114+
second_execution, _ = data_fetcher.get_lambda_response(
115+
lambda_arn=parallel_functions_handler_fn_arn, payload=payload
116+
)
117+
second_execution_response = second_execution["Payload"].read().decode("utf-8")
118+
119+
# THEN
120+
# Function threads finished without exception AND
121+
# first and second execution is the same
122+
for function_thread in json.loads(first_execution_response):
123+
assert function_thread["state"] == "FINISHED"
124+
assert function_thread["exception"] is None
125+
assert function_thread["output"] is not None
126+
127+
assert first_execution_response == second_execution_response

Diff for: tests/functional/idempotency/conftest.py

+28-20
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,11 @@ def expected_params_update_item(serialized_lambda_response, hashed_idempotency_k
8585
"#status": "status",
8686
},
8787
"ExpressionAttributeValues": {
88-
":expiry": stub.ANY,
89-
":response_data": serialized_lambda_response,
90-
":status": "COMPLETED",
88+
":expiry": {"N": stub.ANY},
89+
":response_data": {"S": serialized_lambda_response},
90+
":status": {"S": "COMPLETED"},
9191
},
92-
"Key": {"id": hashed_idempotency_key},
92+
"Key": {"id": {"S": hashed_idempotency_key}},
9393
"TableName": "TEST_TABLE",
9494
"UpdateExpression": "SET #response_data = :response_data, " "#expiry = :expiry, #status = :status",
9595
}
@@ -107,12 +107,12 @@ def expected_params_update_item_with_validation(
107107
"#validation_key": "validation",
108108
},
109109
"ExpressionAttributeValues": {
110-
":expiry": stub.ANY,
111-
":response_data": serialized_lambda_response,
112-
":status": "COMPLETED",
113-
":validation_key": hashed_validation_key,
110+
":expiry": {"N": stub.ANY},
111+
":response_data": {"S": serialized_lambda_response},
112+
":status": {"S": "COMPLETED"},
113+
":validation_key": {"S": hashed_validation_key},
114114
},
115-
"Key": {"id": hashed_idempotency_key},
115+
"Key": {"id": {"S": hashed_idempotency_key}},
116116
"TableName": "TEST_TABLE",
117117
"UpdateExpression": (
118118
"SET #response_data = :response_data, "
@@ -135,12 +135,16 @@ def expected_params_put_item(hashed_idempotency_key):
135135
"#status": "status",
136136
"#in_progress_expiry": "in_progress_expiration",
137137
},
138-
"ExpressionAttributeValues": {":now": stub.ANY, ":now_in_millis": stub.ANY, ":inprogress": "INPROGRESS"},
138+
"ExpressionAttributeValues": {
139+
":now": {"N": stub.ANY},
140+
":now_in_millis": {"N": stub.ANY},
141+
":inprogress": {"S": "INPROGRESS"},
142+
},
139143
"Item": {
140-
"expiration": stub.ANY,
141-
"id": hashed_idempotency_key,
142-
"status": "INPROGRESS",
143-
"in_progress_expiration": stub.ANY,
144+
"expiration": {"N": stub.ANY},
145+
"in_progress_expiration": {"N": stub.ANY},
146+
"id": {"S": hashed_idempotency_key},
147+
"status": {"S": "INPROGRESS"},
144148
},
145149
"TableName": "TEST_TABLE",
146150
}
@@ -159,13 +163,17 @@ def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_vali
159163
"#status": "status",
160164
"#in_progress_expiry": "in_progress_expiration",
161165
},
162-
"ExpressionAttributeValues": {":now": stub.ANY, ":now_in_millis": stub.ANY, ":inprogress": "INPROGRESS"},
166+
"ExpressionAttributeValues": {
167+
":now": {"N": stub.ANY},
168+
":now_in_millis": {"N": stub.ANY},
169+
":inprogress": {"S": "INPROGRESS"},
170+
},
163171
"Item": {
164-
"expiration": stub.ANY,
165-
"in_progress_expiration": stub.ANY,
166-
"id": hashed_idempotency_key,
167-
"status": "INPROGRESS",
168-
"validation": hashed_validation_key,
172+
"expiration": {"N": stub.ANY},
173+
"in_progress_expiration": {"N": stub.ANY},
174+
"id": {"S": hashed_idempotency_key},
175+
"status": {"S": "INPROGRESS"},
176+
"validation": {"S": hashed_validation_key},
169177
},
170178
"TableName": "TEST_TABLE",
171179
}

0 commit comments

Comments
 (0)