-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathmost_recent.py
394 lines (327 loc) · 16.5 KB
/
most_recent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Cryptographic materials provider that uses a provider store to obtain cryptographic materials."""
import logging
import time
from collections import OrderedDict
from enum import Enum
from threading import Lock, RLock
from typing import Any, Text
import attr
import six
from dynamodb_encryption_sdk.exceptions import InvalidVersionError, NoKnownVersionError
from dynamodb_encryption_sdk.identifiers import LOGGER_NAME
from dynamodb_encryption_sdk.materials import CryptographicMaterials # noqa pylint: disable=unused-import
from dynamodb_encryption_sdk.structures import EncryptionContext # noqa pylint: disable=unused-import
from . import CryptographicMaterialsProvider
from .store import ProviderStore
# Names exported by ``from <this module> import *``.
__all__ = ("CachingMostRecentProvider",)
# Module logger, obtained under the LOGGER_NAME imported from dynamodb_encryption_sdk.identifiers.
_LOGGER = logging.getLogger(LOGGER_NAME)
#: Grace period, in seconds, during which we will return the latest local materials. It is added
#: on top of the version TTL in ``CachingMostRecentProvider._ttl_action``. This allows multiple
#: threads to be using this same provider without risking lock contention or many threads
#: all attempting to create new versions simultaneously.
_GRACE_PERIOD = 0.5
# Labels passed to ``CachingMostRecentProvider._ttl_action`` to say which operation is asking:
# the encrypt path is judged on the provider-wide "last updated" time, while the decrypt path
# is judged on the timestamp stored with the requested version's cache entry.
_ENCRYPT_ACTION = "encrypt"
_DECRYPT_ACTION = "decrypt"
class TtlActions(Enum):
    """Decisions that can be reached after comparing a version's age against its TTL."""

    #: The version is unknown locally, or is older than the TTL plus the grace period.
    EXPIRED = 0
    #: The version is older than the TTL but still inside the grace period.
    GRACE_PERIOD = 1
    #: The version is younger than the TTL.
    LIVE = 2
def _min_capacity_validator(instance, attribute, value):
# pylint: disable=unused-argument
"""Attrs validator to require that value is at least 1."""
if value < 1:
raise ValueError("Cache capacity must be at least 1")
@attr.s(init=False)
class BasicCache(object):
    """Most basic LRU cache.

    Entries live in an ``OrderedDict`` ordered from least to most recently used; whenever the
    cache grows beyond ``capacity`` the oldest entries are dropped. Every access to the
    internal mapping happens while holding a re-entrant lock.

    :param int capacity: Maximum number of entries to keep (must be at least 1)
    """

    capacity = attr.ib(validator=(attr.validators.instance_of(int), _min_capacity_validator))

    def __init__(self, capacity):  # noqa=D107
        # type: (int) -> None
        # Workaround pending resolution of attrs/mypy interaction.
        # https://github.com/python/mypy/issues/2088
        # https://github.com/python-attrs/attrs/issues/215
        self.capacity = capacity
        attr.validate(self)
        self.__attrs_post_init__()

    def __attrs_post_init__(self):
        # type: () -> None
        """Initialize the internal cache."""
        # Re-entrant because ``get`` calls ``put`` and ``put`` calls ``_prune`` while already holding it.
        self._cache_lock = RLock()  # attrs confuses pylint: disable=attribute-defined-outside-init
        self.clear()

    def _prune(self):
        # type: () -> None
        """Prunes internal cache until internal cache is within the defined limit."""
        with self._cache_lock:
            while len(self._cache) > self.capacity:
                # last=False removes from the front, i.e. the least recently used entry.
                self._cache.popitem(last=False)

    def put(self, name, value):
        # type: (Any, Any) -> None
        """Add a value to the cache, making it the most recently used entry.

        :param name: Hashable object to identify the value in the cache
        :param value: Value to add to cache
        """
        with self._cache_lock:
            # Assigning to a key that is already present keeps its old position in an
            # OrderedDict, so remove any previous entry first; otherwise a freshly updated
            # entry could still be the first one pruned.
            self._cache.pop(name, None)
            self._cache[name] = value
            self._prune()

    def get(self, name):
        # type: (Any) -> Any
        """Get a value from the cache and mark it as the most recently used entry.

        :param name: Object to identify the value in the cache
        :returns: Value from cache
        :raises KeyError: if ``name`` is not in the cache
        """
        with self._cache_lock:
            value = self._cache.pop(name)
            # Re-insert the item to bump it to the front of the LRU cache
            self.put(name, value)
            return value

    def clear(self):
        # type: () -> None
        """Clear the cache."""
        _LOGGER.debug("Clearing cache")
        with self._cache_lock:
            self._cache = OrderedDict()  # type: OrderedDict[Any, Any] # pylint: disable=attribute-defined-outside-init

    def evict(self, name):
        # type: (Any) -> None
        """Evict a single entry from the cache.

        :param name: Object to identify the value in the cache
        """
        with self._cache_lock:
            try:
                del self._cache[name]
            except KeyError:
                # If the key wasn't in the cache, do nothing
                pass
