Skip to content

chore(custom_cmm_example.py): added test for custom_cmm_example.py #690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions examples/src/legacy/custom_cmm_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Example to create a custom crypto material manager class."""

import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy, StrictAwsKmsMasterKeyProvider
from aws_encryption_sdk.materials_managers.base import CryptoMaterialsManager
from aws_encryption_sdk.materials_managers.default import DefaultCryptoMaterialsManager


# Custom CMM implementation.
# This CMM only allows encryption/decryption using signing algorithms.
# It wraps an underlying CMM implementation and checks its materials
# to ensure that it is only using signed encryption algorithms.
class CustomSigningSuiteOnlyCMM(CryptoMaterialsManager):
"""Example custom crypto materials manager class."""

def __init__(self, master_key_provider: StrictAwsKmsMasterKeyProvider) -> None:
"""Constructor for CustomSigningSuiteOnlyCMM class."""
self.underlying_cmm = DefaultCryptoMaterialsManager(master_key_provider)

def get_encryption_materials(self, request):
"""Provides encryption materials appropriate for the request for the custom CMM.

:param EncryptionMaterialsRequest request: Request object to provide to a
crypto material manager's `get_encryption_materials` method.
:returns: Encryption materials
:rtype: EncryptionMaterials
"""
materials = self.underlying_cmm.get_encryption_materials(request)
if not materials.algorithm.is_signing():
raise AssertionError(
"Algorithm provided to CustomSigningSuiteOnlyCMM"
+ " is not a supported signing algorithm: " + materials.algorithm
)
return materials

def decrypt_materials(self, request):
"""Provider decryption materials appropriate for the request.

:param DecryptionMaterialsRequest request: Request object to provide to a
crypto material manager's `decrypt_materials` method.
"""
if not request.algorithm.is_signing():
raise AssertionError(
"Algorithm provided to CustomSigningSuiteOnlyCMM"
+ " is not a supported signing algorithm: " + request.algorithm
)
return self.underlying_cmm.decrypt_materials(request)


def encrypt_decrypt_with_cmm(
cmm: CryptoMaterialsManager,
source_plaintext: str
):
"""Encrypts and decrypts a string using a CMM.

:param CryptoMaterialsManager cmm: CMM to use for encryption and decryption
:param bytes source_plaintext: Data to encrypt
"""
# Set up an encryption client with an explicit commitment policy. Note that if you do not explicitly choose a
# commitment policy, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default.
client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT)

# Encrypt the plaintext source data
ciphertext, encryptor_header = client.encrypt(
source=source_plaintext,
materials_manager=cmm
)

# Decrypt the ciphertext
cycled_plaintext, decrypted_header = client.decrypt(
source=ciphertext,
materials_manager=cmm
)

# Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source plaintext
assert cycled_plaintext == source_plaintext

# Verify that the encryption context used in the decrypt operation includes all key pairs from
# the encrypt operation. (The SDK can add pairs, so don't require an exact match.)
#
# In production, always use a meaningful encryption context. In this sample, we omit the
# encryption context (no key pairs).
assert all(
pair in decrypted_header.encryption_context.items() for pair in encryptor_header.encryption_context.items()
)
2 changes: 1 addition & 1 deletion examples/test/legacy/examples_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

from integration_test_utils import ( # noqa pylint: disable=unused-import,import-error
get_cmk_arn,
get_second_cmk_arn,
get_mrk_arn,
get_second_cmk_arn,
get_second_mrk_arn,
)
52 changes: 52 additions & 0 deletions examples/test/legacy/test_i_custom_cmm_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Test suite for encryption and decryption using custom CMM."""

import botocore.session
import pytest

import aws_encryption_sdk

from ...src.legacy.custom_cmm_example import CustomSigningSuiteOnlyCMM, encrypt_decrypt_with_cmm
from .examples_test_utils import get_cmk_arn, static_plaintext
from .v3_default_cmm import V3DefaultCryptoMaterialsManager

pytestmark = [pytest.mark.examples]


def test_custom_cmm_example():
"""Test method for encryption and decryption using V3 default CMM."""
plaintext = static_plaintext
cmk_arn = get_cmk_arn()
botocore_session = botocore.session.Session()

# Create a KMS master key provider.
kms_kwargs = dict(key_ids=[cmk_arn])
if botocore_session is not None:
kms_kwargs["botocore_session"] = botocore_session
master_key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs)

# Create the V3 default CMM (V3DefaultCryptoMaterialsManager) using the master_key_provider
cmm = CustomSigningSuiteOnlyCMM(master_key_provider=master_key_provider)

encrypt_decrypt_with_cmm(cmm=cmm,
source_plaintext=plaintext)


def test_v3_default_cmm():
"""Test method for encryption and decryption using V3 default CMM."""
plaintext = static_plaintext
cmk_arn = get_cmk_arn()
botocore_session = botocore.session.Session()

# Create a KMS master key provider.
kms_kwargs = dict(key_ids=[cmk_arn])
if botocore_session is not None:
kms_kwargs["botocore_session"] = botocore_session
master_key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs)

# Create the V3 default CMM (V3DefaultCryptoMaterialsManager) using the master_key_provider
cmm = V3DefaultCryptoMaterialsManager(master_key_provider=master_key_provider)

encrypt_decrypt_with_cmm(cmm=cmm,
source_plaintext=plaintext)
159 changes: 159 additions & 0 deletions examples/test/legacy/v3_default_cmm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Copy-paste of the V3 default CMM."""
import logging

