forked from aws/aws-dynamodb-encryption-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaws_kms_encrypted_client.py
157 lines (132 loc) · 7.27 KB
/
aws_kms_encrypted_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Example showing use of AWS KMS CMP with EncryptedClient."""
import boto3
from dynamodb_encryption_sdk.encrypted.client import EncryptedClient
from dynamodb_encryption_sdk.identifiers import CryptoAction
from dynamodb_encryption_sdk.material_providers.aws_kms import AwsKmsCryptographicMaterialsProvider
from dynamodb_encryption_sdk.structures import AttributeActions
def encrypt_item(table_name, aws_cmk_id):
"""Demonstrate use of EncryptedClient to transparently encrypt an item."""
index_key = {"partition_attribute": {"S": "is this"}, "sort_attribute": {"N": "55"}}
plaintext_item = {
"example": {"S": "data"},
"some numbers": {"N": "99"},
"and some binary": {"B": b"\x00\x01\x02"},
"leave me": {"S": "alone"}, # We want to ignore this attribute
}
# Collect all of the attributes that will be encrypted (used later).
encrypted_attributes = set(plaintext_item.keys())
encrypted_attributes.remove("leave me")
# Collect all of the attributes that will not be encrypted (used later).
unencrypted_attributes = set(index_key.keys())
unencrypted_attributes.add("leave me")
# Add the index pairs to the item.
plaintext_item.update(index_key)
# Create a normal client.
client = boto3.client("dynamodb")
# Create a crypto materials provider using the specified AWS KMS key.
aws_kms_cmp = AwsKmsCryptographicMaterialsProvider(key_id=aws_cmk_id)
# Create attribute actions that tells the encrypted client to encrypt all attributes except one.
actions = AttributeActions(
default_action=CryptoAction.ENCRYPT_AND_SIGN, attribute_actions={"leave me": CryptoAction.DO_NOTHING}
)
# Use these objects to create an encrypted client.
encrypted_client = EncryptedClient(client=client, materials_provider=aws_kms_cmp, attribute_actions=actions)
# Put the item to the table, using the encrypted client to transparently encrypt it.
encrypted_client.put_item(TableName=table_name, Item=plaintext_item)
# Get the encrypted item using the standard client.
encrypted_item = client.get_item(TableName=table_name, Key=index_key)["Item"]
# Get the item using the encrypted client, transparently decyrpting it.
decrypted_item = encrypted_client.get_item(TableName=table_name, Key=index_key)["Item"]
# Verify that all of the attributes are different in the encrypted item
for name in encrypted_attributes:
assert encrypted_item[name] != plaintext_item[name]
assert decrypted_item[name] == plaintext_item[name]
# Verify that all of the attributes that should not be encrypted were not.
for name in unencrypted_attributes:
assert decrypted_item[name] == encrypted_item[name] == plaintext_item[name]
# Clean up the item
encrypted_client.delete_item(TableName=table_name, Key=index_key)
def encrypt_batch_items(table_name, aws_cmk_id):
"""Demonstrate use of EncryptedClient to transparently encrypt multiple items in a batch request."""
index_keys = [
{"partition_attribute": {"S": "is this"}, "sort_attribute": {"N": "55"}},
{"partition_attribute": {"S": "is this"}, "sort_attribute": {"N": "56"}},
{"partition_attribute": {"S": "is this"}, "sort_attribute": {"N": "57"}},
{"partition_attribute": {"S": "another"}, "sort_attribute": {"N": "55"}},
]
plaintext_additional_attributes = {
"example": {"S": "data"},
"some numbers": {"N": "99"},
"and some binary": {"B": b"\x00\x01\x02"},
"leave me": {"S": "alone"}, # We want to ignore this attribute
}
plaintext_items = []
for key in index_keys:
_attributes = key.copy()
_attributes.update(plaintext_additional_attributes)
plaintext_items.append(_attributes)
# Collect all of the attributes that will be encrypted (used later).
encrypted_attributes = set(plaintext_additional_attributes.keys())
encrypted_attributes.remove("leave me")
# Collect all of the attributes that will not be encrypted (used later).
unencrypted_attributes = set(index_keys[0].keys())
unencrypted_attributes.add("leave me")
# Create a normal client.
client = boto3.client("dynamodb")
# Create a crypto materials provider using the specified AWS KMS key.
aws_kms_cmp = AwsKmsCryptographicMaterialsProvider(key_id=aws_cmk_id)
# Create attribute actions that tells the encrypted client to encrypt all attributes except one.
actions = AttributeActions(
default_action=CryptoAction.ENCRYPT_AND_SIGN, attribute_actions={"leave me": CryptoAction.DO_NOTHING}
)
# Use these objects to create an encrypted client.
encrypted_client = EncryptedClient(client=client, materials_provider=aws_kms_cmp, attribute_actions=actions)
# Put the items to the table, using the encrypted client to transparently encrypt them.
encrypted_client.batch_write_item(
RequestItems={table_name: [{"PutRequest": {"Item": item}} for item in plaintext_items]}
)
# Get the encrypted item using the standard client.
encrypted_items = client.batch_get_item(RequestItems={table_name: {"Keys": index_keys}})["Responses"][table_name]
# Get the item using the encrypted client, transparently decyrpting it.
decrypted_items = encrypted_client.batch_get_item(RequestItems={table_name: {"Keys": index_keys}})["Responses"][
table_name
]
def _select_index_from_item(item):
"""Find the index keys that match this item."""
for index in index_keys:
if all(item[key] == value for key, value in index.items()):
return index
raise Exception("Index key not found in item.")
def _select_item_from_index(index, all_items):
"""Find the item that matches these index keys."""
for item in all_items:
if all(item[key] == value for key, value in index.items()):
return item
raise Exception("Index key not found in item.")
for encrypted_item in encrypted_items:
key = _select_index_from_item(encrypted_item)
plaintext_item = _select_item_from_index(key, plaintext_items)
decrypted_item = _select_item_from_index(key, decrypted_items)
# Verify that all of the attributes are different in the encrypted item
for name in encrypted_attributes:
assert encrypted_item[name] != plaintext_item[name]
assert decrypted_item[name] == plaintext_item[name]
# Verify that all of the attributes that should not be encrypted were not.
for name in unencrypted_attributes:
assert decrypted_item[name] == encrypted_item[name] == plaintext_item[name]
# Clean up the item
encrypted_client.batch_write_item(
RequestItems={table_name: [{"DeleteRequest": {"Key": key}} for key in index_keys]}
)