Skip to content

Refactor MasterKeyProvider.master_keys_for_data_key #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 9, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 57 additions & 38 deletions src/aws_encryption_sdk/key_providers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,17 @@
MasterKeyProviderError,
)
from aws_encryption_sdk.internal.str_ops import to_bytes
from aws_encryption_sdk.structures import DataKey # pylint: disable=unused-import
from aws_encryption_sdk.structures import EncryptedDataKey # pylint: disable=unused-import
from aws_encryption_sdk.structures import RawDataKey # pylint: disable=unused-import
from aws_encryption_sdk.structures import MasterKeyInfo

try: # Python 3.5.0 and 3.5.1 have incompatible typing modules
from typing import Iterable, Union # noqa pylint: disable=unused-import
except ImportError: # pragma: no cover
# We only actually need these imports when running the mypy checks
pass

_LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -212,6 +221,35 @@ def master_key_for_decrypt(self, key_info):
self._decrypt_key_index[key_info] = decrypt_master_key
return decrypt_master_key

def master_keys_for_data_key(self, data_key):
# type: (Union[DataKey, EncryptedDataKey, RawDataKey]) -> Iterable[MasterKey]
"""Locates the correct master keys from children for the specified data key.

:param data_key: Data key for which to locate owning master keys
:type data_key: :class:`EncryptedDataKey`, :class:`RawDataKey`, or :class:`DataKey`
:returns: Masters key that own data key
:rtype: iterator of :class:`MasterKey`
:raises UnknownIdentityError: if unable to locate the correct master key
"""
for member in [self] + self._members:
if member.provider_id != data_key.key_provider.provider_id:
continue

_LOGGER.debug("attempting to locate master key from key provider: %s", member.provider_id)

if isinstance(member, MasterKey):
if member.owns_data_key(data_key):
_LOGGER.debug("using existing master key")
yield member

if self.vend_masterkey_on_decrypt:
try:
_LOGGER.debug("attempting to add master key: %s", data_key.key_provider.key_info)
yield member.master_key_for_decrypt(data_key.key_provider.key_info)
except InvalidKeyIdError:
_LOGGER.debug("master key %s not available in provider", data_key.key_provider.key_info)
continue

def decrypt_data_key(self, encrypted_data_key, algorithm, encryption_context):
"""Iterates through all currently added Master Keys and Master Key Providers
to attempt to decrypt data key.
Expand All @@ -225,42 +263,25 @@ def decrypt_data_key(self, encrypted_data_key, algorithm, encryption_context):
:rtype: aws_encryption_sdk.structures.DataKey
:raises DecryptKeyError: if unable to decrypt encrypted data key
"""
data_key = None
master_key = None
_LOGGER.debug("starting decrypt data key attempt")
for member in [self] + self._members:
if member.provider_id == encrypted_data_key.key_provider.provider_id:
_LOGGER.debug("attempting to locate master key from key provider: %s", member.provider_id)
if isinstance(member, MasterKey):
_LOGGER.debug("using existing master key")
master_key = member
elif self.vend_masterkey_on_decrypt:
try:
_LOGGER.debug("attempting to add master key: %s", encrypted_data_key.key_provider.key_info)
master_key = member.master_key_for_decrypt(encrypted_data_key.key_provider.key_info)
except InvalidKeyIdError:
_LOGGER.debug(
"master key %s not available in provider", encrypted_data_key.key_provider.key_info
)
continue
else:
continue
try:
_LOGGER.debug(
"attempting to decrypt data key with provider %s", encrypted_data_key.key_provider.key_info
)
data_key = master_key.decrypt_data_key(encrypted_data_key, algorithm, encryption_context)
except (IncorrectMasterKeyError, DecryptKeyError) as error:
_LOGGER.debug(
"%s raised when attempting to decrypt data key with master key %s",
repr(error),
master_key.key_provider,
)
continue
break # If this point is reached without throwing any errors, the data key has been decrypted
if not data_key:
raise DecryptKeyError("Unable to decrypt data key")
return data_key
for master_key in self.master_keys_for_data_key(encrypted_data_key):
try:
_LOGGER.debug(
"attempting to decrypt data key with provider %s", encrypted_data_key.key_provider.key_info
)
return master_key.decrypt_data_key(encrypted_data_key, algorithm, encryption_context)

# MasterKeyProvider.decrypt_data_key throws DecryptKeyError
# but MasterKey.decrypt_data_key throws IncorrectMasterKeyError
except (IncorrectMasterKeyError, DecryptKeyError) as error:
_LOGGER.debug(
"%s raised when attempting to decrypt data key with master key %s",
repr(error),
master_key.key_provider,
)
continue

raise DecryptKeyError("Unable to decrypt data key")

def decrypt_data_key_from_list(self, encrypted_data_keys, algorithm, encryption_context):
"""Receives a list of encrypted data keys and returns the first one which this provider is able to decrypt.
Expand Down Expand Up @@ -356,9 +377,7 @@ def owns_data_key(self, data_key):
:returns: Boolean statement of ownership
:rtype: bool
"""
if data_key.key_provider == self.key_provider:
return True
return False
return data_key.key_provider == self.key_provider

def master_keys_for_encryption(self, encryption_context, plaintext_rostream, plaintext_length=None):
"""Returns self and a list containing self, to match the format of output for a Master Key Provider.
Expand Down