-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathitem.py
256 lines (209 loc) · 10.3 KB
/
item.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Top-level functions for encrypting and decrypting DynamoDB items."""
try: # Python 3.5.0 and 3.5.1 have incompatible typing modules
from dynamodb_encryption_sdk.internal import dynamodb_types # noqa pylint: disable=unused-import
except ImportError: # pragma: no cover
# We only actually need these imports when running the mypy checks
pass
from dynamodb_encryption_sdk.exceptions import DecryptionError, EncryptionError
from dynamodb_encryption_sdk.identifiers import CryptoAction
from dynamodb_encryption_sdk.internal.crypto.authentication import sign_item, verify_item_signature
from dynamodb_encryption_sdk.internal.crypto.encryption import decrypt_attribute, encrypt_attribute
from dynamodb_encryption_sdk.internal.formatting.material_description import (
deserialize as deserialize_material_description,
serialize as serialize_material_description,
)
from dynamodb_encryption_sdk.internal.identifiers import (
MaterialDescriptionKeys,
MaterialDescriptionValues,
ReservedAttributes,
)
from dynamodb_encryption_sdk.transform import ddb_to_dict, dict_to_ddb
from . import CryptoConfig # noqa pylint: disable=unused-import
__all__ = ("encrypt_dynamodb_item", "encrypt_python_item", "decrypt_dynamodb_item", "decrypt_python_item")
def encrypt_dynamodb_item(item, crypto_config):
# type: (dynamodb_types.ITEM, CryptoConfig) -> dynamodb_types.ITEM
"""Encrypt a DynamoDB item.
>>> from dynamodb_encryption_sdk.encrypted.item import encrypt_dynamodb_item
>>> plaintext_item = {
... 'some': {'S': 'data'},
... 'more': {'N': '5'}
... }
>>> encrypted_item = encrypt_dynamodb_item(
... item=plaintext_item,
... crypto_config=my_crypto_config
... )
.. note::
This handles DynamoDB-formatted items and is for use with the boto3 DynamoDB client.
:param dict item: Plaintext DynamoDB item
:param CryptoConfig crypto_config: Cryptographic configuration
:returns: Encrypted and signed DynamoDB item
:rtype: dict
"""
if crypto_config.attribute_actions.take_no_actions:
# If we explicitly have been told not to do anything to this item, just copy it.
return item.copy()
for reserved_name in ReservedAttributes:
if reserved_name.value in item:
raise EncryptionError(
'Reserved attribute name "{}" is not allowed in plaintext item.'.format(reserved_name.value)
)
encryption_materials = crypto_config.encryption_materials()
inner_material_description = encryption_materials.material_description.copy()
try:
encryption_materials.encryption_key
except AttributeError:
if crypto_config.attribute_actions.contains_action(CryptoAction.ENCRYPT_AND_SIGN):
raise EncryptionError(
"Attribute actions ask for some attributes to be encrypted but no encryption key is available"
)
encrypted_item = item.copy()
else:
# Add the attribute encryption mode to the inner material description
encryption_mode = MaterialDescriptionValues.CBC_PKCS5_ATTRIBUTE_ENCRYPTION.value
inner_material_description[MaterialDescriptionKeys.ATTRIBUTE_ENCRYPTION_MODE.value] = encryption_mode
algorithm_descriptor = encryption_materials.encryption_key.algorithm + encryption_mode
encrypted_item = {}
for name, attribute in item.items():
if crypto_config.attribute_actions.action(name) is CryptoAction.ENCRYPT_AND_SIGN:
encrypted_item[name] = encrypt_attribute(
attribute_name=name,
attribute=attribute,
encryption_key=encryption_materials.encryption_key,
algorithm=algorithm_descriptor,
)
else:
encrypted_item[name] = attribute.copy()
signature_attribute = sign_item(encrypted_item, encryption_materials.signing_key, crypto_config)
encrypted_item[ReservedAttributes.SIGNATURE.value] = signature_attribute
try:
# Add the signing key algorithm identifier to the inner material description if provided
inner_material_description[
MaterialDescriptionKeys.SIGNING_KEY_ALGORITHM.value
] = encryption_materials.signing_key.signing_algorithm()
except NotImplementedError:
# Not all signing keys will provide this value
pass
material_description_attribute = serialize_material_description(inner_material_description)
encrypted_item[ReservedAttributes.MATERIAL_DESCRIPTION.value] = material_description_attribute
return encrypted_item
def encrypt_python_item(item, crypto_config):
# type: (dynamodb_types.ITEM, CryptoConfig) -> dynamodb_types.ITEM
"""Encrypt a dictionary for DynamoDB.
>>> from dynamodb_encryption_sdk.encrypted.item import encrypt_python_item
>>> plaintext_item = {
... 'some': 'data',
... 'more': 5
... }
>>> encrypted_item = encrypt_python_item(
... item=plaintext_item,
... crypto_config=my_crypto_config
... )
.. note::
This handles human-friendly dictionaries and is for use with the boto3 DynamoDB service or table resource.
:param dict item: Plaintext dictionary
:param CryptoConfig crypto_config: Cryptographic configuration
:returns: Encrypted and signed dictionary
:rtype: dict
"""
ddb_item = dict_to_ddb(item)
encrypted_ddb_item = encrypt_dynamodb_item(ddb_item, crypto_config)
return ddb_to_dict(encrypted_ddb_item)
def decrypt_dynamodb_item(item, crypto_config):
# type: (dynamodb_types.ITEM, CryptoConfig) -> dynamodb_types.ITEM
"""Decrypt a DynamoDB item.
>>> from dynamodb_encryption_sdk.encrypted.item import decrypt_python_item
>>> encrypted_item = {
... 'some': {'B': b'ENCRYPTED_DATA'},
... 'more': {'B': b'ENCRYPTED_DATA'}
... }
>>> decrypted_item = decrypt_python_item(
... item=encrypted_item,
... crypto_config=my_crypto_config
... )
.. note::
This handles DynamoDB-formatted items and is for use with the boto3 DynamoDB client.
:param dict item: Encrypted and signed DynamoDB item
:param CryptoConfig crypto_config: Cryptographic configuration
:returns: Plaintext DynamoDB item
:rtype: dict
"""
unique_actions = set([crypto_config.attribute_actions.default_action.name])
unique_actions.update(set([action.name for action in crypto_config.attribute_actions.attribute_actions.values()]))
if crypto_config.attribute_actions.take_no_actions:
# If we explicitly have been told not to do anything to this item, just copy it.
return item.copy()
try:
signature_attribute = item.pop(ReservedAttributes.SIGNATURE.value)
except KeyError:
# The signature is always written, so if no signature is found then the item was not
# encrypted or signed.
raise DecryptionError("No signature attribute found in item")
inner_crypto_config = crypto_config.copy()
# Retrieve the material description from the item if found.
try:
material_description_attribute = item.pop(ReservedAttributes.MATERIAL_DESCRIPTION.value)
except KeyError:
# If no material description is found, we use inner_crypto_config as-is.
pass
else:
# If material description is found, override the material description in inner_crypto_config.
material_description = deserialize_material_description(material_description_attribute)
inner_crypto_config.encryption_context.material_description = material_description
decryption_materials = inner_crypto_config.decryption_materials()
verify_item_signature(signature_attribute, item, decryption_materials.verification_key, inner_crypto_config)
try:
decryption_key = decryption_materials.decryption_key
except AttributeError:
if inner_crypto_config.attribute_actions.contains_action(CryptoAction.ENCRYPT_AND_SIGN):
raise DecryptionError(
"Attribute actions ask for some attributes to be decrypted but no decryption key is available"
)
return item.copy()
decryption_mode = inner_crypto_config.encryption_context.material_description.get(
MaterialDescriptionKeys.ATTRIBUTE_ENCRYPTION_MODE.value
)
algorithm_descriptor = decryption_key.algorithm + decryption_mode
# Once the signature has been verified, actually decrypt the item attributes.
decrypted_item = {}
for name, attribute in item.items():
if inner_crypto_config.attribute_actions.action(name) is CryptoAction.ENCRYPT_AND_SIGN:
decrypted_item[name] = decrypt_attribute(
attribute_name=name, attribute=attribute, decryption_key=decryption_key, algorithm=algorithm_descriptor
)
else:
decrypted_item[name] = attribute.copy()
return decrypted_item
def decrypt_python_item(item, crypto_config):
# type: (dynamodb_types.ITEM, CryptoConfig) -> dynamodb_types.ITEM
"""Decrypt a dictionary for DynamoDB.
>>> from dynamodb_encryption_sdk.encrypted.item import decrypt_python_item
>>> encrypted_item = {
... 'some': Binary(b'ENCRYPTED_DATA'),
... 'more': Binary(b'ENCRYPTED_DATA')
... }
>>> decrypted_item = decrypt_python_item(
... item=encrypted_item,
... crypto_config=my_crypto_config
... )
.. note::
This handles human-friendly dictionaries and is for use with the boto3 DynamoDB service or table resource.
:param dict item: Encrypted and signed dictionary
:param CryptoConfig crypto_config: Cryptographic configuration
:returns: Plaintext dictionary
:rtype: dict
"""
ddb_item = dict_to_ddb(item)
decrypted_ddb_item = decrypt_dynamodb_item(ddb_item, crypto_config)
return ddb_to_dict(decrypted_ddb_item)