Skip to content

Commit 33c173f

Browse files
feat: Python EncryptedTable impl and tests (#1895)
1 parent fc5f30c commit 33c173f

File tree

7 files changed

+1173
-0
lines changed

7 files changed

+1173
-0
lines changed

DynamoDbEncryption/runtimes/python/src/aws_dbesdk_dynamodb/encrypted/table.py

Lines changed: 382 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
import uuid
4+
from copy import deepcopy
5+
6+
import boto3
7+
import pytest
8+
9+
from aws_dbesdk_dynamodb.encrypted.table import EncryptedTable
10+
from aws_dbesdk_dynamodb.smithygenerated.aws_cryptography_dbencryptionsdk_dynamodb_transforms.errors import (
11+
DynamoDbEncryptionTransformsException,
12+
)
13+
14+
from ...constants import (
15+
INTEG_TEST_DEFAULT_DYNAMODB_TABLE_NAME,
16+
INTEG_TEST_DEFAULT_TABLE_CONFIGS,
17+
)
18+
from ...items import complex_item_dict, complex_key_dict, simple_item_dict, simple_key_dict
19+
from ...requests import (
20+
basic_delete_item_request_dict,
21+
basic_get_item_request_dict,
22+
basic_put_item_request_dict,
23+
basic_query_request_dict,
24+
basic_scan_request_dict,
25+
basic_update_item_request_dict_signed_attribute,
26+
basic_update_item_request_dict_unsigned_attribute,
27+
)
28+
29+
30+
def encrypted_table():
31+
"""Create an encrypted table."""
32+
table = boto3.resource("dynamodb").Table(INTEG_TEST_DEFAULT_DYNAMODB_TABLE_NAME)
33+
return EncryptedTable(
34+
table=table,
35+
encryption_config=INTEG_TEST_DEFAULT_TABLE_CONFIGS,
36+
)
37+
38+
39+
def plaintext_table():
40+
"""Create a plaintext table."""
41+
table = boto3.resource("dynamodb").Table(INTEG_TEST_DEFAULT_DYNAMODB_TABLE_NAME)
42+
return table
43+
44+
45+
# Creates a matrix of tests for each value in param,
46+
# with a user-friendly string for test output:
47+
# encrypted = True -> "encrypted"
48+
# encrypted = False -> "plaintext"
49+
@pytest.fixture(params=[True, False], ids=["encrypted", "plaintext"])
50+
def encrypted(request):
51+
return request.param
52+
53+
54+
@pytest.fixture
55+
def table(encrypted):
56+
"""
57+
Create a table client.
58+
Use both to test that the same input can be provided to both boto3 and the EncryptedTable.
59+
"""
60+
if encrypted:
61+
return encrypted_table()
62+
else:
63+
return plaintext_table()
64+
65+
66+
@pytest.fixture(scope="module")
67+
def test_run_suffix():
68+
return "-" + str(uuid.uuid4())
69+
70+
71+
# Creates a matrix of tests for each value in param,
72+
# with a user-friendly string for test output:
73+
# use_complex_item = True -> "complex_item"
74+
# use_complex_item = False -> "simple_item"
75+
@pytest.fixture(params=[simple_item_dict, complex_item_dict], ids=["simple_item", "complex_item"])
76+
def test_item(request, test_run_suffix):
77+
item = deepcopy(request.param)
78+
item["partition_key"] += test_run_suffix
79+
return item
80+
81+
82+
def test_GIVEN_item_WHEN_basic_put_AND_basic_get_AND_basic_delete_THEN_round_trip_passes(table, test_item):
83+
"""Test put_item, get_item, and delete_item operations."""
84+
# Given: Valid put_item request
85+
put_item_request_dict = basic_put_item_request_dict(test_item)
86+
# When: put_item
87+
put_response = table.put_item(**put_item_request_dict)
88+
# Then: put_item succeeds
89+
assert put_response["ResponseMetadata"]["HTTPStatusCode"] == 200
90+
91+
# Given: Valid get_item request for the same item
92+
get_item_request_dict = basic_get_item_request_dict(test_item)
93+
# When: get_item
94+
get_response = table.get_item(**get_item_request_dict)
95+
# Then: Simple item is encrypted and decrypted correctly
96+
assert get_response["ResponseMetadata"]["HTTPStatusCode"] == 200
97+
assert get_response["Item"] == put_item_request_dict["Item"]
98+
99+
# Given: Valid delete_item request for the same item
100+
delete_item_request_dict = basic_delete_item_request_dict(test_item)
101+
# When: delete_item
102+
delete_response = table.delete_item(**delete_item_request_dict)
103+
# Then: delete_item succeeds
104+
assert delete_response["ResponseMetadata"]["HTTPStatusCode"] == 200
105+
106+
# Given: Valid get_item request for the same item
107+
get_item_request_dict = basic_get_item_request_dict(test_item)
108+
# When: get_item
109+
get_response = table.get_item(**get_item_request_dict)
110+
# Then: get_item is empty (i.e. the item was deleted)
111+
assert "Item" not in get_response
112+
113+
114+
@pytest.fixture
115+
def multiple_test_items(test_run_suffix):
116+
"""Get two test items in the appropriate format for the client."""
117+
items = [deepcopy(simple_item_dict), deepcopy(complex_item_dict)]
118+
for item in items:
119+
item["partition_key"] += test_run_suffix
120+
return items
121+
122+
123+
@pytest.fixture
124+
def multiple_test_keys(test_run_suffix):
125+
"""Get two test keys in the appropriate format for the client."""
126+
keys = [deepcopy(simple_key_dict), deepcopy(complex_key_dict)]
127+
for key in keys:
128+
key["partition_key"] += test_run_suffix
129+
return keys
130+
131+
132+
def test_GIVEN_items_WHEN_batch_write_and_get_THEN_round_trip_passes(
133+
table,
134+
multiple_test_items,
135+
multiple_test_keys,
136+
):
137+
# Given: Simple and complex items in appropriate format for client
138+
# When: Batch put items
139+
with table.batch_writer() as batch_writer:
140+
# boto3 documentation for batch_writer.put_item() is incorrect;
141+
# the method accepts the item directly, not the item inside an "Item" key.
142+
for item in multiple_test_items:
143+
batch_writer.put_item(item)
144+
145+
# When: Get items
146+
for item in multiple_test_items:
147+
get_item_request_dict = basic_get_item_request_dict(item)
148+
get_response = table.get_item(**get_item_request_dict)
149+
# Then: All items are encrypted and decrypted correctly
150+
assert get_response["ResponseMetadata"]["HTTPStatusCode"] == 200
151+
assert get_response["Item"] == item
152+
153+
# When: Batch delete items
154+
with table.batch_writer() as batch_writer:
155+
for key in multiple_test_keys:
156+
batch_writer.delete_item(key)
157+
158+
# When: Get items
159+
for item in multiple_test_items:
160+
get_item_request_dict = basic_get_item_request_dict(item)
161+
get_response = table.get_item(**get_item_request_dict)
162+
# Then: All items are encrypted and decrypted correctly
163+
assert get_response["ResponseMetadata"]["HTTPStatusCode"] == 200
164+
assert "Item" not in get_response
165+
166+
167+
def test_GIVEN_items_in_table_WHEN_query_THEN_items_are_decrypted_correctly(table, test_item):
168+
"""Test query and scan operations."""
169+
# Given: Simple and complex items in appropriate format for client
170+
# When: Putting items into table
171+
put_item_request_dict = basic_put_item_request_dict(test_item)
172+
table.put_item(**put_item_request_dict)
173+
174+
# When: Querying items by partition key
175+
query_request_dict = basic_query_request_dict(test_item)
176+
query_response = table.query(**query_request_dict)
177+
# Then: Query returns correct items
178+
assert query_response["ResponseMetadata"]["HTTPStatusCode"] == 200
179+
assert len(query_response["Items"]) == 1
180+
assert query_response["Items"][0] == put_item_request_dict["Item"]
181+
182+
183+
@pytest.fixture
184+
def scan_request(encrypted, test_item):
185+
if encrypted:
186+
request = basic_scan_request_dict(test_item)
187+
# If the encrypted scan encounters a plaintext item, the scan will fail.
188+
# To avoid this, encrypted scans add a filter expression that matches only encrypted items.
189+
request["FilterExpression"] = request["FilterExpression"] + " AND attribute_exists (#sig)"
190+
request["ExpressionAttributeNames"] = {}
191+
request["ExpressionAttributeNames"]["#sig"] = "amzn-ddb-map-sig"
192+
return request
193+
return basic_scan_request_dict(test_item)
194+
195+
196+
def test_GIVEN_valid_put_and_scan_requests_WHEN_put_and_scan_THEN_round_trip_passes(table, test_item, scan_request):
197+
# Given: Simple and complex items in appropriate format for client
198+
put_item_request_dict = basic_put_item_request_dict(test_item)
199+
table.put_item(**put_item_request_dict)
200+
201+
# When: Scanning items
202+
scan_request_dict = scan_request
203+
scan_response = table.scan(**scan_request_dict)
204+
# Then: Scan succeeds
205+
# Can't assert anything about the scan;
206+
# there are too many items.
207+
# The critical assertion is that the scan succeeds.
208+
assert scan_response["ResponseMetadata"]["HTTPStatusCode"] == 200
209+
210+
211+
def test_GIVEN_update_for_unsigned_attribute_WHEN_update_item_THEN_passes(table, test_item):
212+
# Given: some item is already in the table
213+
put_response = table.put_item(**basic_put_item_request_dict(test_item))
214+
assert put_response["ResponseMetadata"]["HTTPStatusCode"] == 200
215+
216+
# Given: Valid update item request for unsigned attribute
217+
update_item_request = basic_update_item_request_dict_unsigned_attribute(test_item)
218+
219+
# When: Calling update_item
220+
update_response = table.update_item(**update_item_request)
221+
# Then: update_item succeeds
222+
assert update_response["ResponseMetadata"]["HTTPStatusCode"] == 200
223+
224+
225+
def test_GIVEN_update_for_signed_attribute_WHEN_update_item_THEN_raises_DynamoDbEncryptionTransformsException(
226+
table, test_item, encrypted
227+
):
228+
if not encrypted:
229+
pytest.skip("Skipping negative test for plaintext client")
230+
231+
# Given: some item is already in the table
232+
put_response = table.put_item(**basic_put_item_request_dict(test_item))
233+
assert put_response["ResponseMetadata"]["HTTPStatusCode"] == 200
234+
235+
# Given: Valid update item request for signed attribute
236+
update_item_request = basic_update_item_request_dict_signed_attribute(test_item)
237+
238+
# Then: raises DynamoDbEncryptionTransformsException
239+
with pytest.raises(DynamoDbEncryptionTransformsException):
240+
# When: Calling update_item
241+
table.update_item(**update_item_request)
242+
243+
244+
def test_WHEN_call_passthrough_method_THEN_correct_response_is_returned():
245+
"""Test that calling a passthrough method returns the correct response."""
246+
# Given: Encrypted or plaintext table
247+
# When: Calling some passthrough method that does not explicitly exist on EncryptedTable,
248+
# but exists on the underlying boto3 table
249+
response = encrypted_table().table_name
250+
# Then: Correct response is returned, i.e. EncryptedTable forwards the call to the underlying boto3 table
251+
assert response == INTEG_TEST_DEFAULT_DYNAMODB_TABLE_NAME
252+
253+
254+
# Delete the items in the table after the module runs
255+
@pytest.fixture(scope="module", autouse=True)
256+
def cleanup_after_module(test_run_suffix):
257+
yield
258+
table = boto3.resource("dynamodb").Table(INTEG_TEST_DEFAULT_DYNAMODB_TABLE_NAME)
259+
items = [deepcopy(simple_item_dict), deepcopy(complex_item_dict)]
260+
for item in items:
261+
item["partition_key"] = item["partition_key"] + test_run_suffix
262+
table.delete_item(**basic_delete_item_request_dict(item))

0 commit comments

Comments
 (0)