forked from aws/aws-dynamodb-encryption-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaws_kms_encrypted_resource.py
109 lines (93 loc) · 4.93 KB
/
aws_kms_encrypted_resource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Example showing use of AWS KMS CMP with EncryptedResource."""
import boto3
from boto3.dynamodb.types import Binary
from dynamodb_encryption_sdk.encrypted.resource import EncryptedResource
from dynamodb_encryption_sdk.identifiers import CryptoAction
from dynamodb_encryption_sdk.material_providers.aws_kms import AwsKmsCryptographicMaterialsProvider
from dynamodb_encryption_sdk.structures import AttributeActions
def encrypt_batch_items(table_name, aws_cmk_id):
"""Demonstrate use of EncryptedResource to transparently encrypt multiple items in a batch request."""
index_keys = [
{"partition_attribute": "is this", "sort_attribute": 55},
{"partition_attribute": "is this", "sort_attribute": 56},
{"partition_attribute": "is this", "sort_attribute": 57},
{"partition_attribute": "another", "sort_attribute": 55},
]
plaintext_additional_attributes = {
"example": "data",
"some numbers": 99,
"and some binary": Binary(b"\x00\x01\x02"),
"leave me": "alone", # We want to ignore this attribute
}
plaintext_items = []
for key in index_keys:
_attributes = key.copy()
_attributes.update(plaintext_additional_attributes)
plaintext_items.append(_attributes)
# Collect all of the attributes that will be encrypted (used later).
encrypted_attributes = set(plaintext_additional_attributes.keys())
encrypted_attributes.remove("leave me")
# Collect all of the attributes that will not be encrypted (used later).
unencrypted_attributes = set(index_keys[0].keys())
unencrypted_attributes.add("leave me")
# Create a normal service resource.
resource = boto3.resource("dynamodb")
# Create a crypto materials provider using the specified AWS KMS key.
aws_kms_cmp = AwsKmsCryptographicMaterialsProvider(key_id=aws_cmk_id)
# Create attribute actions that tells the encrypted resource to encrypt all attributes except one.
actions = AttributeActions(
default_action=CryptoAction.ENCRYPT_AND_SIGN, attribute_actions={"leave me": CryptoAction.DO_NOTHING}
)
# Use these objects to create an encrypted service resource.
encrypted_resource = EncryptedResource(resource=resource, materials_provider=aws_kms_cmp, attribute_actions=actions)
# Put the items to the table, using the encrypted service resource to transparently encrypt them.
encrypted_resource.batch_write_item(
RequestItems={table_name: [{"PutRequest": {"Item": item}} for item in plaintext_items]}
)
# Get the encrypted item using the standard service resource.
encrypted_items = resource.batch_get_item( # generated code confuse pylint: disable=no-member
RequestItems={table_name: {"Keys": index_keys}}
)["Responses"][table_name]
# Get the item using the encrypted service resource, transparently decyrpting it.
decrypted_items = encrypted_resource.batch_get_item(RequestItems={table_name: {"Keys": index_keys}})["Responses"][
table_name
]
def _select_index_from_item(item):
"""Find the index keys that match this item."""
for index in index_keys:
if all(item[key] == value for key, value in index.items()):
return index
raise Exception("Index key not found in item.")
def _select_item_from_index(index, all_items):
"""Find the item that matches these index keys."""
for item in all_items:
if all(item[key] == value for key, value in index.items()):
return item
raise Exception("Index key not found in item.")
for encrypted_item in encrypted_items:
key = _select_index_from_item(encrypted_item)
plaintext_item = _select_item_from_index(key, plaintext_items)
decrypted_item = _select_item_from_index(key, decrypted_items)
# Verify that all of the attributes are different in the encrypted item
for name in encrypted_attributes:
assert encrypted_item[name] != plaintext_item[name]
assert decrypted_item[name] == plaintext_item[name]
# Verify that all of the attributes that should not be encrypted were not.
for name in unencrypted_attributes:
assert decrypted_item[name] == encrypted_item[name] == plaintext_item[name]
# Clean up the item
encrypted_resource.batch_write_item(
RequestItems={table_name: [{"DeleteRequest": {"Key": key}} for key in index_keys]}
)