diff --git a/doc/index.rst b/doc/index.rst index b4a0476f9..2e21bccbd 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -15,6 +15,8 @@ Modules aws_encryption_sdk.caches.local aws_encryption_sdk.caches.null aws_encryption_sdk.keyrings.base + aws_encryption_sdk.keyrings.aws_kms + aws_encryption_sdk.keyrings.aws_kms.client_suppliers aws_encryption_sdk.keyrings.multi aws_encryption_sdk.keyrings.raw aws_encryption_sdk.key_providers.base @@ -40,6 +42,8 @@ Modules aws_encryption_sdk.internal.formatting.serialize aws_encryption_sdk.internal.str_ops aws_encryption_sdk.internal.structures + aws_encryption_sdk.internal.validators aws_encryption_sdk.internal.utils + aws_encryption_sdk.keyrings.aws_kms._client_cache .. include:: ../CHANGELOG.rst diff --git a/setup.cfg b/setup.cfg index 038fc5924..0671c3c64 100644 --- a/setup.cfg +++ b/setup.cfg @@ -52,4 +52,4 @@ force_grid_wrap = 0 combine_as_imports = True not_skip = __init__.py known_first_party = aws_encryption_sdk -known_third_party = attr,awses_test_vectors,basic_encryption,basic_file_encryption_with_multiple_providers,basic_file_encryption_with_raw_key_provider,boto3,botocore,cryptography,data_key_caching_basic,integration_test_utils,mock,pytest,pytest_mock,setuptools,six,typing,wrapt +known_third_party = attr,awacs,aws_encryption_sdk_decrypt_oracle,awses_test_vectors,boto3,botocore,chalice,cryptography,integration_test_utils,mock,moto,pytest,pytest_mock,requests,setuptools,six,troposphere,wrapt diff --git a/src/aws_encryption_sdk/internal/validators.py b/src/aws_encryption_sdk/internal/validators.py new file mode 100644 index 000000000..66056602e --- /dev/null +++ b/src/aws_encryption_sdk/internal/validators.py @@ -0,0 +1,21 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Common ``attrs`` validators.""" +import attr # only used by mypy, so pylint: disable=unused-import +import six + +try: # Python 3.5.0 and 3.5.1 have incompatible typing modules + from typing import Any # noqa pylint: disable=unused-import +except ImportError: # pragma: no cover + # We only actually need these imports when running the mypy checks + pass + + +def value_is_not_a_string(instance, attribute, value): + # type: (Any, attr.Attribute, Any) -> None + """Technically a string is an iterable containing strings. + + This validator lets you accept other iterators but not strings. + """ + if isinstance(value, six.string_types): + raise TypeError("'{}' must not a string".format(attribute.name)) diff --git a/src/aws_encryption_sdk/keyrings/aws_kms/__init__.py b/src/aws_encryption_sdk/keyrings/aws_kms/__init__.py new file mode 100644 index 000000000..808dde163 --- /dev/null +++ b/src/aws_encryption_sdk/keyrings/aws_kms/__init__.py @@ -0,0 +1,375 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Keyring for use with AWS Key Management Service (KMS). + +.. versionadded:: 1.5.0 + +""" +import logging + +import attr +import six +from attr.validators import deep_iterable, instance_of, is_callable, optional + +from aws_encryption_sdk.exceptions import DecryptKeyError, EncryptKeyError +from aws_encryption_sdk.identifiers import AlgorithmSuite +from aws_encryption_sdk.internal.validators import value_is_not_a_string +from aws_encryption_sdk.keyrings.base import Keyring +from aws_encryption_sdk.keyrings.multi import MultiKeyring +from aws_encryption_sdk.materials_managers import DecryptionMaterials, EncryptionMaterials +from aws_encryption_sdk.structures import EncryptedDataKey, KeyringTrace, KeyringTraceFlag, MasterKeyInfo, RawDataKey + +from .client_suppliers import DefaultClientSupplier + +from .client_suppliers import ClientSupplier # noqa - only used in docstring params; this confuses flake8 + +try: # Python 3.5.0 and 3.5.1 have incompatible typing modules + from typing import Dict, Iterable, Union # noqa pylint: disable=unused-import + from .client_suppliers import ClientSupplierType # noqa pylint: disable=unused-import +except ImportError: # pragma: no cover + # We only actually need these imports when running the mypy checks + pass + +__all__ = ("KmsKeyring",) + +_LOGGER = logging.getLogger(__name__) +_PROVIDER_ID = "aws-kms" +_GENERATE_FLAGS = {KeyringTraceFlag.WRAPPING_KEY_GENERATED_DATA_KEY} +_ENCRYPT_FLAGS = {KeyringTraceFlag.WRAPPING_KEY_ENCRYPTED_DATA_KEY, KeyringTraceFlag.WRAPPING_KEY_SIGNED_ENC_CTX} +_DECRYPT_FLAGS = {KeyringTraceFlag.WRAPPING_KEY_DECRYPTED_DATA_KEY, KeyringTraceFlag.WRAPPING_KEY_VERIFIED_ENC_CTX} + + +@attr.s +class KmsKeyring(Keyring): + """Keyring that uses AWS Key Management Service (KMS) Customer Master Keys (CMKs) to manage wrapping keys. + + Set ``generator_key_id`` to require that the keyring use that CMK to generate the data key. + If you do not set ``generator_key_id``, the keyring will not generate a data key. + + Set ``child_key_ids`` to specify additional CMKs that the keyring will use to encrypt the data key. + + The keyring will attempt to use any CMKs + identified by CMK ARN in either ``generator_key_id`` or ``child_key_ids`` on decrypt. + + You can identify CMKs by any `valid key ID`_ for the keyring to use on encrypt, + but for the keyring to attempt to use them on decrypt + you MUST specify the CMK ARN. + + If you specify neither ``generator_key_id`` nor ``child_key_ids`` + then the keyring will operate in `discovery mode`_, + doing nothing on encrypt and attempting to decrypt any AWS KMS-encrypted data key on decrypt. + + You can use the :class:`ClientSupplier` to customize behavior further, + such as to provide different credentials for different regions + or to restrict which regions are allowed. + + See the `AWS KMS Keyring specification`_ for more details. + + .. _AWS KMS Keyring specification: + https://github.com/awslabs/aws-encryption-sdk-specification/blob/master/framework/kms-keyring.md + .. _valid key ID: + https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html#API_GenerateDataKey_RequestSyntax + .. _discovery mode: + https://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/choose-keyring.html#kms-keyring-discovery + + .. versionadded:: 1.5.0 + + :param ClientSupplier client_supplier: Client supplier that provides AWS KMS clients (optional) + :param str generator_key_id: Key ID of AWS KMS CMK to use when generating data keys (optional) + :param List[str] child_key_ids: Key IDs that will be used to encrypt and decrypt data keys (optional) + :param List[str] grant_tokens: AWS KMS grant tokens to include in requests (optional) + """ + + _client_supplier = attr.ib(default=attr.Factory(DefaultClientSupplier), validator=is_callable()) + _generator_key_id = attr.ib(default=None, validator=optional(instance_of(six.string_types))) + _child_key_ids = attr.ib( + default=attr.Factory(tuple), + validator=(deep_iterable(member_validator=instance_of(six.string_types)), value_is_not_a_string), + ) + _grant_tokens = attr.ib( + default=attr.Factory(tuple), + validator=(deep_iterable(member_validator=instance_of(six.string_types)), value_is_not_a_string), + ) + + def __attrs_post_init__(self): + """Configure internal keyring.""" + if self._generator_key_id is None: + generator_keyring = None + else: + generator_keyring = _AwsKmsSingleCmkKeyring( + key_id=self._generator_key_id, client_supplier=self._client_supplier, grant_tokens=self._grant_tokens + ) + + child_keyrings = [ + _AwsKmsSingleCmkKeyring( + key_id=key_id, client_supplier=self._client_supplier, grant_tokens=self._grant_tokens + ) + for key_id in self._child_key_ids + ] + + self._is_discovery = generator_keyring is None and not child_keyrings + + if self._is_discovery: + self._inner_keyring = _AwsKmsDiscoveryKeyring( + client_supplier=self._client_supplier, grant_tokens=self._grant_tokens + ) + else: + self._inner_keyring = MultiKeyring(generator=generator_keyring, children=child_keyrings) + + def on_encrypt(self, encryption_materials): + # type: (EncryptionMaterials) -> EncryptionMaterials + """Generate a data key using generator keyring + and encrypt it using any available wrapping key in any child keyring. + + :param EncryptionMaterials encryption_materials: Encryption materials for keyring to modify. + :returns: Optionally modified encryption materials. + :rtype: EncryptionMaterials + :raises EncryptKeyError: if unable to encrypt data key. + """ + return self._inner_keyring.on_encrypt(encryption_materials=encryption_materials) + + def on_decrypt(self, decryption_materials, encrypted_data_keys): + # type: (DecryptionMaterials, Iterable[EncryptedDataKey]) -> DecryptionMaterials + """Attempt to decrypt the encrypted data keys. + + :param DecryptionMaterials decryption_materials: Decryption materials for keyring to modify. + :param List[EncryptedDataKey] encrypted_data_keys: List of encrypted data keys. + :returns: Optionally modified decryption materials. + :rtype: DecryptionMaterials + """ + return self._inner_keyring.on_decrypt( + decryption_materials=decryption_materials, encrypted_data_keys=encrypted_data_keys + ) + + +@attr.s +class _AwsKmsSingleCmkKeyring(Keyring): + """AWS KMS keyring that only works with a single AWS KMS CMK. + + This keyring should never be used directly. + It should only ever be used internally by :class:`KmsKeyring`. + + .. versionadded:: 1.5.0 + + :param str key_id: CMK key ID + :param ClientSupplier client_supplier: Client supplier to use when asking for clients + :param List[str] grant_tokens: AWS KMS grant tokens to include in requests (optional) + """ + + _key_id = attr.ib(validator=instance_of(six.string_types)) + _client_supplier = attr.ib(validator=is_callable()) + _grant_tokens = attr.ib( + default=attr.Factory(tuple), + validator=(deep_iterable(member_validator=instance_of(six.string_types)), value_is_not_a_string), + ) + + def on_encrypt(self, encryption_materials): + # type: (EncryptionMaterials) -> EncryptionMaterials + trace_info = MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=self._key_id) + try: + if encryption_materials.data_encryption_key is None: + plaintext_key, encrypted_key = _do_aws_kms_generate_data_key( + client_supplier=self._client_supplier, + key_name=self._key_id, + encryption_context=encryption_materials.encryption_context, + algorithm=encryption_materials.algorithm, + grant_tokens=self._grant_tokens, + ) + encryption_materials.add_data_encryption_key( + data_encryption_key=plaintext_key, + keyring_trace=KeyringTrace(wrapping_key=trace_info, flags=_GENERATE_FLAGS), + ) + else: + encrypted_key = _do_aws_kms_encrypt( + client_supplier=self._client_supplier, + key_name=self._key_id, + plaintext_data_key=encryption_materials.data_encryption_key, + encryption_context=encryption_materials.encryption_context, + grant_tokens=self._grant_tokens, + ) + except Exception: # pylint: disable=broad-except + # We intentionally WANT to catch all exceptions here + message = "Unable to generate or encrypt data key using {}".format(trace_info) + _LOGGER.exception(message) + raise EncryptKeyError(message) + + encryption_materials.add_encrypted_data_key( + encrypted_data_key=encrypted_key, keyring_trace=KeyringTrace(wrapping_key=trace_info, flags=_ENCRYPT_FLAGS) + ) + + return encryption_materials + + def on_decrypt(self, decryption_materials, encrypted_data_keys): + # type: (DecryptionMaterials, Iterable[EncryptedDataKey]) -> DecryptionMaterials + for edk in encrypted_data_keys: + if decryption_materials.data_encryption_key is not None: + return decryption_materials + + if ( + edk.key_provider.provider_id == _PROVIDER_ID + and edk.key_provider.key_info.decode("utf-8") == self._key_id + ): + decryption_materials = _try_aws_kms_decrypt( + client_supplier=self._client_supplier, + decryption_materials=decryption_materials, + grant_tokens=self._grant_tokens, + encrypted_data_key=edk, + ) + + return decryption_materials + + +@attr.s +class _AwsKmsDiscoveryKeyring(Keyring): + """AWS KMS discovery keyring that will attempt to decrypt any AWS KMS encrypted data key. + + This keyring should never be used directly. + It should only ever be used internally by :class:`KmsKeyring`. + + .. versionadded:: 1.5.0 + + :param ClientSupplier client_supplier: Client supplier to use when asking for clients + :param List[str] grant_tokens: AWS KMS grant tokens to include in requests (optional) + """ + + _client_supplier = attr.ib(validator=is_callable()) + _grant_tokens = attr.ib( + default=attr.Factory(tuple), + validator=(deep_iterable(member_validator=instance_of(six.string_types)), value_is_not_a_string), + ) + + def on_encrypt(self, encryption_materials): + # type: (EncryptionMaterials) -> EncryptionMaterials + return encryption_materials + + def on_decrypt(self, decryption_materials, encrypted_data_keys): + # type: (DecryptionMaterials, Iterable[EncryptedDataKey]) -> DecryptionMaterials + for edk in encrypted_data_keys: + if decryption_materials.data_encryption_key is not None: + return decryption_materials + + if edk.key_provider.provider_id == _PROVIDER_ID: + decryption_materials = _try_aws_kms_decrypt( + client_supplier=self._client_supplier, + decryption_materials=decryption_materials, + grant_tokens=self._grant_tokens, + encrypted_data_key=edk, + ) + + return decryption_materials + + +def _try_aws_kms_decrypt(client_supplier, decryption_materials, grant_tokens, encrypted_data_key): + # type: (ClientSupplierType, DecryptionMaterials, Iterable[str], EncryptedDataKey) -> DecryptionMaterials + """Attempt to call ``kms:Decrypt`` and return the resulting plaintext data key. + + Any errors encountered are caught and logged. + + .. versionadded:: 1.5.0 + + """ + try: + plaintext_key = _do_aws_kms_decrypt( + client_supplier=client_supplier, + key_name=encrypted_data_key.key_provider.key_info.decode("utf-8"), + encrypted_data_key=encrypted_data_key, + encryption_context=decryption_materials.encryption_context, + grant_tokens=grant_tokens, + ) + except Exception: # pylint: disable=broad-except + # We intentionally WANT to catch all exceptions here + _LOGGER.exception("Unable to decrypt encrypted data key from %s", encrypted_data_key.key_provider) + else: + decryption_materials.add_data_encryption_key( + data_encryption_key=plaintext_key, + keyring_trace=KeyringTrace(wrapping_key=encrypted_data_key.key_provider, flags=_DECRYPT_FLAGS), + ) + return decryption_materials + + return decryption_materials + + +def _do_aws_kms_decrypt(client_supplier, key_name, encrypted_data_key, encryption_context, grant_tokens): + # type: (ClientSupplierType, str, EncryptedDataKey, Dict[str, str], Iterable[str]) -> RawDataKey + """Attempt to call ``kms:Decrypt`` and return the resulting plaintext data key. + + Any errors encountered are passed up the chain without comment. + + .. versionadded:: 1.5.0 + + """ + region = _region_from_key_id(encrypted_data_key.key_provider.key_info.decode("utf-8")) + client = client_supplier(region) + response = client.decrypt( + CiphertextBlob=encrypted_data_key.encrypted_data_key, + EncryptionContext=encryption_context, + GrantTokens=grant_tokens, + ) + response_key_id = response["KeyId"] + if response_key_id != key_name: + raise DecryptKeyError( + "Decryption results from AWS KMS are for an unexpected key ID!" + " actual '{actual}' != expected '{expected}'".format(actual=response_key_id, expected=key_name) + ) + return RawDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=response_key_id), data_key=response["Plaintext"] + ) + + +def _do_aws_kms_encrypt(client_supplier, key_name, plaintext_data_key, encryption_context, grant_tokens): + # type: (ClientSupplierType, str, RawDataKey, Dict[str, str], Iterable[str]) -> EncryptedDataKey + """Attempt to call ``kms:Encrypt`` and return the resulting encrypted data key. + + Any errors encountered are passed up the chain without comment. + """ + region = _region_from_key_id(key_name) + client = client_supplier(region) + response = client.encrypt( + KeyId=key_name, + Plaintext=plaintext_data_key.data_key, + EncryptionContext=encryption_context, + GrantTokens=grant_tokens, + ) + return EncryptedDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=response["KeyId"]), + encrypted_data_key=response["CiphertextBlob"], + ) + + +def _do_aws_kms_generate_data_key(client_supplier, key_name, encryption_context, algorithm, grant_tokens): + # type: (ClientSupplierType, str, Dict[str, str], AlgorithmSuite, Iterable[str]) -> (RawDataKey, EncryptedDataKey) + """Attempt to call ``kms:GenerateDataKey`` and return the resulting plaintext and encrypted data keys. + + Any errors encountered are passed up the chain without comment. + + .. versionadded:: 1.5.0 + + """ + region = _region_from_key_id(key_name) + client = client_supplier(region) + response = client.generate_data_key( + KeyId=key_name, + NumberOfBytes=algorithm.kdf_input_len, + EncryptionContext=encryption_context, + GrantTokens=grant_tokens, + ) + provider = MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=response["KeyId"]) + plaintext_key = RawDataKey(key_provider=provider, data_key=response["Plaintext"]) + encrypted_key = EncryptedDataKey(key_provider=provider, encrypted_data_key=response["CiphertextBlob"]) + return plaintext_key, encrypted_key + + +def _region_from_key_id(key_id): + # type: (str) -> Union[None, str] + """Attempt to determine the region from the key ID. + + If the region cannot be found, ``None`` is returned instead. + + .. versionadded:: 1.5.0 + + """ + parts = key_id.split(":", 4) + try: + return parts[3] + except IndexError: + return None diff --git a/src/aws_encryption_sdk/keyrings/aws_kms/_client_cache.py b/src/aws_encryption_sdk/keyrings/aws_kms/_client_cache.py new file mode 100644 index 000000000..9eb8d3e82 --- /dev/null +++ b/src/aws_encryption_sdk/keyrings/aws_kms/_client_cache.py @@ -0,0 +1,119 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""boto3 client cache for use by client suppliers. + +.. versionadded:: 1.5.0 + +.. warning:: + No guarantee is provided on the modules and APIs within this + namespace staying consistent. Directly reference at your own risk. + +""" +import functools +import logging + +import attr +from attr.validators import instance_of +from boto3.session import Session as Boto3Session +from botocore.client import BaseClient +from botocore.config import Config as BotocoreConfig +from botocore.exceptions import BotoCoreError +from botocore.session import Session as BotocoreSession + +try: # Python 3.5.0 and 3.5.1 have incompatible typing modules + from typing import Dict # noqa pylint: disable=unused-import +except ImportError: # pragma: no cover + # We only actually need these imports when running the mypy checks + pass + +_LOGGER = logging.getLogger(__name__) +__all__ = ("ClientCache",) + + +@attr.s +class ClientCache(object): + """Provide boto3 clients regional clients, caching by region. + + Any clients that throw an error when used are immediately removed from the cache. + + .. versionadded:: 1.5.0 + + :param botocore_session: Botocore session to use when creating clients + :type botocore_session: botocore.session.Session + :param client_config: Config to use when creating client + :type client_config: botocore.config.Config + """ + + _botocore_session = attr.ib(validator=instance_of(BotocoreSession)) + _client_config = attr.ib(validator=instance_of(BotocoreConfig)) + + def __attrs_post_init__(self): + """Set up internal cache.""" + self._cache = {} # type: Dict[str, BaseClient] + + def _wrap_client_method(self, region_name, method, *args, **kwargs): + """Proxy a call to a boto3 client method and remove any misbehaving clients from the cache. + + :param str region_name: Client region name + :param Callable method: Method on the boto3 client to proxy + :param Tuple args: Positional arguments to pass to ``method`` + :param Dict kwargs: Named arguments to pass to ``method`` + :returns: result of + """ + try: + return method(*args, **kwargs) + except BotoCoreError as error: + try: + del self._cache[region_name] + except KeyError: + pass + _LOGGER.exception( + 'Removing client "%s" from cache due to BotoCoreError on %s call', region_name, method.__name__ + ) + raise error + + def _patch_client(self, client): + # type: (BaseClient) -> BaseClient + """Patch a boto3 client, wrapping every API call in ``_wrap_client_method``. + + :param BaseClient client: boto3 client to patch + :returns: patched client + """ + for method_name in client.meta.method_to_api_mapping: + method = getattr(client, method_name) + wrapped_method = functools.partial(self._wrap_client_method, client.meta.region_name, method) + setattr(client, method_name, wrapped_method) + + return client + + def _add_client(self, region_name, service): + # type: (str, str) -> BaseClient + """Make a new client and add it to the internal cache. + + :param str region_name: Client region + :param str service: Client service + :returns: New client, now in cache + :rtype: botocore.client.BaseClient + """ + client = Boto3Session(botocore_session=self._botocore_session).client( + service_name=service, region_name=region_name, config=self._client_config + ) + patched_client = self._patch_client(client) + self._cache[region_name] = patched_client + return client + + def client(self, region_name, service): + # type: (str, str) -> BaseClient + """Get a client for the specified region and service. + + Generate a new client if needed. + Otherwise, retrieve an existing client from the internal cache. + + :param str region_name: Client region + :param str service: Client service + :rtype: botocore.client.BaseClient + """ + try: + return self._cache[region_name] + except KeyError: + return self._add_client(region_name, service) diff --git a/src/aws_encryption_sdk/keyrings/aws_kms/client_suppliers.py b/src/aws_encryption_sdk/keyrings/aws_kms/client_suppliers.py new file mode 100644 index 000000000..c8a0af696 --- /dev/null +++ b/src/aws_encryption_sdk/keyrings/aws_kms/client_suppliers.py @@ -0,0 +1,159 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""AWS KMS client suppliers for use with AWS KMS keyring. + +.. versionadded:: 1.5.0 + +""" +import functools +import logging + +import attr +import six +from attr.validators import deep_iterable, instance_of, is_callable, optional +from botocore.client import BaseClient +from botocore.config import Config as BotocoreConfig +from botocore.session import Session as BotocoreSession + +from aws_encryption_sdk.exceptions import UnknownRegionError +from aws_encryption_sdk.identifiers import USER_AGENT_SUFFIX +from aws_encryption_sdk.internal.validators import value_is_not_a_string + +from ._client_cache import ClientCache + +try: # Python 3.5.0 and 3.5.1 have incompatible typing modules + from typing import Callable, Union # noqa pylint: disable=unused-import + + ClientSupplierType = Callable[[Union[None, str]], BaseClient] +except ImportError: # pragma: no cover + # We only actually need these imports when running the mypy checks + pass + +_LOGGER = logging.getLogger(__name__) +__all__ = ( + "ClientSupplier", + "ClientSupplierType", + "DefaultClientSupplier", + "AllowRegionsClientSupplier", + "DenyRegionsClientSupplier", +) + + +class ClientSupplier(object): + """Base class for client suppliers. + + .. versionadded:: 1.5.0 + + """ + + def __call__(self, region_name): + # type: (Union[None, str]) -> BaseClient + """Return a client for the requested region. + + :rtype: BaseClient + """ + raise NotImplementedError("'ClientSupplier' is not callable") + + +@attr.s +class DefaultClientSupplier(ClientSupplier): + """The default AWS KMS client supplier. + Creates and caches clients for any region. + + .. versionadded:: 1.5.0 + + If you want clients to have special credentials or other configuration, + you can provide those with custom ``botocore`` Session and/or `Config`_ instances. + + .. _Config: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html + + .. code-block:: python + + from aws_encryption_sdk.keyrings.aws_kms.client_supplier import DefaultClientSupplier + from botocore.session import Session + from botocore.config import Config + + my_client_supplier = DefaultClientSupplier( + botocore_session=Session(**_get_custom_credentials()), + client_config=Config(connect_timeout=10), + ) + + :param botocore_session: Botocore session to use when creating clients (optional) + :type botocore_session: botocore.session.Session + :param client_config: Config to use when creating client (optional) + :type client_config: botocore.config.Config + """ + + _botocore_session = attr.ib(default=attr.Factory(BotocoreSession), validator=instance_of(BotocoreSession)) + _client_config = attr.ib( + default=attr.Factory(functools.partial(BotocoreConfig, user_agent_extra=USER_AGENT_SUFFIX)), + validator=instance_of(BotocoreConfig), + ) + + def __attrs_post_init__(self): + """Set up the internal cache.""" + self._client_cache = ClientCache(botocore_session=self._botocore_session, client_config=self._client_config) + + def __call__(self, region_name): + # type: (Union[None, str]) -> BaseClient + """Return a client for the requested region. + + :rtype: BaseClient + """ + return self._client_cache.client(region_name=region_name, service="kms") + + +@attr.s +class AllowRegionsClientSupplier(ClientSupplier): + """AWS KMS client supplier that only supplies clients for the specified regions. + + .. versionadded:: 1.5.0 + + :param List[str] allowed_regions: Regions to allow + :param ClientSupplier client_supplier: Client supplier to wrap (optional) + """ + + allowed_regions = attr.ib( + validator=(deep_iterable(member_validator=instance_of(six.string_types)), value_is_not_a_string) + ) + _client_supplier = attr.ib(default=attr.Factory(DefaultClientSupplier), validator=optional(is_callable())) + + def __call__(self, region_name): + # type: (Union[None, str]) -> BaseClient + """Return a client for the requested region. + + :rtype: BaseClient + :raises UnknownRegionError: if a region is requested that is not in ``allowed_regions`` + """ + if region_name not in self.allowed_regions: + raise UnknownRegionError("Unable to provide client for region '{}'".format(region_name)) + + return self._client_supplier(region_name) + + +@attr.s +class DenyRegionsClientSupplier(ClientSupplier): + """AWS KMS client supplier that supplies clients for any region except for the specified regions. + + .. versionadded:: 1.5.0 + + :param List[str] denied_regions: Regions to deny + :param ClientSupplier client_supplier: Client supplier to wrap (optional) + """ + + denied_regions = attr.ib( + validator=(deep_iterable(member_validator=instance_of(six.string_types)), value_is_not_a_string) + ) + _client_supplier = attr.ib(default=attr.Factory(DefaultClientSupplier), validator=optional(is_callable())) + + def __call__(self, region_name): + # type: (Union[None, str]) -> BaseClient + """Return a client for the requested region. + + :rtype: BaseClient + :raises UnknownRegionError: if a region is requested that is in ``denied_regions`` + """ + if region_name in self.denied_regions: + raise UnknownRegionError("Unable to provide client for region '{}'".format(region_name)) + + return self._client_supplier(region_name) diff --git a/test/functional/functional_test_utils.py b/test/functional/functional_test_utils.py new file mode 100644 index 000000000..3822ef5fa --- /dev/null +++ b/test/functional/functional_test_utils.py @@ -0,0 +1,29 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Utility functions to handle configuration and credentials setup for functional tests.""" +import boto3 +import pytest +from moto.kms import mock_kms + +FAKE_REGION = "us-west-2" + + +def _create_cmk(): + # type: () -> str + kms = boto3.client("kms", region_name=FAKE_REGION) + response = kms.create_key() + return response["KeyMetadata"]["Arn"] + + +@pytest.fixture +def fake_generator(): + with mock_kms(): + yield _create_cmk() + + +@pytest.fixture +def fake_generator_and_child(): + with mock_kms(): + generator = _create_cmk() + child = _create_cmk() + yield generator, child diff --git a/test/functional/keyrings/aws_kms/__init__.py b/test/functional/keyrings/aws_kms/__init__.py new file mode 100644 index 000000000..d548f9b1f --- /dev/null +++ b/test/functional/keyrings/aws_kms/__init__.py @@ -0,0 +1,3 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Dummy stub to make linters work better.""" diff --git a/test/functional/keyrings/aws_kms/test_aws_kms.py b/test/functional/keyrings/aws_kms/test_aws_kms.py new file mode 100644 index 000000000..19d0d68ad --- /dev/null +++ b/test/functional/keyrings/aws_kms/test_aws_kms.py @@ -0,0 +1,490 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Functional tests for ``aws_encryption_sdk.keyrings.aws_kms``.""" +import itertools +import logging +import os + +import boto3 +import pytest +from moto.kms import mock_kms + +from aws_encryption_sdk.exceptions import DecryptKeyError, EncryptKeyError +from aws_encryption_sdk.identifiers import KeyringTraceFlag +from aws_encryption_sdk.internal.defaults import ALGORITHM +from aws_encryption_sdk.keyrings.aws_kms import ( + _PROVIDER_ID, + KmsKeyring, + _AwsKmsDiscoveryKeyring, + _AwsKmsSingleCmkKeyring, + _do_aws_kms_decrypt, + _do_aws_kms_encrypt, + _do_aws_kms_generate_data_key, + _try_aws_kms_decrypt, +) +from aws_encryption_sdk.keyrings.aws_kms.client_suppliers import DefaultClientSupplier +from aws_encryption_sdk.materials_managers import DecryptionMaterials, EncryptionMaterials +from aws_encryption_sdk.structures import EncryptedDataKey, KeyringTrace, MasterKeyInfo, RawDataKey + +# used as fixtures +from ...functional_test_utils import fake_generator # noqa pylint: disable=unused-import +from ...functional_test_utils import fake_generator_and_child # noqa pylint: disable=unused-import +from ...functional_test_utils import FAKE_REGION + +try: # Python 3.5.0 and 3.5.1 have incompatible typing modules + from typing import Iterable, List # noqa pylint: disable=unused-import +except ImportError: # pragma: no cover + # We only actually need these imports when running the mypy checks + pass + +pytestmark = [pytest.mark.functional, pytest.mark.local] + + +def _matching_flags(wrapping_key, keyring_trace): + # type: (MasterKeyInfo, Iterable[KeyringTrace]) -> List[KeyringTraceFlag] + return list( + itertools.chain.from_iterable([entry.flags for entry in keyring_trace if entry.wrapping_key == wrapping_key]) + ) + + +def test_aws_kms_single_cmk_keyring_on_encrypt_empty_materials(fake_generator): + keyring = _AwsKmsSingleCmkKeyring(key_id=fake_generator, client_supplier=DefaultClientSupplier()) + + initial_materials = EncryptionMaterials(algorithm=ALGORITHM, encryption_context={}) + + result_materials = keyring.on_encrypt(initial_materials) + + assert result_materials.data_encryption_key is not None + assert len(result_materials.encrypted_data_keys) == 1 + + generator_flags = _matching_flags( + MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=fake_generator), result_materials.keyring_trace + ) + + assert KeyringTraceFlag.WRAPPING_KEY_GENERATED_DATA_KEY in generator_flags + assert KeyringTraceFlag.WRAPPING_KEY_ENCRYPTED_DATA_KEY in generator_flags + assert KeyringTraceFlag.WRAPPING_KEY_SIGNED_ENC_CTX in generator_flags + + +def test_aws_kms_single_cmk_keyring_on_encrypt_existing_data_key(fake_generator): + keyring = _AwsKmsSingleCmkKeyring(key_id=fake_generator, client_supplier=DefaultClientSupplier()) + + initial_materials = EncryptionMaterials( + algorithm=ALGORITHM, + encryption_context={}, + data_encryption_key=RawDataKey( + key_provider=MasterKeyInfo(provider_id="foo", key_info=b"bar"), data_key=os.urandom(ALGORITHM.kdf_input_len) + ), + ) + + result_materials = keyring.on_encrypt(initial_materials) + + assert result_materials.data_encryption_key is not None + assert len(result_materials.encrypted_data_keys) == 1 + + generator_flags = _matching_flags( + MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=fake_generator), result_materials.keyring_trace + ) + + assert KeyringTraceFlag.WRAPPING_KEY_GENERATED_DATA_KEY not in generator_flags + assert KeyringTraceFlag.WRAPPING_KEY_ENCRYPTED_DATA_KEY in generator_flags + assert KeyringTraceFlag.WRAPPING_KEY_SIGNED_ENC_CTX in generator_flags + + +@mock_kms +def test_aws_kms_single_cmk_keyring_on_encrypt_fail(): + # In this context there are no KMS CMKs, so any calls to KMS will fail. + keyring = _AwsKmsSingleCmkKeyring(key_id="foo", client_supplier=DefaultClientSupplier()) + + initial_materials = EncryptionMaterials(algorithm=ALGORITHM, encryption_context={}) + + with pytest.raises(EncryptKeyError) as excinfo: + keyring.on_encrypt(initial_materials) + + excinfo.match(r"Unable to generate or encrypt data key using *") + + +@mock_kms +def test_aws_kms_single_cmk_keyring_on_decrypt_existing_datakey(caplog): + # In this context there are no KMS CMKs, so any calls to KMS will fail. + caplog.set_level(logging.DEBUG) + keyring = _AwsKmsSingleCmkKeyring(key_id="foo", client_supplier=DefaultClientSupplier()) + + initial_materials = DecryptionMaterials( + algorithm=ALGORITHM, + encryption_context={}, + data_encryption_key=RawDataKey( + key_provider=MasterKeyInfo(provider_id="foo", key_info=b"bar"), data_key=os.urandom(ALGORITHM.kdf_input_len) + ), + ) + + result_materials = keyring.on_decrypt( + decryption_materials=initial_materials, + encrypted_data_keys=( + EncryptedDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=b"foo"), encrypted_data_key=b"bar" + ), + ), + ) + + assert result_materials.data_encryption_key == initial_materials.data_encryption_key + + log_data = caplog.text + # This means that it did NOT try to decrypt the EDK. + assert "Unable to decrypt encrypted data key from" not in log_data + + +def test_aws_kms_single_cmk_keyring_on_decrypt_single_cmk(fake_generator): + keyring = _AwsKmsSingleCmkKeyring(key_id=fake_generator, client_supplier=DefaultClientSupplier()) + + initial_encryption_materials = EncryptionMaterials(algorithm=ALGORITHM, encryption_context={}) + + encryption_materials = keyring.on_encrypt(initial_encryption_materials) + + initial_decryption_materials = DecryptionMaterials( + algorithm=encryption_materials.algorithm, encryption_context=encryption_materials.encryption_context + ) + + result_materials = keyring.on_decrypt( + decryption_materials=initial_decryption_materials, encrypted_data_keys=encryption_materials.encrypted_data_keys + ) + + assert result_materials.data_encryption_key is not None + + generator_flags = _matching_flags( + MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=fake_generator), result_materials.keyring_trace + ) + + assert KeyringTraceFlag.WRAPPING_KEY_DECRYPTED_DATA_KEY in generator_flags + assert KeyringTraceFlag.WRAPPING_KEY_VERIFIED_ENC_CTX in generator_flags + + +def test_aws_kms_single_cmk_keyring_on_decrypt_multiple_cmk(fake_generator_and_child): + generator, child = fake_generator_and_child + + encrypting_keyring = KmsKeyring(generator_key_id=generator, child_key_ids=(child,)) + decrypting_keyring = _AwsKmsSingleCmkKeyring(key_id=child, client_supplier=DefaultClientSupplier()) + + initial_encryption_materials = EncryptionMaterials(algorithm=ALGORITHM, encryption_context={}) + + encryption_materials = encrypting_keyring.on_encrypt(initial_encryption_materials) + + initial_decryption_materials = DecryptionMaterials( + algorithm=encryption_materials.algorithm, encryption_context=encryption_materials.encryption_context + ) + + result_materials = decrypting_keyring.on_decrypt( + decryption_materials=initial_decryption_materials, encrypted_data_keys=encryption_materials.encrypted_data_keys + ) + + generator_flags = _matching_flags( + MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=generator), result_materials.keyring_trace + ) + assert len(generator_flags) == 0 + + child_flags = _matching_flags( + MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=child), result_materials.keyring_trace + ) + + assert KeyringTraceFlag.WRAPPING_KEY_DECRYPTED_DATA_KEY in child_flags + assert KeyringTraceFlag.WRAPPING_KEY_VERIFIED_ENC_CTX in child_flags + + +def test_aws_kms_single_cmk_keyring_on_decrypt_no_match(fake_generator_and_child): + generator, child = fake_generator_and_child + + encrypting_keyring = _AwsKmsSingleCmkKeyring(key_id=generator, client_supplier=DefaultClientSupplier()) + decrypting_keyring = _AwsKmsSingleCmkKeyring(key_id=child, client_supplier=DefaultClientSupplier()) + + initial_encryption_materials = EncryptionMaterials(algorithm=ALGORITHM, encryption_context={}) + + encryption_materials = encrypting_keyring.on_encrypt(initial_encryption_materials) + + initial_decryption_materials = DecryptionMaterials( + algorithm=encryption_materials.algorithm, encryption_context=encryption_materials.encryption_context + ) + + result_materials = decrypting_keyring.on_decrypt( + decryption_materials=initial_decryption_materials, encrypted_data_keys=encryption_materials.encrypted_data_keys + ) + + assert result_materials.data_encryption_key is None + + +@mock_kms +def test_aws_kms_single_cmk_keyring_on_decrypt_fail(caplog): + # In this context there are no KMS CMKs, so any calls to KMS will fail. + caplog.set_level(logging.DEBUG) + keyring = _AwsKmsSingleCmkKeyring(key_id="foo", client_supplier=DefaultClientSupplier()) + + initial_materials = DecryptionMaterials(algorithm=ALGORITHM, encryption_context={}) + + result_materials = keyring.on_decrypt( + decryption_materials=initial_materials, + encrypted_data_keys=( + EncryptedDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=b"foo"), encrypted_data_key=b"bar" + ), + ), + ) + + assert not result_materials.data_encryption_key + + log_data = caplog.text + + # This means that it did actually try to decrypt the EDK but encountered an error talking to KMS. + assert "Unable to decrypt encrypted data key from" in log_data + + +def test_aws_kms_discovery_keyring_on_encrypt(): + keyring = _AwsKmsDiscoveryKeyring(client_supplier=DefaultClientSupplier()) + + initial_materials = EncryptionMaterials(algorithm=ALGORITHM, encryption_context={}) + + result_materials = keyring.on_encrypt(initial_materials) + + assert len(result_materials.encrypted_data_keys) == 0 + + +@pytest.fixture +def encryption_materials_for_discovery_decrypt(fake_generator): + encrypting_keyring = _AwsKmsSingleCmkKeyring(key_id=fake_generator, client_supplier=DefaultClientSupplier()) + + initial_encryption_materials = EncryptionMaterials(algorithm=ALGORITHM, encryption_context={}) + + return fake_generator, encrypting_keyring.on_encrypt(initial_encryption_materials) + + +def test_aws_kms_discovery_keyring_on_decrypt(encryption_materials_for_discovery_decrypt): + generator_key_id, encryption_materials = encryption_materials_for_discovery_decrypt + + decrypting_keyring = _AwsKmsDiscoveryKeyring(client_supplier=DefaultClientSupplier()) + + initial_decryption_materials = DecryptionMaterials( + algorithm=encryption_materials.algorithm, encryption_context=encryption_materials.encryption_context + ) + + result_materials = decrypting_keyring.on_decrypt( + decryption_materials=initial_decryption_materials, encrypted_data_keys=encryption_materials.encrypted_data_keys + ) + + assert result_materials.data_encryption_key is not None + + generator_flags = _matching_flags( + MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=generator_key_id), result_materials.keyring_trace + ) + + assert KeyringTraceFlag.WRAPPING_KEY_DECRYPTED_DATA_KEY in generator_flags + assert KeyringTraceFlag.WRAPPING_KEY_VERIFIED_ENC_CTX in generator_flags + + +@mock_kms +def test_aws_kms_discovery_keyring_on_decrypt_existing_data_key(caplog): + # In this context there are no KMS CMKs, so any calls to KMS will fail. + caplog.set_level(logging.DEBUG) + keyring = _AwsKmsDiscoveryKeyring(client_supplier=DefaultClientSupplier()) + + initial_materials = DecryptionMaterials( + algorithm=ALGORITHM, + encryption_context={}, + data_encryption_key=RawDataKey( + key_provider=MasterKeyInfo(provider_id="foo", key_info=b"bar"), data_key=os.urandom(ALGORITHM.kdf_input_len) + ), + ) + + result_materials = keyring.on_decrypt( + decryption_materials=initial_materials, + encrypted_data_keys=( + EncryptedDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=b"foo"), encrypted_data_key=b"bar" + ), + ), + ) + + assert result_materials.data_encryption_key == initial_materials.data_encryption_key + + log_data = caplog.text + # This means that it did NOT try to decrypt the EDK. + assert "Unable to decrypt encrypted data key from" not in log_data + + +@mock_kms +def test_aws_kms_discovery_keyring_on_decrypt_no_matching_edk(caplog): + # In this context there are no KMS CMKs, so any calls to KMS will fail. + caplog.set_level(logging.DEBUG) + keyring = _AwsKmsDiscoveryKeyring(client_supplier=DefaultClientSupplier()) + + initial_materials = DecryptionMaterials(algorithm=ALGORITHM, encryption_context={},) + + result_materials = keyring.on_decrypt( + decryption_materials=initial_materials, + encrypted_data_keys=( + EncryptedDataKey(key_provider=MasterKeyInfo(provider_id="foo", key_info=b"bar"), encrypted_data_key=b"bar"), + ), + ) + + assert result_materials.data_encryption_key is None + + log_data = caplog.text + # This means that it did NOT try to decrypt the EDK. + assert "Unable to decrypt encrypted data key from" not in log_data + + +@mock_kms +def test_aws_kms_discovery_keyring_on_decrypt_fail(caplog): + # In this context there are no KMS CMKs, so any calls to KMS will fail. + caplog.set_level(logging.DEBUG) + keyring = _AwsKmsDiscoveryKeyring(client_supplier=DefaultClientSupplier()) + + initial_materials = DecryptionMaterials(algorithm=ALGORITHM, encryption_context={},) + + result_materials = keyring.on_decrypt( + decryption_materials=initial_materials, + encrypted_data_keys=( + EncryptedDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=b"bar"), encrypted_data_key=b"bar" + ), + ), + ) + + assert result_materials.data_encryption_key is None + + log_data = caplog.text + # This means that it did actually try to decrypt the EDK but encountered an error talking to KMS. + assert "Unable to decrypt encrypted data key from" in log_data + + +def test_try_aws_kms_decrypt_succeed(fake_generator): + encryption_context = {"foo": "bar"} + kms = boto3.client("kms", region_name=FAKE_REGION) + plaintext = b"0123" * 8 + response = kms.encrypt(KeyId=fake_generator, Plaintext=plaintext, EncryptionContext=encryption_context) + + encrypted_data_key = EncryptedDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=response["KeyId"]), + encrypted_data_key=response["CiphertextBlob"], + ) + + initial_decryption_materials = DecryptionMaterials(algorithm=ALGORITHM, encryption_context=encryption_context,) + + result_materials = _try_aws_kms_decrypt( + client_supplier=DefaultClientSupplier(), + decryption_materials=initial_decryption_materials, + grant_tokens=[], + encrypted_data_key=encrypted_data_key, + ) + + assert result_materials.data_encryption_key.data_key == plaintext + + generator_flags = _matching_flags( + MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=fake_generator), result_materials.keyring_trace + ) + + assert KeyringTraceFlag.WRAPPING_KEY_DECRYPTED_DATA_KEY in generator_flags + assert KeyringTraceFlag.WRAPPING_KEY_VERIFIED_ENC_CTX in generator_flags + + +@mock_kms +def test_try_aws_kms_decrypt_error(caplog): + # In this context there are no KMS CMKs, so any calls to KMS will fail. + caplog.set_level(logging.DEBUG) + + encrypted_data_key = EncryptedDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=b"foo"), encrypted_data_key=b"bar" + ) + + initial_decryption_materials = DecryptionMaterials(algorithm=ALGORITHM, encryption_context={},) + + result_materials = _try_aws_kms_decrypt( + client_supplier=DefaultClientSupplier(), + decryption_materials=initial_decryption_materials, + grant_tokens=[], + encrypted_data_key=encrypted_data_key, + ) + + assert result_materials.data_encryption_key is None + + log_data = caplog.text + # This means that it did actually try to decrypt the EDK but encountered an error talking to KMS. + assert "Unable to decrypt encrypted data key from" in log_data + + +def test_do_aws_kms_decrypt(fake_generator): + encryption_context = {"foo": "bar"} + kms = boto3.client("kms", region_name=FAKE_REGION) + plaintext = b"0123" * 8 + response = kms.encrypt(KeyId=fake_generator, Plaintext=plaintext, EncryptionContext=encryption_context) + + encrypted_data_key = EncryptedDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=response["KeyId"]), + encrypted_data_key=response["CiphertextBlob"], + ) + + decrypted_data_key = _do_aws_kms_decrypt( + client_supplier=DefaultClientSupplier(), + key_name=fake_generator, + encrypted_data_key=encrypted_data_key, + encryption_context=encryption_context, + grant_tokens=[], + ) + assert decrypted_data_key.data_key == plaintext + + +def test_do_aws_kms_decrypt_unexpected_key_id(fake_generator_and_child): + encryptor, decryptor = fake_generator_and_child + encryption_context = {"foo": "bar"} + kms = boto3.client("kms", region_name=FAKE_REGION) + plaintext = b"0123" * 8 + response = kms.encrypt(KeyId=encryptor, Plaintext=plaintext, EncryptionContext=encryption_context) + + encrypted_data_key = EncryptedDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=response["KeyId"]), + encrypted_data_key=response["CiphertextBlob"], + ) + + with pytest.raises(DecryptKeyError) as excinfo: + _do_aws_kms_decrypt( + client_supplier=DefaultClientSupplier(), + key_name=decryptor, + encrypted_data_key=encrypted_data_key, + encryption_context=encryption_context, + grant_tokens=[], + ) + + excinfo.match(r"Decryption results from AWS KMS are for an unexpected key ID*") + + +def test_do_aws_kms_encrypt(fake_generator): + encryption_context = {"foo": "bar"} + plaintext = b"0123" * 8 + + encrypted_key = _do_aws_kms_encrypt( + client_supplier=DefaultClientSupplier(), + key_name=fake_generator, + plaintext_data_key=RawDataKey( + key_provider=MasterKeyInfo(provider_id=_PROVIDER_ID, key_info=fake_generator), data_key=plaintext + ), + encryption_context=encryption_context, + grant_tokens=[], + ) + + kms = boto3.client("kms", region_name=FAKE_REGION) + response = kms.decrypt(CiphertextBlob=encrypted_key.encrypted_data_key, EncryptionContext=encryption_context) + + assert response["Plaintext"] == plaintext + + +def test_do_aws_kms_generate_data_key(fake_generator): + encryption_context = {"foo": "bar"} + plaintext_key, encrypted_key = _do_aws_kms_generate_data_key( + client_supplier=DefaultClientSupplier(), + key_name=fake_generator, + encryption_context=encryption_context, + algorithm=ALGORITHM, + grant_tokens=[], + ) + + kms = boto3.client("kms", region_name=FAKE_REGION) + response = kms.decrypt(CiphertextBlob=encrypted_key.encrypted_data_key, EncryptionContext=encryption_context) + + assert response["Plaintext"] == plaintext_key.data_key diff --git a/test/functional/keyrings/aws_kms/test_client_cache.py b/test/functional/keyrings/aws_kms/test_client_cache.py new file mode 100644 index 000000000..06c6c51c0 --- /dev/null +++ b/test/functional/keyrings/aws_kms/test_client_cache.py @@ -0,0 +1,33 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Functional tests for ``aws_encryption_sdk.keyrings.aws_kms.client_cache``.""" +import pytest +from botocore.config import Config +from botocore.session import Session + +from aws_encryption_sdk.keyrings.aws_kms._client_cache import ClientCache + +pytestmark = [pytest.mark.functional, pytest.mark.local] + + +def test_client_cache_caches_clients(): + cache = ClientCache(botocore_session=Session(), client_config=Config()) + + initial_client = cache.client("us-west-2", "kms") + + test = cache.client("us-west-2", "kms") + + assert "us-west-2" in cache._cache + assert test is initial_client + + +def test_client_cache_new_client(): + cache = ClientCache(botocore_session=Session(), client_config=Config()) + + initial_client = cache.client("us-west-2", "kms") + + cache._cache.pop("us-west-2") + + test = cache.client("us-west-2", "kms") + + assert test is not initial_client diff --git a/test/functional/keyrings/aws_kms/test_client_suppliers.py b/test/functional/keyrings/aws_kms/test_client_suppliers.py new file mode 100644 index 000000000..2d63fcc02 --- /dev/null +++ b/test/functional/keyrings/aws_kms/test_client_suppliers.py @@ -0,0 +1,116 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Functional tests for ``aws_encryption_sdk.keyrings.aws_kms.client_suppliers``.""" +import pytest +from botocore.config import Config +from botocore.session import Session + +from aws_encryption_sdk.exceptions import UnknownRegionError +from aws_encryption_sdk.keyrings.aws_kms.client_suppliers import ( + AllowRegionsClientSupplier, + ClientSupplier, + DefaultClientSupplier, + DenyRegionsClientSupplier, +) + +pytestmark = [pytest.mark.functional, pytest.mark.local] + + +def test_default_supplier_not_implemented(): + test = ClientSupplier() + + with pytest.raises(NotImplementedError) as excinfo: + test("region") + + excinfo.match("'ClientSupplier' is not callable") + + +def test_default_supplier_uses_cache(): + supplier = DefaultClientSupplier() + + region = "us-west-2" + expected = supplier._client_cache.client(region_name=region, service="kms") + + test = supplier(region) + + assert test is expected + + +def test_default_supplier_passes_through_configs(): + session = Session() + config = Config() + + test = DefaultClientSupplier(botocore_session=session, client_config=config) + + assert test._client_cache._botocore_session is session + assert test._client_cache._client_config is config + + +@pytest.mark.parametrize( + "kwargs", + ( + pytest.param(dict(allowed_regions="foo"), id="allowed_regions is a string"), + pytest.param(dict(allowed_regions=["foo", 5]), id="allowed_regions contains invalid type"), + ), +) +def test_allow_regions_supplier_invalid_parameters(kwargs): + with pytest.raises(TypeError): + AllowRegionsClientSupplier(**kwargs) + + +def test_allow_regions_supplier_allows_allowed_region(): + test = AllowRegionsClientSupplier(allowed_regions=["us-west-2", "us-east-2"]) + + assert test("us-west-2") + + +def test_allow_regions_supplier_denied_not_allowed_region(): + test = AllowRegionsClientSupplier(allowed_regions=["us-west-2", "us-east-2"]) + + with pytest.raises(UnknownRegionError) as excinfo: + test("ap-northeast-2") + + excinfo.match("Unable to provide client for region 'ap-northeast-2'") + + +@pytest.mark.parametrize( + "kwargs", + ( + pytest.param(dict(denied_regions="foo"), id="denied_regions is a string"), + pytest.param(dict(denied_regions=["foo", 5]), id="denied_regions contains invalid type"), + ), +) +def test_deny_regions_supplier_invalid_parameters(kwargs): + with pytest.raises(TypeError): + DenyRegionsClientSupplier(**kwargs) + + +def test_deny_regions_supplier_denies_denied_region(): + test = DenyRegionsClientSupplier(denied_regions=["us-west-2", "us-east-2"]) + + with pytest.raises(UnknownRegionError) as excinfo: + test("us-west-2") + + excinfo.match("Unable to provide client for region 'us-west-2'") + + +def test_deny_regions_supplier_allows_not_denied_region(): + test = DenyRegionsClientSupplier(denied_regions=["us-west-2", "us-east-2"]) + + assert test("ap-northeast-2") + + +def test_allow_deny_nested_supplier(): + test_allow = AllowRegionsClientSupplier( + allowed_regions=["us-west-2", "us-east-2"], client_supplier=DefaultClientSupplier() + ) + test_deny = DenyRegionsClientSupplier(denied_regions=["us-west-2"], client_supplier=test_allow) + + # test_allow allows us-west-2 + test_allow("us-west-2") + + # test_deny denies us-west-2 even though its internal supplier (test_allow) allows it + with pytest.raises(UnknownRegionError) as excinfo: + test_deny("us-west-2") + + excinfo.match("Unable to provide client for region 'us-west-2'") diff --git a/test/integration/integration_test_utils.py b/test/integration/integration_test_utils.py index b65d93570..37e0243b7 100644 --- a/test/integration/integration_test_utils.py +++ b/test/integration/integration_test_utils.py @@ -14,12 +14,15 @@ import os import botocore.session +import pytest from aws_encryption_sdk.key_providers.kms import KMSMasterKeyProvider +from aws_encryption_sdk.keyrings.aws_kms import KmsKeyring AWS_KMS_KEY_ID = "AWS_ENCRYPTION_SDK_PYTHON_INTEGRATION_TEST_AWS_KMS_KEY_ID" _KMS_MKP = None _KMS_MKP_BOTO = None +_KMS_KEYRING = None def get_cmk_arn(): @@ -37,7 +40,7 @@ def get_cmk_arn(): def setup_kms_master_key_provider(cache=True): - """Reads the test_values config file and builds the requested KMS Master Key Provider.""" + """Build an AWS KMS Master Key Provider.""" global _KMS_MKP # pylint: disable=global-statement if cache and _KMS_MKP is not None: return _KMS_MKP @@ -53,7 +56,7 @@ def setup_kms_master_key_provider(cache=True): def setup_kms_master_key_provider_with_botocore_session(cache=True): - """Reads the test_values config file and builds the requested KMS Master Key Provider with botocore_session.""" + """Build an AWS KMS Master Key Provider with an explicit botocore_session.""" global _KMS_MKP_BOTO # pylint: disable=global-statement if cache and _KMS_MKP_BOTO is not None: return _KMS_MKP_BOTO @@ -66,3 +69,29 @@ def setup_kms_master_key_provider_with_botocore_session(cache=True): _KMS_MKP_BOTO = kms_master_key_provider return kms_master_key_provider + + +def build_aws_kms_keyring(generate=True, cache=True): + """Build an AWS KMS keyring.""" + global _KMS_KEYRING # pylint: disable=global-statement + if cache and _KMS_KEYRING is not None: + return _KMS_KEYRING + + cmk_arn = get_cmk_arn() + + if generate: + kwargs = dict(generator_key_id=cmk_arn) + else: + kwargs = dict(child_key_ids=[cmk_arn]) + + keyring = KmsKeyring(**kwargs) + + if cache: + _KMS_KEYRING = keyring + + return keyring + + +@pytest.fixture +def aws_kms_keyring(): + return build_aws_kms_keyring() diff --git a/test/integration/key_providers/__init__.py b/test/integration/key_providers/__init__.py new file mode 100644 index 000000000..d548f9b1f --- /dev/null +++ b/test/integration/key_providers/__init__.py @@ -0,0 +1,3 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Dummy stub to make linters work better.""" diff --git a/test/integration/key_providers/test_kms.py b/test/integration/key_providers/test_kms.py new file mode 100644 index 000000000..59c699f70 --- /dev/null +++ b/test/integration/key_providers/test_kms.py @@ -0,0 +1,31 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Integration tests for ``aws_encryption_sdk.key_provider.kms``.""" +from test.integration.integration_test_utils import setup_kms_master_key_provider_with_botocore_session + +import pytest +from botocore.exceptions import BotoCoreError + +from aws_encryption_sdk.key_providers.kms import KMSMasterKeyProvider + +pytestmark = [pytest.mark.integ] + + +def test_remove_bad_client(): + test = KMSMasterKeyProvider() + fake_region = "us-fakey-12" + test.add_regional_client(fake_region) + + with pytest.raises(BotoCoreError): + test._regional_clients[fake_region].list_keys() + + assert fake_region not in test._regional_clients + + +def test_regional_client_does_not_modify_botocore_session(caplog): + mkp = setup_kms_master_key_provider_with_botocore_session() + fake_region = "us-fakey-12" + + assert mkp.config.botocore_session.get_config_variable("region") != fake_region + mkp.add_regional_client(fake_region) + assert mkp.config.botocore_session.get_config_variable("region") != fake_region diff --git a/test/integration/keyrings/__init__.py b/test/integration/keyrings/__init__.py new file mode 100644 index 000000000..d548f9b1f --- /dev/null +++ b/test/integration/keyrings/__init__.py @@ -0,0 +1,3 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Dummy stub to make linters work better.""" diff --git a/test/integration/keyrings/aws_kms/__init__.py b/test/integration/keyrings/aws_kms/__init__.py new file mode 100644 index 000000000..d548f9b1f --- /dev/null +++ b/test/integration/keyrings/aws_kms/__init__.py @@ -0,0 +1,3 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Dummy stub to make linters work better.""" diff --git a/test/integration/keyrings/aws_kms/test_client_cache.py b/test/integration/keyrings/aws_kms/test_client_cache.py new file mode 100644 index 000000000..6ab1a05d5 --- /dev/null +++ b/test/integration/keyrings/aws_kms/test_client_cache.py @@ -0,0 +1,49 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Integration tests for ``aws_encryption_sdk.keyrings.aws_kms.client_cache``.""" +import pytest +from botocore.config import Config +from botocore.exceptions import BotoCoreError +from botocore.session import Session + +from aws_encryption_sdk.keyrings.aws_kms._client_cache import ClientCache + +pytestmark = [pytest.mark.integ] + + +def test_client_cache_removes_bad_client(): + cache = ClientCache(botocore_session=Session(), client_config=Config()) + fake_region = "us-fake-1" + + initial_client = cache.client(fake_region, "kms") + + assert fake_region in cache._cache + + with pytest.raises(BotoCoreError): + initial_client.encrypt(KeyId="foo", Plaintext=b"bar") + + assert fake_region not in cache._cache + + +def test_regional_client_does_not_modify_botocore_session(): + cache = ClientCache(botocore_session=Session(), client_config=Config()) + fake_region = "us-fake-1" + + assert cache._botocore_session.get_config_variable("region") != fake_region + cache.client(fake_region, "kms") + assert cache._botocore_session.get_config_variable("region") != fake_region + + +def test_client_cache_remove_bad_client_when_already_removed(): + cache = ClientCache(botocore_session=Session(), client_config=Config()) + fake_region = "us-fake-1" + + initial_client = cache.client(fake_region, "kms") + + assert fake_region in cache._cache + del cache._cache[fake_region] + + with pytest.raises(BotoCoreError): + initial_client.encrypt(KeyId="foo", Plaintext=b"bar") + + assert fake_region not in cache._cache diff --git a/test/integration/test_client.py b/test/integration/test_client.py index 2b66fb408..7f5c1e983 100644 --- a/test/integration/test_client.py +++ b/test/integration/test_client.py @@ -5,17 +5,11 @@ import logging import pytest -from botocore.exceptions import BotoCoreError import aws_encryption_sdk from aws_encryption_sdk.identifiers import USER_AGENT_SUFFIX, Algorithm -from aws_encryption_sdk.key_providers.kms import KMSMasterKey, KMSMasterKeyProvider -from .integration_test_utils import ( - get_cmk_arn, - setup_kms_master_key_provider, - setup_kms_master_key_provider_with_botocore_session, -) +from .integration_test_utils import build_aws_kms_keyring, get_cmk_arn, setup_kms_master_key_provider pytestmark = [pytest.mark.integ] @@ -33,411 +27,90 @@ } -def test_encrypt_verify_user_agent_kms_master_key_provider(caplog): - caplog.set_level(level=logging.DEBUG) - mkp = setup_kms_master_key_provider() - mk = mkp.master_key(get_cmk_arn()) - - mk.generate_data_key(algorithm=Algorithm.AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384, encryption_context={}) - - assert USER_AGENT_SUFFIX in caplog.text - - -def test_encrypt_verify_user_agent_kms_master_key(caplog): +@pytest.mark.parametrize( + "kwargs", + ( + pytest.param(dict(key_provider=setup_kms_master_key_provider()), id="AWS KMS master key provider"), + pytest.param( + dict(key_provider=setup_kms_master_key_provider().master_key(get_cmk_arn())), id="AWS KMS master key" + ), + pytest.param(dict(keyring=build_aws_kms_keyring()), id="AWS KMS keyring"), + ), +) +def test_encrypt_verify_user_agent_in_logs(caplog, kwargs): caplog.set_level(level=logging.DEBUG) - mk = KMSMasterKey(key_id=get_cmk_arn()) - mk.generate_data_key(algorithm=Algorithm.AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384, encryption_context={}) + aws_encryption_sdk.encrypt(source=VALUES["plaintext_128"], **kwargs) assert USER_AGENT_SUFFIX in caplog.text -def test_remove_bad_client(): - test = KMSMasterKeyProvider() - fake_region = "us-fakey-12" - test.add_regional_client(fake_region) - - with pytest.raises(BotoCoreError): - test._regional_clients[fake_region].list_keys() - - assert fake_region not in test._regional_clients - - -def test_regional_client_does_not_modify_botocore_session(caplog): - mkp = setup_kms_master_key_provider_with_botocore_session() - fake_region = "us-fakey-12" - - assert mkp.config.botocore_session.get_config_variable("region") != fake_region - mkp.add_regional_client(fake_region) - assert mkp.config.botocore_session.get_config_variable("region") != fake_region - - -class TestKMSThickClientIntegration(object): - @pytest.fixture(autouse=True) - def apply_fixtures(self): - self.kms_master_key_provider = setup_kms_master_key_provider() - - def test_encryption_cycle_default_algorithm_framed_stream(self): - """Test that the enrypt/decrypt cycle completes successfully - for a framed message using the default algorithm. - """ - with aws_encryption_sdk.stream( - source=io.BytesIO(VALUES["plaintext_128"]), - key_provider=self.kms_master_key_provider, - mode="e", - encryption_context=VALUES["encryption_context"], - ) as encryptor: - ciphertext = encryptor.read() - header_1 = encryptor.header - with aws_encryption_sdk.stream( - source=io.BytesIO(ciphertext), key_provider=self.kms_master_key_provider, mode="d" - ) as decryptor: - plaintext = decryptor.read() - header_2 = decryptor.header - assert plaintext == VALUES["plaintext_128"] - assert header_1.encryption_context == header_2.encryption_context - - def test_encryption_cycle_default_algorithm_framed_stream_many_lines(self): - """Test that the enrypt/decrypt cycle completes successfully - for a framed message with many frames using the default algorithm. - """ - ciphertext = b"" - with aws_encryption_sdk.stream( - source=io.BytesIO(VALUES["plaintext_128"] * 10), - key_provider=self.kms_master_key_provider, - mode="e", - encryption_context=VALUES["encryption_context"], - frame_length=128, - ) as encryptor: - for chunk in encryptor: - ciphertext += chunk - header_1 = encryptor.header - plaintext = b"" - with aws_encryption_sdk.stream( - source=io.BytesIO(ciphertext), key_provider=self.kms_master_key_provider, mode="d" - ) as decryptor: - for chunk in decryptor: - plaintext += chunk - header_2 = decryptor.header - assert plaintext == VALUES["plaintext_128"] * 10 - assert header_1.encryption_context == header_2.encryption_context - - def test_encryption_cycle_default_algorithm_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully - for a non-framed message using the default algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_default_algorithm_non_framed_no_encryption_context(self): - """Test that the enrypt/decrypt cycle completes successfully - for a non-framed message using the default algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], key_provider=self.kms_master_key_provider, frame_length=0 - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_default_algorithm_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully - for a single frame message using the default algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_default_algorithm_multiple_frames(self): - """Test that the enrypt/decrypt cycle completes successfully - for a framed message with multiple frames using the - default algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"] * 100, - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] * 100 - - def test_encryption_cycle_aes_128_gcm_iv12_tag16_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully - for a single frame message using the aes_128_gcm_iv12_tag16 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - algorithm=Algorithm.AES_128_GCM_IV12_TAG16, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_128_gcm_iv12_tag16_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully - for a non-framed message using the aes_128_gcm_iv12_tag16 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - algorithm=Algorithm.AES_128_GCM_IV12_TAG16, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_192_gcm_iv12_tag16_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully - for a single frame message using the aes_192_gcm_iv12_tag16 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - algorithm=Algorithm.AES_192_GCM_IV12_TAG16, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_192_gcm_iv12_tag16_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully - for a non-framed message using the aes_192_gcm_iv12_tag16 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - algorithm=Algorithm.AES_192_GCM_IV12_TAG16, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_256_gcm_iv12_tag16_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully - for a single frame message using the aes_256_gcm_iv12_tag16 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - algorithm=Algorithm.AES_256_GCM_IV12_TAG16, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_256_gcm_iv12_tag16_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully - for a non-framed message using the aes_256_gcm_iv12_tag16 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - algorithm=Algorithm.AES_256_GCM_IV12_TAG16, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_128_gcm_iv12_tag16_hkdf_sha256_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully for a - single frame message using the aes_128_gcm_iv12_tag16_hkdf_sha256 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - algorithm=Algorithm.AES_128_GCM_IV12_TAG16_HKDF_SHA256, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_128_gcm_iv12_tag16_hkdf_sha256_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully for a - non-framed message using the aes_128_gcm_iv12_tag16_hkdf_sha256 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - algorithm=Algorithm.AES_128_GCM_IV12_TAG16_HKDF_SHA256, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_192_gcm_iv12_tag16_hkdf_sha256_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully for a - single frame message using the aes_192_gcm_iv12_tag16_hkdf_sha256 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - algorithm=Algorithm.AES_192_GCM_IV12_TAG16_HKDF_SHA256, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_192_gcm_iv12_tag16_hkdf_sha256_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully for a - non-framed message using the aes_192_gcm_iv12_tag16_hkdf_sha256 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - algorithm=Algorithm.AES_192_GCM_IV12_TAG16_HKDF_SHA256, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_256_gcm_iv12_tag16_hkdf_sha256_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully for a - single frame message using the aes_256_gcm_iv12_tag16_hkdf_sha256 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - algorithm=Algorithm.AES_256_GCM_IV12_TAG16_HKDF_SHA256, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_256_gcm_iv12_tag16_hkdf_sha256_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully for a - non-framed message using the aes_256_gcm_iv12_tag16_hkdf_sha256 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - algorithm=Algorithm.AES_256_GCM_IV12_TAG16_HKDF_SHA256, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_128_gcm_iv12_tag16_hkdf_sha256_ecdsa_p256_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully for a single - frame message using the aes_128_gcm_iv12_tag16_hkdf_sha256_ecdsa_p256 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - algorithm=Algorithm.AES_128_GCM_IV12_TAG16_HKDF_SHA256_ECDSA_P256, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_128_gcm_iv12_tag16_hkdf_sha256_ecdsa_p256_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully for a single - block message using the aes_128_gcm_iv12_tag16_hkdf_sha256_ecdsa_p256 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - algorithm=Algorithm.AES_128_GCM_IV12_TAG16_HKDF_SHA256_ECDSA_P256, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_192_gcm_iv12_tag16_hkdf_sha384_ecdsa_p384_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully for a single - frame message using the aes_192_gcm_iv12_tag16_hkdf_sha384_ecdsa_p384 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - algorithm=Algorithm.AES_192_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_192_gcm_iv12_tag16_hkdf_sha384_ecdsa_p384_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully for a single - block message using the aes_192_gcm_iv12_tag16_hkdf_sha384_ecdsa_p384 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - algorithm=Algorithm.AES_192_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_256_gcm_iv12_tag16_hkdf_sha384_ecdsa_p384_single_frame(self): - """Test that the enrypt/decrypt cycle completes successfully for a single - frame message using the aes_256_gcm_iv12_tag16_hkdf_sha384_ecdsa_p384 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=1024, - algorithm=Algorithm.AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] - - def test_encryption_cycle_aes_256_gcm_iv12_tag16_hkdf_sha384_ecdsa_p384_non_framed(self): - """Test that the enrypt/decrypt cycle completes successfully for a single - block message using the aes_256_gcm_iv12_tag16_hkdf_sha384_ecdsa_p384 - algorithm. - """ - ciphertext, _ = aws_encryption_sdk.encrypt( - source=VALUES["plaintext_128"], - key_provider=self.kms_master_key_provider, - encryption_context=VALUES["encryption_context"], - frame_length=0, - algorithm=Algorithm.AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384, - ) - plaintext, _ = aws_encryption_sdk.decrypt(source=ciphertext, key_provider=self.kms_master_key_provider) - assert plaintext == VALUES["plaintext_128"] +@pytest.mark.parametrize("frame_size", (pytest.param(0, id="unframed"), pytest.param(1024, id="1024 byte frame"))) +@pytest.mark.parametrize("algorithm_suite", Algorithm) +@pytest.mark.parametrize( + "encrypt_key_provider_kwargs", + ( + pytest.param(dict(key_provider=setup_kms_master_key_provider()), id="encrypt with MKP"), + pytest.param(dict(keyring=build_aws_kms_keyring()), id="encrypt with keyring"), + ), +) +@pytest.mark.parametrize( + "decrypt_key_provider_kwargs", + ( + pytest.param(dict(key_provider=setup_kms_master_key_provider()), id="decrypt with MKP"), + pytest.param(dict(keyring=build_aws_kms_keyring()), id="decrypt with keyring"), + ), +) +@pytest.mark.parametrize( + "encryption_context", + ( + pytest.param({}, id="empty encryption context"), + pytest.param(VALUES["encryption_context"], id="non-empty encryption context"), + ), +) +@pytest.mark.parametrize( + "plaintext", + ( + pytest.param(VALUES["plaintext_128"], id="plaintext smaller than frame"), + pytest.param(VALUES["plaintext_128"] * 100, id="plaintext larger than frame"), + ), +) +def test_encrypt_decrypt_cycle_aws_kms( + frame_size, algorithm_suite, encrypt_key_provider_kwargs, decrypt_key_provider_kwargs, encryption_context, plaintext +): + ciphertext, _ = aws_encryption_sdk.encrypt( + source=plaintext, + encryption_context=encryption_context, + frame_length=frame_size, + algorithm=algorithm_suite, + **encrypt_key_provider_kwargs + ) + decrypted, _ = aws_encryption_sdk.decrypt(source=ciphertext, **decrypt_key_provider_kwargs) + assert decrypted == plaintext + + +@pytest.mark.parametrize( + "plaintext", + ( + pytest.param(VALUES["plaintext_128"], id="plaintext smaller than frame"), + pytest.param(VALUES["plaintext_128"] * 100, id="plaintext larger than frame"), + ), +) +def test_encrypt_decrypt_cycle_aws_kms_streaming(plaintext): + keyring = build_aws_kms_keyring() + ciphertext = b"" + with aws_encryption_sdk.stream( + source=io.BytesIO(plaintext), keyring=keyring, mode="e", encryption_context=VALUES["encryption_context"], + ) as encryptor: + for chunk in encryptor: + ciphertext += chunk + header_1 = encryptor.header + + decrypted = b"" + with aws_encryption_sdk.stream(source=io.BytesIO(ciphertext), keyring=keyring, mode="d") as decryptor: + for chunk in decryptor: + decrypted += chunk + header_2 = decryptor.header + + assert decrypted == plaintext + assert header_1.encryption_context == header_2.encryption_context diff --git a/test/requirements.txt b/test/requirements.txt index 152b5dbf4..ff9311dc4 100644 --- a/test/requirements.txt +++ b/test/requirements.txt @@ -2,3 +2,4 @@ mock pytest>=3.3.1 pytest-cov pytest-mock +moto>=1.3.14 diff --git a/test/unit/key_providers/base/test_base_master_key_provider_config.py b/test/unit/key_providers/base/test_base_master_key_provider_config.py index 0e21ded80..9604b84a8 100644 --- a/test/unit/key_providers/base/test_base_master_key_provider_config.py +++ b/test/unit/key_providers/base/test_base_master_key_provider_config.py @@ -11,7 +11,11 @@ # ANY KIND, either express or implied. See the License for the specific # language governing permissions and limitations under the License. """Unit test suite to validate aws_encryption_sdk.key_providers.base.MasterKeyProviderConfig""" +import pytest + from aws_encryption_sdk.key_providers.base import MasterKeyProviderConfig # noqa pylint: disable=unused-import +pytestmark = [pytest.mark.unit, pytest.mark.local] + # Nothing to test at this time, but import will ensure that it exists. # If this MasterKeyProviderConfig has attributes added in the future, they should be tested here. diff --git a/test/unit/keyrings/test_aws_kms.py b/test/unit/keyrings/test_aws_kms.py new file mode 100644 index 000000000..ad2e4b526 --- /dev/null +++ b/test/unit/keyrings/test_aws_kms.py @@ -0,0 +1,198 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 +"""Unit tests for ``aws_encryption_sdk.keyrings.aws_kms``.""" +import pytest + +from aws_encryption_sdk.keyrings.aws_kms import ( + KmsKeyring, + _AwsKmsDiscoveryKeyring, + _AwsKmsSingleCmkKeyring, + _region_from_key_id, +) +from aws_encryption_sdk.keyrings.aws_kms.client_suppliers import DefaultClientSupplier +from aws_encryption_sdk.keyrings.multi import MultiKeyring + +pytestmark = [pytest.mark.unit, pytest.mark.local] + + +@pytest.mark.parametrize( + "kwargs", + ( + pytest.param(dict(client_supplier=None), id="client_supplier is invalid"), + pytest.param(dict(generator_key_id=5), id="generator_id is invalid"), + pytest.param(dict(child_key_ids=("foo", 5)), id="child_key_ids contains invalid values"), + pytest.param(dict(child_key_ids="some stuff"), id="child_key_ids is a string"), + pytest.param(dict(grant_tokens=("foo", 5)), id="grant_tokens contains invalid values"), + pytest.param(dict(grant_tokens="some stuff"), id="grant_tokens is a string"), + ), +) +def test_kms_keyring_invalid_parameters(kwargs): + with pytest.raises(TypeError): + KmsKeyring(**kwargs) + + +def test_kms_keyring_builds_correct_inner_keyring_multikeyring(): + generator_id = "foo" + child_id_1 = "bar" + child_id_2 = "baz" + grants = ("asdf", "fdsa") + supplier = DefaultClientSupplier() + + test = KmsKeyring( + generator_key_id=generator_id, + child_key_ids=(child_id_1, child_id_2), + grant_tokens=grants, + client_supplier=supplier, + ) + + # We specified a generator and child IDs, so the inner keyring MUST be a multikeyring + assert isinstance(test._inner_keyring, MultiKeyring) + + # Verify that the generator is configured correctly + assert isinstance(test._inner_keyring.generator, _AwsKmsSingleCmkKeyring) + assert test._inner_keyring.generator._key_id == generator_id + assert test._inner_keyring.generator._grant_tokens == grants + assert test._inner_keyring.generator._client_supplier is supplier + + # We specified two child IDs, so there MUST be exactly two children + assert len(test._inner_keyring.children) == 2 + + # Verify that the first child is configured correctly + assert isinstance(test._inner_keyring.children[0], _AwsKmsSingleCmkKeyring) + assert test._inner_keyring.children[0]._key_id == child_id_1 + assert test._inner_keyring.children[0]._grant_tokens == grants + assert test._inner_keyring.children[0]._client_supplier is supplier + + # Verify that the second child is configured correctly + assert isinstance(test._inner_keyring.children[1], _AwsKmsSingleCmkKeyring) + assert test._inner_keyring.children[1]._key_id == child_id_2 + assert test._inner_keyring.children[1]._grant_tokens == grants + assert test._inner_keyring.children[1]._client_supplier is supplier + + +def test_kms_keyring_builds_correct_inner_keyring_multikeyring_no_generator(): + test = KmsKeyring(child_key_ids=("bar", "baz")) + + # We specified child IDs, so the inner keyring MUST be a multikeyring + assert isinstance(test._inner_keyring, MultiKeyring) + + # We did not specify a generator ID, so the generator MUST NOT be set + assert test._inner_keyring.generator is None + + # We specified two child IDs, so there MUST be exactly two children + assert len(test._inner_keyring.children) == 2 + + +def test_kms_keyring_builds_correct_inner_keyring_multikeyring_no_children(): + test = KmsKeyring(generator_key_id="foo") + + # We specified a generator ID, so the inner keyring MUST be a multikeyring + assert isinstance(test._inner_keyring, MultiKeyring) + + # We specified a generator ID, so the generator MUST be set + assert test._inner_keyring.generator is not None + + # We did not specify any child IDs, so the multikeyring MUST NOT contain any children + assert len(test._inner_keyring.children) == 0 + + +def test_kms_keyring_builds_correct_inner_keyring_discovery(): + grants = ("asdf", "fdas") + supplier = DefaultClientSupplier() + + test = KmsKeyring(grant_tokens=grants, client_supplier=supplier) + + # We specified neither a generator nor children, so the inner keyring MUST be a discovery keyring + assert isinstance(test._inner_keyring, _AwsKmsDiscoveryKeyring) + + # Verify that the discovery keyring is configured correctly + assert test._inner_keyring._grant_tokens == grants + assert test._inner_keyring._client_supplier is supplier + + +def test_kms_keyring_on_encrypt(mocker): + mock_keyring = mocker.Mock() + + keyring = KmsKeyring() + keyring._inner_keyring = mock_keyring + + test = keyring.on_encrypt(encryption_materials=mocker.sentinel.encryption_materials) + + # on_encrypt MUST be a straight passthrough to the inner keyring + assert mock_keyring.on_encrypt.called_once_with(encryption_materials=mocker.sentinel.encryption_materials) + assert test is mock_keyring.on_encrypt.return_value + + +def test_kms_keyring_on_decrypt(mocker): + mock_keyring = mocker.Mock() + + keyring = KmsKeyring() + keyring._inner_keyring = mock_keyring + + test = keyring.on_decrypt( + decryption_materials=mocker.sentinel.decryption_materials, + encrypted_data_keys=mocker.sentinel.encrypted_data_keys, + ) + + # on_decrypt MUST be a straight passthrough to the inner keyring + assert mock_keyring.on_decrypt.called_once_with( + decryption_materials=mocker.sentinel.decryption_materials, + encrypted_data_keys=mocker.sentinel.encrypted_data_keys, + ) + assert test is mock_keyring.on_decrypt.return_value + + +@pytest.mark.parametrize( + "kwargs", + ( + pytest.param(dict(key_id=None, client_supplier=DefaultClientSupplier()), id="key_id is invalid"), + pytest.param(dict(key_id="foo", client_supplier=None), id="client_supplier is invalid"), + pytest.param( + dict(key_id="foo", client_supplier=DefaultClientSupplier(), grant_tokens=("bar", 5)), + id="grant_tokens contains invalid values", + ), + pytest.param( + dict(key_id="foo", client_supplier=DefaultClientSupplier(), grant_tokens="some stuff"), + id="grant_tokens is a string", + ), + ), +) +def test_aws_kms_single_cmk_keyring_invalid_parameters(kwargs): + with pytest.raises(TypeError): + _AwsKmsSingleCmkKeyring(**kwargs) + + +@pytest.mark.parametrize( + "kwargs", + ( + pytest.param(dict(client_supplier=None), id="client_supplier is invalid"), + pytest.param( + dict(client_supplier=DefaultClientSupplier(), grant_tokens=("bar", 5)), + id="grant_tokens contains invalid values", + ), + pytest.param( + dict(client_supplier=DefaultClientSupplier(), grant_tokens="some stuff"), id="grant_tokens is a string", + ), + ), +) +def test_aws_kms_discovery_keyring_invalid_parameters(kwargs): + with pytest.raises(TypeError): + _AwsKmsDiscoveryKeyring(**kwargs) + + +@pytest.mark.parametrize( + "key_id, expected", + ( + pytest.param("foo", None, id="invalid format"), + pytest.param("alias/foo", None, id="alias name"), + pytest.param("880e7651-6f87-4c68-b84b-3220da5a7a02", None, id="key ID"), + pytest.param("arn:aws:kms:moon-base-1:111222333444:alias/foo", "moon-base-1", id="alias ARN"), + pytest.param( + "arn:aws:kms:moon-base-1:111222333444:key/880e7651-6f87-4c68-b84b-3220da5a7a02", "moon-base-1", id="CMK ARN" + ), + ), +) +def test_region_from_key_id(key_id, expected): + actual = _region_from_key_id(key_id=key_id) + + assert actual == expected diff --git a/test/upstream-requirements-py27.txt b/test/upstream-requirements-py27.txt index 082a6198e..92e723b9c 100644 --- a/test/upstream-requirements-py27.txt +++ b/test/upstream-requirements-py27.txt @@ -1,30 +1,72 @@ -asn1crypto==0.24.0 atomicwrites==1.3.0 -attrs==19.1.0 -boto3==1.9.133 -botocore==1.12.133 -cffi==1.12.3 -coverage==4.5.3 -cryptography==2.6.1 -docutils==0.14 -enum34==1.1.6 +attrs==19.3.0 +aws-sam-translator==1.21.0 +aws-xray-sdk==2.4.3 +backports.ssl-match-hostname==3.7.0.1 +backports.tempfile==1.0 +backports.weakref==1.0.post1 +boto==2.49.0 +boto3==1.12.16 +botocore==1.15.16 +certifi==2019.11.28 +cffi==1.14.0 +cfn-lint==0.28.3 +chardet==3.0.4 +configparser==4.0.2 +contextlib2==0.6.0.post1 +cookies==2.2.1 +coverage==5.0.3 +cryptography==2.8 +docker==4.2.0 +docutils==0.15.2 +ecdsa==0.15 +enum34==1.1.9 funcsigs==1.0.2 -futures==3.2.0 -ipaddress==1.0.22 -jmespath==0.9.4 -mock==2.0.0 +functools32==3.2.3.post2 +future==0.18.2 +futures==3.3.0 +idna==2.8 +importlib-metadata==1.5.0 +importlib-resources==1.0.2 +ipaddress==1.0.23 +Jinja2==2.11.1 +jmespath==0.9.5 +jsondiff==1.1.2 +jsonpatch==1.25 +jsonpickle==1.3 +jsonpointer==2.0 +jsonschema==3.2.0 +MarkupSafe==1.1.1 +mock==3.0.5 more-itertools==5.0.0 -pathlib2==2.3.3 -pbr==5.1.3 -pluggy==0.9.0 -py==1.8.0 -pycparser==2.19 -pytest==4.4.1 -pytest-cov==2.6.1 -pytest-mock==1.10.4 -python-dateutil==2.8.0 -s3transfer==0.2.0 +moto==1.3.14 +packaging==20.3 +pathlib2==2.3.5 +pluggy==0.13.1 +py==1.8.1 +pyasn1==0.4.8 +pycparser==2.20 +pyparsing==2.4.6 +pyrsistent==0.15.7 +pytest==4.6.9 +pytest-cov==2.8.1 +pytest-mock==2.0.0 +python-dateutil==2.8.1 +python-jose==3.1.0 +pytz==2019.3 +PyYAML==5.3 +requests==2.23.0 +responses==0.10.12 +rsa==4.0 +s3transfer==0.3.3 scandir==1.10.0 -six==1.12.0 -urllib3==1.24.2 -wrapt==1.11.1 +six==1.14.0 +sshpubkeys==3.1.0 +typing==3.7.4.1 +urllib3==1.25.8 +wcwidth==0.1.8 +websocket-client==0.57.0 +Werkzeug==1.0.0 +wrapt==1.12.0 +xmltodict==0.12.0 +zipp==1.2.0 diff --git a/test/upstream-requirements-py37.txt b/test/upstream-requirements-py37.txt index 238746675..03a6eb36e 100644 --- a/test/upstream-requirements-py37.txt +++ b/test/upstream-requirements-py37.txt @@ -1,24 +1,56 @@ -asn1crypto==0.24.0 -atomicwrites==1.3.0 -attrs==19.1.0 -boto3==1.9.133 -botocore==1.12.133 -cffi==1.12.3 -coverage==4.5.3 -cryptography==2.6.1 -docutils==0.14 -jmespath==0.9.4 -mock==2.0.0 -more-itertools==7.0.0 -pbr==5.1.3 -pluggy==0.9.0 -py==1.8.0 -pycparser==2.19 -pytest==4.4.1 -pytest-cov==2.6.1 -pytest-mock==1.10.4 -python-dateutil==2.8.0 -s3transfer==0.2.0 -six==1.12.0 -urllib3==1.24.2 -wrapt==1.11.1 +attrs==19.3.0 +aws-sam-translator==1.21.0 +aws-xray-sdk==2.4.3 +boto==2.49.0 +boto3==1.12.16 +botocore==1.15.16 +certifi==2019.11.28 +cffi==1.14.0 +cfn-lint==0.28.3 +chardet==3.0.4 +coverage==5.0.3 +cryptography==2.8 +docker==4.2.0 +docutils==0.15.2 +ecdsa==0.15 +future==0.18.2 +idna==2.8 +importlib-metadata==1.5.0 +Jinja2==2.11.1 +jmespath==0.9.5 +jsondiff==1.1.2 +jsonpatch==1.25 +jsonpickle==1.3 +jsonpointer==2.0 +jsonschema==3.2.0 +MarkupSafe==1.1.1 +mock==4.0.1 +more-itertools==8.2.0 +moto==1.3.14 +packaging==20.3 +pluggy==0.13.1 +py==1.8.1 +pyasn1==0.4.8 +pycparser==2.20 +pyparsing==2.4.6 +pyrsistent==0.15.7 +pytest==5.3.5 +pytest-cov==2.8.1 +pytest-mock==2.0.0 +python-dateutil==2.8.1 +python-jose==3.1.0 +pytz==2019.3 +PyYAML==5.3 +requests==2.23.0 +responses==0.10.12 +rsa==4.0 +s3transfer==0.3.3 +six==1.14.0 +sshpubkeys==3.1.0 +urllib3==1.25.8 +wcwidth==0.1.8 +websocket-client==0.57.0 +Werkzeug==1.0.0 +wrapt==1.12.0 +xmltodict==0.12.0 +zipp==3.1.0 diff --git a/tox.ini b/tox.ini index 1df161ccd..d9d60d756 100644 --- a/tox.ini +++ b/tox.ini @@ -232,9 +232,11 @@ commands = {[testenv:isort]commands} -c [testenv:autoformat] basepython = python3 deps = + {[testenv:isort-seed]deps} {[testenv:blacken]deps} {[testenv:isort]deps} commands = + {[testenv:isort-seed]commands} {[testenv:blacken]commands} {[testenv:isort]commands}