import attr

from aws_encryption_sdk.exceptions import MasterKeyProviderError, SerializationError
from aws_encryption_sdk.identifiers import CommitmentPolicy
from aws_encryption_sdk.internal.crypto.authentication import Signer, Verifier
from aws_encryption_sdk.internal.crypto.elliptic_curve import generate_ecc_signing_key
from aws_encryption_sdk.internal.defaults import ALGORITHM, ALGORITHM_COMMIT_KEY, ENCODED_SIGNER_KEY
from aws_encryption_sdk.internal.str_ops import to_str
from aws_encryption_sdk.internal.utils import prepare_data_keys
from aws_encryption_sdk.internal.utils.commitment import (
validate_commitment_policy_on_decrypt,
validate_commitment_policy_on_encrypt,
)
from aws_encryption_sdk.key_providers.base import MasterKeyProvider
from aws_encryption_sdk.materials_managers import DecryptionMaterials, EncryptionMaterials
from aws_encryption_sdk.materials_managers.base import CryptoMaterialsManager

_LOGGER = logging.getLogger(__name__)


@attr.s(hash=False)
class V3DefaultCryptoMaterialsManager(CryptoMaterialsManager):
"""Copy of the default crypto material manager for ESDK V3.

This is a copy-paste of the DefaultCryptoMaterialsManager implementation
from the V3 ESDK commit: 98b5eb7c2bd7d1b2a3380aacfa508e8721c4d8a9
This CMM is used to explicitly assert that the V3 implementation of
the DefaultCMM is compatible with future version's logic,
which implicitly asserts that custom implementations of V3-compatible CMMs
are also compatible with future version's logic.

:param master_key_provider: Master key provider to use
:type master_key_provider: aws_encryption_sdk.key_providers.base.MasterKeyProvider
"""

master_key_provider = attr.ib(validator=attr.validators.instance_of(MasterKeyProvider))

# pylint: disable=no-self-use
def _generate_signing_key_and_update_encryption_context(self, algorithm, encryption_context):
"""Generates a signing key based on the provided algorithm.

:param algorithm: Algorithm for which to generate signing key
:type algorithm: aws_encryption_sdk.identifiers.Algorithm
:param dict encryption_context: Encryption context from request
:returns: Signing key bytes
:rtype: bytes or None
"""
_LOGGER.debug("Generating signing key")
if algorithm.signing_algorithm_info is None:
return None

signer = Signer(algorithm=algorithm, key=generate_ecc_signing_key(algorithm=algorithm))
encryption_context[ENCODED_SIGNER_KEY] = to_str(signer.encoded_public_key())
return signer.key_bytes()

def get_encryption_materials(self, request):
"""Creates encryption materials using underlying master key provider.

:param request: encryption materials request
:type request: aws_encryption_sdk.materials_managers.EncryptionMaterialsRequest
:returns: encryption materials
:rtype: aws_encryption_sdk.materials_managers.EncryptionMaterials
:raises MasterKeyProviderError: if no master keys are available from the underlying master key provider
:raises MasterKeyProviderError: if the primary master key provided by the underlying master key provider
is not included in the full set of master keys provided by that provider
:raises ActionNotAllowedError: if the commitment policy in the request is violated by the algorithm being
used
"""
default_algorithm = ALGORITHM
if request.commitment_policy in (
CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT,
CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT,
):
default_algorithm = ALGORITHM_COMMIT_KEY
algorithm = request.algorithm if request.algorithm is not None else default_algorithm

validate_commitment_policy_on_encrypt(request.commitment_policy, request.algorithm)

