diff --git a/src/aws_encryption_sdk/key_providers/base.py b/src/aws_encryption_sdk/key_providers/base.py index 3112cba6d..9ee9b7d1c 100644 --- a/src/aws_encryption_sdk/key_providers/base.py +++ b/src/aws_encryption_sdk/key_providers/base.py @@ -26,8 +26,17 @@ MasterKeyProviderError, ) from aws_encryption_sdk.internal.str_ops import to_bytes +from aws_encryption_sdk.structures import DataKey # pylint: disable=unused-import +from aws_encryption_sdk.structures import EncryptedDataKey # pylint: disable=unused-import +from aws_encryption_sdk.structures import RawDataKey # pylint: disable=unused-import from aws_encryption_sdk.structures import MasterKeyInfo +try: # Python 3.5.0 and 3.5.1 have incompatible typing modules + from typing import Iterable, Union # noqa pylint: disable=unused-import +except ImportError: # pragma: no cover + # We only actually need these imports when running the mypy checks + pass + _LOGGER = logging.getLogger(__name__) @@ -212,6 +221,35 @@ def master_key_for_decrypt(self, key_info): self._decrypt_key_index[key_info] = decrypt_master_key return decrypt_master_key + def master_keys_for_data_key(self, data_key): + # type: (Union[DataKey, EncryptedDataKey, RawDataKey]) -> Iterable[MasterKey] + """Locates the correct master keys from children for the specified data key. + + :param data_key: Data key for which to locate owning master keys + :type data_key: :class:`EncryptedDataKey`, :class:`RawDataKey`, or :class:`DataKey` + :returns: Masters key that own data key + :rtype: iterator of :class:`MasterKey` + :raises UnknownIdentityError: if unable to locate the correct master key + """ + for member in [self] + self._members: + if member.provider_id != data_key.key_provider.provider_id: + continue + + _LOGGER.debug("attempting to locate master key from key provider: %s", member.provider_id) + + if isinstance(member, MasterKey): + if member.owns_data_key(data_key): + _LOGGER.debug("using existing master key") + yield member + + if self.vend_masterkey_on_decrypt: + try: + _LOGGER.debug("attempting to add master key: %s", data_key.key_provider.key_info) + yield member.master_key_for_decrypt(data_key.key_provider.key_info) + except InvalidKeyIdError: + _LOGGER.debug("master key %s not available in provider", data_key.key_provider.key_info) + continue + def decrypt_data_key(self, encrypted_data_key, algorithm, encryption_context): """Iterates through all currently added Master Keys and Master Key Providers to attempt to decrypt data key. @@ -225,42 +263,25 @@ def decrypt_data_key(self, encrypted_data_key, algorithm, encryption_context): :rtype: aws_encryption_sdk.structures.DataKey :raises DecryptKeyError: if unable to decrypt encrypted data key """ - data_key = None - master_key = None _LOGGER.debug("starting decrypt data key attempt") - for member in [self] + self._members: - if member.provider_id == encrypted_data_key.key_provider.provider_id: - _LOGGER.debug("attempting to locate master key from key provider: %s", member.provider_id) - if isinstance(member, MasterKey): - _LOGGER.debug("using existing master key") - master_key = member - elif self.vend_masterkey_on_decrypt: - try: - _LOGGER.debug("attempting to add master key: %s", encrypted_data_key.key_provider.key_info) - master_key = member.master_key_for_decrypt(encrypted_data_key.key_provider.key_info) - except InvalidKeyIdError: - _LOGGER.debug( - "master key %s not available in provider", encrypted_data_key.key_provider.key_info - ) - continue - else: - continue - try: - _LOGGER.debug( - "attempting to decrypt data key with provider %s", encrypted_data_key.key_provider.key_info - ) - data_key = master_key.decrypt_data_key(encrypted_data_key, algorithm, encryption_context) - except (IncorrectMasterKeyError, DecryptKeyError) as error: - _LOGGER.debug( - "%s raised when attempting to decrypt data key with master key %s", - repr(error), - master_key.key_provider, - ) - continue - break # If this point is reached without throwing any errors, the data key has been decrypted - if not data_key: - raise DecryptKeyError("Unable to decrypt data key") - return data_key + for master_key in self.master_keys_for_data_key(encrypted_data_key): + try: + _LOGGER.debug( + "attempting to decrypt data key with provider %s", encrypted_data_key.key_provider.key_info + ) + return master_key.decrypt_data_key(encrypted_data_key, algorithm, encryption_context) + + # MasterKeyProvider.decrypt_data_key throws DecryptKeyError + # but MasterKey.decrypt_data_key throws IncorrectMasterKeyError + except (IncorrectMasterKeyError, DecryptKeyError) as error: + _LOGGER.debug( + "%s raised when attempting to decrypt data key with master key %s", + repr(error), + master_key.key_provider, + ) + continue + + raise DecryptKeyError("Unable to decrypt data key") def decrypt_data_key_from_list(self, encrypted_data_keys, algorithm, encryption_context): """Receives a list of encrypted data keys and returns the first one which this provider is able to decrypt. @@ -356,9 +377,7 @@ def owns_data_key(self, data_key): :returns: Boolean statement of ownership :rtype: bool """ - if data_key.key_provider == self.key_provider: - return True - return False + return data_key.key_provider == self.key_provider def master_keys_for_encryption(self, encryption_context, plaintext_rostream, plaintext_length=None): """Returns self and a list containing self, to match the format of output for a Master Key Provider.