@attr.s(init=False)
class CachingMostRecentProvider(CryptographicMaterialsProvider):
    # pylint: disable=too-many-instance-attributes
    """Cryptographic materials provider that uses a provider store to obtain cryptography
    materials. Materials obtained from the store are cached for a user-defined amount of time,
    then removed from the cache and re-retrieved from the store.

    When encrypting, the most recent provider that the provider store knows about will always
    be used.

    :param ProviderStore provider_store: Provider store to use
    :param str material_name: Name of materials for which to ask the provider store
    :param float version_ttl: Max time in seconds to go until checking with provider store
        for a more recent version
    :param int cache_size: The maximum number of entries that the cache can hold
    """

    _provider_store = attr.ib(validator=attr.validators.instance_of(ProviderStore))
    _material_name = attr.ib(validator=attr.validators.instance_of(six.string_types))
    _version_ttl = attr.ib(validator=attr.validators.instance_of(float))
    _cache_size = attr.ib(validator=attr.validators.instance_of(int))

    def __init__(self, provider_store, material_name, version_ttl, cache_size=1000):  # noqa=D107
        # type: (ProviderStore, Text, float, int) -> None
        # Workaround pending resolution of attrs/mypy interaction.
        # https://github.com/python/mypy/issues/2088
        # https://github.com/python-attrs/attrs/issues/215
        self._provider_store = provider_store
        self._material_name = material_name
        self._version_ttl = version_ttl
        self._grace_period = _GRACE_PERIOD
        self._cache_size = cache_size
        attr.validate(self)
        self.__attrs_post_init__()

    def __attrs_post_init__(self):
        # type: () -> None
        """Initialize the cache."""
        self._version = None  # type: int # pylint: disable=attribute-defined-outside-init
        self._last_updated = None  # type: float # pylint: disable=attribute-defined-outside-init
        # Serializes trips to the provider store; see the two lock-protected getters below.
        self._lock = Lock()  # pylint: disable=attribute-defined-outside-init
        # Maps version -> (timestamp of when it was cached, provider).
        self._cache = BasicCache(self._cache_size)  # pylint: disable=attribute-defined-outside-init
        self.refresh()

    def decryption_materials(self, encryption_context):
        # type: (EncryptionContext) -> CryptographicMaterials
        """Return decryption materials.

        The version to use is read from the material description in the encryption context.

        :param EncryptionContext encryption_context: Encryption context for request
        :returns: Decryption materials produced by the provider for that version
        :rtype: CryptographicMaterials
        :raises AttributeError: if no decryption materials are available
        """
        provider = None

        version = self._provider_store.version_from_material_description(encryption_context.material_description)

        ttl_action = self._ttl_action(version, _DECRYPT_ACTION)

        if ttl_action is TtlActions.EXPIRED:
            # Evict the version being decrypted. The lock-protected lookup below checks the
            # cache again before asking the provider store, so an expired entry left in place
            # would be handed back instead of being re-retrieved.
            self._cache.evict(version)

        _LOGGER.debug('TTL Action "%s" when getting decryption materials', ttl_action.name)
        if ttl_action is TtlActions.LIVE:
            try:
                _LOGGER.debug("Looking in cache for decryption materials provider version %d", version)
                _, provider = self._cache.get(version)
            except KeyError:
                _LOGGER.debug("Decryption materials provider not found in cache")

        if provider is None:
            try:
                provider = self._get_provider_with_grace_period(version, ttl_action)
            except InvalidVersionError:
                _LOGGER.exception("Unable to get decryption materials from provider store.")
                raise AttributeError("No decryption materials available")

        return provider.decryption_materials(encryption_context)

    def _ttl_action(self, version, action):
        # type: (int, str) -> TtlActions
        """Determine the correct action to take based on the local resources and TTL.

        :param int version: Version whose cache entry is checked (only consulted for decrypt)
        :param str action: The action being taken (encrypt or decrypt)
        :returns: decision
        :rtype: TtlActions
        """
        try:
            if action == _ENCRYPT_ACTION:
                # On encrypt, always check the class-level variable indicating when we last checked for updates.
                # The cache timestamps will be updated by calls to decrypt, so we don't want frequent decrypts to
                # prevent us from re-checking for a newer encryption version
                if self._last_updated is None:
                    return TtlActions.EXPIRED
                timestamp = self._last_updated
            else:
                # Raises KeyError (handled below) when the version has never been cached.
                timestamp, _ = self._cache.get(version)

            time_since_updated = time.time() - timestamp

            if time_since_updated < self._version_ttl:
                return TtlActions.LIVE

            if time_since_updated < self._version_ttl + self._grace_period:
                return TtlActions.GRACE_PERIOD

            _LOGGER.debug("TTL Expired because known version has expired")
            return TtlActions.EXPIRED
        except KeyError:
            _LOGGER.debug("TTL Expired because the requested version doesn't exist in the cache")
            return TtlActions.EXPIRED

    def _get_max_version(self):
        # type: () -> int
        """Ask the provider store for the most recent version of this material.

        :returns: Latest version in the provider store (0 if not found)
        :rtype: int
        """
        try:
            return self._provider_store.max_version(self._material_name)
        except NoKnownVersionError:
            return 0

    def _get_provider(self, version):
        # type: (int) -> CryptographicMaterialsProvider
        """Ask the provider store (via ``get_or_create_provider``) for a specific version of this material.

        :param int version: Version to request
        :returns: Cryptographic materials provider for the requested version
        :rtype: CryptographicMaterialsProvider
        :raises AttributeError: if provider could not locate version
        """
        try:
            return self._provider_store.get_or_create_provider(self._material_name, version)
        except InvalidVersionError:
            _LOGGER.exception("Unable to get encryption materials from provider store.")
            raise AttributeError("No encryption materials available")

    def _get_provider_with_grace_period(self, version, ttl_action):
        # type: (int, TtlActions) -> CryptographicMaterialsProvider
        """Ask the provider to retrieve a specific version of this material, falling back to the cache if
        another caller currently holds the lock for retrieval.

        Exceptions raised by the provider store are not translated here; the caller
        (``decryption_materials``) handles ``InvalidVersionError``.

        :param int version: Version to request
        :param TtlActions ttl_action: The ttl action to take for this version
        :returns: Cryptographic materials provider for the requested version
        :rtype: CryptographicMaterialsProvider
        :raises KeyError: if the lock is held by another caller and the version is not cached locally
        """
        # Only an expired entry is worth waiting for; otherwise try the lock once and fall back.
        blocking_wait = bool(ttl_action is TtlActions.EXPIRED)
        acquired = self._lock.acquire(blocking_wait)  # pylint: disable=consider-using-with
        if not acquired:
            # We failed to acquire the lock.
            # If blocking, we will never reach this point.
            # If not blocking, we want whatever the latest local version is.
            _LOGGER.debug("Failed to acquire lock. Returning the last cached version.")
            _, provider = self._cache.get(version)
            return provider

        try:
            # If the entry was expired then we blocked waiting for the lock, so it's possible some other thread already
            # queried the provider store and re-populated the cache. If so, we don't want to re-query the provider
            # store, so check if the entry is back in the cache first
            if ttl_action is TtlActions.EXPIRED:
                try:
                    _, provider = self._cache.get(version)
                    return provider
                except KeyError:
                    pass

            provider = self._provider_store.provider(self._material_name, version)
            self._cache.put(version, (time.time(), provider))
            return provider
        finally:
            self._lock.release()

    def _get_most_recent_version(self, ttl_action):
        # type: (TtlActions) -> CryptographicMaterialsProvider
        """Get the most recent version of the provider.

        If allowing local and we cannot obtain the lock, just return the most recent local
        version. Otherwise, wait for the lock and ask the provider store for the most recent
        version of the provider.

        :param TtlActions ttl_action: The ttl action to take for this version
        :returns: Cryptographic materials provider for the most recent version
        :rtype: CryptographicMaterialsProvider
        :raises KeyError: if the lock is held by another caller and the current version is not cached locally
        """
        # Only an expired entry is worth waiting for; otherwise try the lock once and fall back.
        blocking_wait = bool(ttl_action is TtlActions.EXPIRED)
        acquired = self._lock.acquire(blocking_wait)  # pylint: disable=consider-using-with

        if not acquired:
            # We failed to acquire the lock.
            # If blocking, we will never reach this point.
            # If not blocking, we want whatever the latest local version is.
            _LOGGER.debug("Failed to acquire lock. Returning the last cached version.")
            version = self._version
            _, provider = self._cache.get(version)
            return provider

        try:
            # If the entry was expired then we blocked waiting for the lock, so it's possible some other thread already
            # queried the provider store and re-populated the cache. If so, we don't want to re-query the provider
            # store, so check if the entry is back in the cache first
            if ttl_action is TtlActions.EXPIRED:
                try:
                    _, provider = self._cache.get(self._version)
                    return provider
                except KeyError:
                    pass

            max_version = self._get_max_version()
            try:
                _, provider = self._cache.get(max_version)
            except KeyError:
                provider = self._get_provider(max_version)

            # Record the version the provider itself reports, which is what gets cached below.
            received_version = self._provider_store.version_from_material_description(
                provider._material_description  # pylint: disable=protected-access
            )

            _LOGGER.debug("Caching materials provider version %d", received_version)
            self._version = received_version  # pylint: disable=attribute-defined-outside-init
            self._last_updated = time.time()  # pylint: disable=attribute-defined-outside-init
            self._cache.put(received_version, (self._last_updated, provider))
        finally:
            self._lock.release()

        _LOGGER.debug("New latest version is %d", self._version)

        return provider

    def encryption_materials(self, encryption_context):
        # type: (EncryptionContext) -> CryptographicMaterials
        """Return encryption materials.

        :param EncryptionContext encryption_context: Encryption context for request
        :returns: Encryption materials produced by the most recent known provider
        :rtype: CryptographicMaterials
        :raises AttributeError: if no encryption materials are available
        """
        ttl_action = self._ttl_action(self._version, _ENCRYPT_ACTION)

        _LOGGER.debug('TTL Action "%s" when getting encryption materials', ttl_action.name)

        provider = None

        if ttl_action is TtlActions.EXPIRED:
            self._cache.evict(self._version)

        if ttl_action is TtlActions.LIVE:
            try:
                _LOGGER.debug("Looking in cache for encryption materials provider version %d", self._version)
                _, provider = self._cache.get(self._version)
            except KeyError:
                _LOGGER.debug("Encryption materials provider not found in cache")
                # Nothing local to fall back on, so treat it as expired and wait for the lock.
                ttl_action = TtlActions.EXPIRED

        if provider is None:
            _LOGGER.debug("Getting most recent materials provider version")
            provider = self._get_most_recent_version(ttl_action)

        return provider.encryption_materials(encryption_context)

    def refresh(self):
        # type: () -> None
        """Clear all local caches for this provider."""
        _LOGGER.debug("Refreshing CachingMostRecentProvider instance.")
        with self._lock:
            self._cache.clear()
            self._version = None  # type: int # pylint: disable=attribute-defined-outside-init
            self._last_updated = None  # type: float # pylint: disable=attribute-defined-outside-init