encryption_context = request.encryption_context.copy()

signing_key = self._generate_signing_key_and_update_encryption_context(algorithm, encryption_context)

primary_master_key, master_keys = self.master_key_provider.master_keys_for_encryption(
encryption_context=encryption_context,
plaintext_rostream=request.plaintext_rostream,
plaintext_length=request.plaintext_length,
)
if not master_keys:
raise MasterKeyProviderError("No Master Keys available from Master Key Provider")
if primary_master_key not in master_keys:
raise MasterKeyProviderError("Primary Master Key not in provided Master Keys")

data_encryption_key, encrypted_data_keys = prepare_data_keys(
primary_master_key=primary_master_key,
master_keys=master_keys,
algorithm=algorithm,
encryption_context=encryption_context,
)

_LOGGER.debug("Post-encrypt encryption context: %s", encryption_context)

return EncryptionMaterials(
algorithm=algorithm,
data_encryption_key=data_encryption_key,
encrypted_data_keys=encrypted_data_keys,
encryption_context=encryption_context,
signing_key=signing_key,
)

# pylint: disable=no-self-use
def _load_verification_key_from_encryption_context(self, algorithm, encryption_context):
"""Loads the verification key from the encryption context if used by algorithm suite.

:param algorithm: Algorithm for which to generate signing key
:type algorithm: aws_encryption_sdk.identifiers.Algorithm
:param dict encryption_context: Encryption context from request
:returns: Raw verification key
:rtype: bytes
:raises SerializationError: if algorithm suite requires message signing and no verification key is found
"""
encoded_verification_key = encryption_context.get(ENCODED_SIGNER_KEY, None)

if algorithm.signing_algorithm_info is not None and encoded_verification_key is None:
raise SerializationError("No signature verification key found in header for signed algorithm.")

if algorithm.signing_algorithm_info is None:
if encoded_verification_key is not None:
raise SerializationError("Signature verification key found in header for non-signed algorithm.")
return None

verifier = Verifier.from_encoded_point(algorithm=algorithm, encoded_point=encoded_verification_key)
return verifier.key_bytes()

def decrypt_materials(self, request):
"""Obtains a plaintext data key from one or more encrypted data keys
using underlying master key provider.

:param request: decrypt materials request
:type request: aws_encryption_sdk.materials_managers.DecryptionMaterialsRequest
:returns: decryption materials
:rtype: aws_encryption_sdk.materials_managers.DecryptionMaterials
"""
validate_commitment_policy_on_decrypt(request.commitment_policy, request.algorithm)

data_key = self.master_key_provider.decrypt_data_key_from_list(
encrypted_data_keys=request.encrypted_data_keys,
algorithm=request.algorithm,
encryption_context=request.encryption_context,
)
verification_key = self._load_verification_key_from_encryption_context(
algorithm=request.algorithm, encryption_context=request.encryption_context
)

return DecryptionMaterials(data_key=data_key, verification_key=verification_key)
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
"""Test suite for the migration_aws_kms_key_example."""
import pytest

from ...src.migration.migration_aws_kms_key_example import (
migration_aws_kms_key,
)
from ...src.migration.migration_aws_kms_key_example import migration_aws_kms_key

pytestmark = [pytest.mark.examples]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
"""Test suite for the migration_raw_aes_key_example."""
import pytest

from ...src.migration.migration_raw_aes_key_example import (
migration_raw_aes_key,
)
from ...src.migration.migration_raw_aes_key_example import migration_raw_aes_key

pytestmark = [pytest.mark.examples]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
"""Test suite for the migration_raw_rsa_key_example."""
import pytest

from ...src.migration.migration_raw_rsa_key_example import (
migration_raw_rsa_key,
)
from ...src.migration.migration_raw_rsa_key_example import migration_raw_rsa_key

pytestmark = [pytest.mark.examples]

Expand Down
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,11 @@ deps = {[testenv:flake8]deps}
commands =
flake8 examples/src/
flake8 \
# Ingore D103 missing docstring errors in tests (test names should be self-documenting)
# Ignore D103 missing docstring errors in tests (test names should be self-documenting)
# E203 is not PEP8 compliant https://github.com/ambv/black#slices
# W503 is not PEP8 compliant https://github.com/ambv/black#line-breaks--binary-operators
--ignore D103,E203,W503 \
--per-file-ignores 'examples/test/legacy/v3_default_cmm.py: D205,D400,D401' \
examples/test/

[testenv:pylint]
Expand Down
Loading