-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathstructures.py
348 lines (289 loc) · 13.9 KB
/
structures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Common structures used by the DynamoDB Encryption Client."""
import copy
from typing import Dict, Iterable, List, Optional, Set, Text
import attr
import six
from dynamodb_encryption_sdk.exceptions import InvalidArgumentError
from dynamodb_encryption_sdk.internal.identifiers import ReservedAttributes
from dynamodb_encryption_sdk.internal.validators import dictionary_validator, iterable_validator
from .identifiers import CryptoAction
__all__ = ("EncryptionContext", "AttributeActions", "TableIndex", "TableInfo")
def _validate_attribute_values_are_ddb_items(instance, attribute, value):  # pylint: disable=unused-argument
    """Check that every entry of ``value`` is shaped like a DynamoDB JSON attribute value.

    .. note::

        This is deliberately a shallow check: each entry only has to hold exactly one
        item (as in ``{"S": "text"}``). Neither the type tag nor the payload is inspected.

    :param instance: Object being validated (unused)
    :param attribute: Attribute being validated; only its ``name`` is read, for the error message
    :param dict value: Mapping whose values are checked
    :raises TypeError: if any entry does not contain exactly one item
    """
    malformed = any(len(tuple(entry.values())) != 1 for entry in value.values())
    if malformed:
        raise TypeError('"{}" values do not look like DynamoDB items'.format(attribute.name))
@attr.s(init=False)
class EncryptionContext(object):
    # pylint: disable=too-few-public-methods
    """Additional information about an encryption request.

    :param str table_name: Table name
    :param str partition_key_name: Name of primary index partition attribute
    :param str sort_key_name: Name of primary index sort attribute
    :param dict attributes: Plaintext item attributes as a DynamoDB JSON dictionary
    :param dict material_description: Material description to use with this request
        (deep-copied on construction, so later changes to the caller's dictionary
        do not affect this context)
    :raises TypeError: if ``attributes`` values do not look like DynamoDB items
    """

    table_name = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(six.string_types)), default=None
    )
    partition_key_name = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(six.string_types)), default=None
    )
    sort_key_name = attr.ib(
        validator=attr.validators.optional(attr.validators.instance_of(six.string_types)), default=None
    )
    attributes = attr.ib(
        repr=False,
        validator=(dictionary_validator(six.string_types, dict), _validate_attribute_values_are_ddb_items),
        default=attr.Factory(dict),
    )
    material_description = attr.ib(
        validator=dictionary_validator(six.string_types, six.string_types),
        converter=copy.deepcopy,
        default=attr.Factory(dict),
    )

    def __init__(
        self,
        table_name=None,  # type: Optional[Text]
        partition_key_name=None,  # type: Optional[Text]
        sort_key_name=None,  # type: Optional[Text]
        attributes=None,  # type: Optional[Dict[Text, Dict]]
        material_description=None,  # type: Optional[Dict[Text, Text]]
    ):  # noqa=D107
        # type: (...) -> None
        # Workaround pending resolution of attrs/mypy interaction.
        # https://github.com/python/mypy/issues/2088
        # https://github.com/python-attrs/attrs/issues/215
        if attributes is None:
            attributes = {}
        if material_description is None:
            material_description = {}

        self.table_name = table_name
        self.partition_key_name = partition_key_name
        self.sort_key_name = sort_key_name
        self.attributes = attributes
        # The ``converter`` declared on ``material_description`` is only invoked by an
        # attrs-generated ``__init__``. This class supplies its own, so the copy must be
        # made explicitly or the context would alias the caller's dictionary.
        self.material_description = copy.deepcopy(material_description)
        # Validators are likewise not run automatically by a hand-written ``__init__``.
        attr.validate(self)
@attr.s(init=False)
class AttributeActions(object):
    """Configuration resource used to determine what action should be taken for a specific attribute.

    :param CryptoAction default_action: Action to take if no specific action is defined in
        ``attribute_actions``
    :param dict attribute_actions: Dictionary mapping attribute names to specific actions
    :raises ValueError: if ``attribute_actions`` names a reserved attribute
    """

    default_action = attr.ib(validator=attr.validators.instance_of(CryptoAction), default=CryptoAction.ENCRYPT_AND_SIGN)
    attribute_actions = attr.ib(
        validator=dictionary_validator(six.string_types, CryptoAction), default=attr.Factory(dict)
    )

    def __init__(
        self,
        default_action=CryptoAction.ENCRYPT_AND_SIGN,  # type: Optional[CryptoAction]
        attribute_actions=None,  # type: Optional[Dict[Text, CryptoAction]]
    ):  # noqa=D107
        # type: (...) -> None
        # Workaround pending resolution of attrs/mypy interaction.
        # https://github.com/python/mypy/issues/2088
        # https://github.com/python-attrs/attrs/issues/215
        if attribute_actions is None:
            attribute_actions = {}

        self.default_action = default_action
        self.attribute_actions = attribute_actions
        # A hand-written ``__init__`` must trigger validation and post-init itself.
        attr.validate(self)
        self.__attrs_post_init__()

    def __attrs_post_init__(self):
        # () -> None
        """Determine if any actions should ever be taken with this configuration and record that for reference.

        :raises ValueError: if an action override is set for a reserved attribute
        """
        for attribute in ReservedAttributes:
            if attribute.value in self.attribute_actions:
                raise ValueError('No override behavior can be set for reserved attribute "{}"'.format(attribute.value))

        # Enums are not hashable, but their names are unique
        _unique_actions = {self.default_action.name}
        _unique_actions.update({action.name for action in self.attribute_actions.values()})
        no_actions = _unique_actions == {CryptoAction.DO_NOTHING.name}
        self.take_no_actions = no_actions  # attrs confuses pylint: disable=attribute-defined-outside-init

    def action(self, attribute_name):
        # (text) -> CryptoAction
        """Determine the correct :class:`CryptoAction` to apply to a supplied attribute based
        on this config.

        :param str attribute_name: Attribute for which to determine action
        :returns: The action set for ``attribute_name``, or the default action if none is set
        :rtype: CryptoAction
        """
        return self.attribute_actions.get(attribute_name, self.default_action)

    def copy(self):
        # () -> AttributeActions
        """Return a new copy of this object.

        The ``attribute_actions`` mapping is shallow-copied, so changes to the copy's
        mapping do not affect this instance.
        """
        return AttributeActions(default_action=self.default_action, attribute_actions=self.attribute_actions.copy())

    def set_index_keys(self, *keys):
        """Set the appropriate action for the specified indexed attribute names.

        .. warning::

            If you have already set a custom action for any of these attributes, this will
            raise an error.

        .. code::

            Default Action   -> Index Key Action
            DO_NOTHING       -> DO_NOTHING
            SIGN_ONLY        -> SIGN_ONLY
            ENCRYPT_AND_SIGN -> SIGN_ONLY

        :param str ``*keys``: Attribute names to treat as indexed
        :raises InvalidArgumentError: if a custom action was previously set for any specified
            attributes
        """
        for key in keys:
            # Index keys are capped at SIGN_ONLY.
            index_action = min(self.action(key), CryptoAction.SIGN_ONLY)
            try:
                existing_action = self.attribute_actions[key]
            except KeyError:
                # No explicit action yet: record the index-safe one.
                self.attribute_actions[key] = index_action
                continue

            if existing_action is not index_action:
                raise InvalidArgumentError(
                    'Cannot overwrite a previously requested action on indexed attribute: "{}"'.format(key)
                )

    def contains_action(self, action):
        # (CryptoAction) -> bool
        """Determine if the specified action is a possible action from this configuration.

        :param CryptoAction action: Action to look for
        :returns: ``True`` if ``action`` is the default action or is set for any attribute
        :rtype: bool
        """
        return action is self.default_action or action in self.attribute_actions.values()

    def __add__(self, other):
        # (AttributeActions) -> AttributeActions
        """Merge two AttributeActions objects into a new instance, applying the dominant
        action in each discovered case.

        :param AttributeActions other: Configuration to merge with this one
        :returns: New configuration holding, for the default and for every attribute named
            by either operand, the greater of the two operands' actions
        :rtype: AttributeActions
        """
        # The default action follows the same "dominant action wins" rule as individual
        # attributes. ``max`` relies only on the ordering that ``set_index_keys`` already
        # uses through ``min``.
        default_action = max(self.default_action, other.default_action)
        all_attributes = set(self.attribute_actions).union(other.attribute_actions)
        attribute_actions = {
            attribute: max(self.action(attribute), other.action(attribute)) for attribute in all_attributes
        }
        return AttributeActions(default_action=default_action, attribute_actions=attribute_actions)
@attr.s(init=False)
class TableIndex(object):
    # pylint: disable=too-few-public-methods
    """Describes a table index.

    :param str partition: Name of the partition attribute
    :param str sort: Name of the sort attribute (optional)
    """

    partition = attr.ib(validator=attr.validators.instance_of(six.string_types))
    sort = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(six.string_types)), default=None)

    def __init__(self, partition, sort=None):  # noqa=D107
        # type: (Text, Optional[Text]) -> None
        # Workaround pending resolution of attrs/mypy interaction.
        # https://github.com/python/mypy/issues/2088
        # https://github.com/python-attrs/attrs/issues/215
        self.partition = partition
        self.sort = sort
        # A hand-written ``__init__`` must trigger validation and post-init itself.
        attr.validate(self)
        self.__attrs_post_init__()

    def __attrs_post_init__(self):
        """Collect the key attribute names into ``attributes`` for ease of access later."""
        key_names = {self.partition}
        if self.sort is not None:
            key_names.add(self.sort)
        self.attributes = key_names  # attrs confuses pylint: disable=attribute-defined-outside-init

    @classmethod
    def from_key_schema(cls, key_schema):
        # type: (Iterable[Dict[Text, Text]]) -> TableIndex
        """Build a TableIndex from the key schema returned by DescribeTable.

        .. code::

            [
                {
                    "KeyType": "HASH"|"RANGE",
                    "AttributeName": ""
                },
            ]

        :param list key_schema: KeySchema from DescribeTable response
        :returns: New TableIndex that describes the provided schema
        :rtype: TableIndex
        """
        names_by_key_type = {}
        for entry in key_schema:
            key_type = entry["KeyType"]
            names_by_key_type[key_type] = entry["AttributeName"]

        # "HASH" is required; "RANGE" is only present for indexes with a sort key.
        return cls(partition=names_by_key_type["HASH"], sort=names_by_key_type.get("RANGE"))
@attr.s(init=False)
class TableInfo(object):
    """Describes a DynamoDB table.

    :param str name: Table name
    :param TableIndex primary_index: Description of primary index
    :param secondary_indexes: List of TableIndex objects describing any secondary indexes
    :type secondary_indexes: list(TableIndex)
    """

    name = attr.ib(validator=attr.validators.instance_of(six.string_types))
    _primary_index = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(TableIndex)), default=None)
    _secondary_indexes = attr.ib(validator=attr.validators.optional(iterable_validator(list, TableIndex)), default=None)

    def __init__(
        self,
        name,  # type: Text
        primary_index=None,  # type: Optional[TableIndex]
        secondary_indexes=None,  # type: Optional[List[TableIndex]]
    ):  # noqa=D107
        # type: (...) -> None
        # Workaround pending resolution of attrs/mypy interaction.
        # https://github.com/python/mypy/issues/2088
        # https://github.com/python-attrs/attrs/issues/215
        self.name = name
        self._primary_index = primary_index
        self._secondary_indexes = secondary_indexes
        # A hand-written ``__init__`` must trigger validation itself.
        attr.validate(self)

    @property
    def primary_index(self):
        # type: () -> TableIndex
        """Return the primary TableIndex.

        :returns: primary index description
        :rtype: TableIndex
        :raises AttributeError: if primary index is unknown
        """
        if self._primary_index is None:
            raise AttributeError("Indexes unknown. Run refresh_indexed_attributes")
        return self._primary_index

    @property
    def secondary_indexes(self):
        # type: () -> List[TableIndex]
        """Return the secondary TableIndex objects.

        :returns: secondary index descriptions
        :rtype: list(TableIndex)
        :raises AttributeError: if secondary indexes are unknown
        """
        if self._secondary_indexes is None:
            raise AttributeError("Indexes unknown. Run refresh_indexed_attributes")
        return self._secondary_indexes

    def protected_index_keys(self):
        # type: () -> Set[Text]
        """Provide a set containing the names of all indexed attributes that must not be encrypted.

        :returns: attribute names of the primary index
        :rtype: set
        :raises AttributeError: if primary index is unknown
        """
        return self.primary_index.attributes

    @property
    def encryption_context_values(self):
        # type: () -> Dict[Text, Text]
        """Build parameters needed to inform an EncryptionContext constructor about this table.

        The key names are only included when the primary index is known; otherwise
        just the table name is returned.

        :rtype: dict
        """
        values = {"table_name": self.name}
        # Read the private field: the ``primary_index`` property raises when the index is
        # unknown, which would make the table-name-only result unreachable.
        primary_index = self._primary_index
        if primary_index is not None:
            values.update({"partition_key_name": primary_index.partition, "sort_key_name": primary_index.sort})
        return values

    def refresh_indexed_attributes(self, client):
        """Use the provided boto3 DynamoDB client to determine all indexes for this table.

        :param client: Pre-configured boto3 DynamoDB client
        :type client: botocore.client.BaseClient
        :raises KeyError: if the table description or one of its index entries is missing
            an expected key
        """
        table = client.describe_table(TableName=self.name)["Table"]
        primary_index = TableIndex.from_key_schema(table["KeySchema"])

        secondary_indexes = []
        for group in ("LocalSecondaryIndexes", "GlobalSecondaryIndexes"):
            # Not all tables will have secondary indexes. Only a missing group is
            # tolerated; a malformed index entry is reported rather than skipped.
            for index in table.get(group, ()):
                secondary_indexes.append(TableIndex.from_key_schema(index["KeySchema"]))

        # Assign only after everything parsed so a failure cannot leave partial state.
        self._primary_index = primary_index
        self._secondary_indexes = secondary_indexes