-
Notifications
You must be signed in to change notification settings - Fork 85
chore(custom_cmm_example.py): added test for custom_cmm_example.py #690
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 16 commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
c3fe3e2
chore(v3_default_cmm.py): added test for v3_default_cmm.py; isort fix…
RitvikKapila cdfc30c
fix flake8
RitvikKapila 6bfc02f
update v3_default_cmm
RitvikKapila c296556
added CustomSigningSuiteOnlyCMM; refactoring
RitvikKapila 1d47afa
fix
RitvikKapila 0401aee
fix
RitvikKapila 0f53225
fix flake8, pylint
RitvikKapila f1e6c91
pylint fix
RitvikKapila 3de65cb
remove tox flake8 ignores
RitvikKapila 7bbe83e
fix
RitvikKapila 4f50576
fix
RitvikKapila 1e1b62c
fix(2)
RitvikKapila bdd0b12
fix
RitvikKapila a4c8670
fix
RitvikKapila 87c7fdf
fix
RitvikKapila fcbd4f8
fix
RitvikKapila 874f5be
resolve comments
RitvikKapila 1a4b447
added MPL CMM example and test
RitvikKapila bedf3cd
fix linters
RitvikKapila 7512802
minor fix
RitvikKapila 8e0a317
added custom mpl cmm example
RitvikKapila d6497eb
fix
RitvikKapila 349b559
fix
RitvikKapila 8c5bb63
refactoring and comment fixes
RitvikKapila 320d5c3
fixes
RitvikKapila File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
"""Example to create a custom crypto material manager class.""" | ||
RitvikKapila marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
import aws_encryption_sdk | ||
from aws_encryption_sdk import CommitmentPolicy, StrictAwsKmsMasterKeyProvider | ||
from aws_encryption_sdk.materials_managers.base import CryptoMaterialsManager | ||
from aws_encryption_sdk.materials_managers.default import DefaultCryptoMaterialsManager | ||
|
||
|
||
# Custom CMM implementation. | ||
# This CMM only allows encryption/decryption using signing algorithms. | ||
# It wraps an underlying CMM implementation and checks its materials | ||
# to ensure that it is only using signed encryption algorithms. | ||
class CustomSigningSuiteOnlyCMM(CryptoMaterialsManager): | ||
"""Example custom crypto materials manager class.""" | ||
|
||
def __init__(self, master_key_provider: StrictAwsKmsMasterKeyProvider) -> None: | ||
"""Constructor for CustomSigningSuiteOnlyCMM class.""" | ||
self.underlying_cmm = DefaultCryptoMaterialsManager(master_key_provider) | ||
|
||
def get_encryption_materials(self, request): | ||
"""Provides encryption materials appropriate for the request for the custom CMM. | ||
|
||
:param EncryptionMaterialsRequest request: Request object to provide to a | ||
crypto material manager's `get_encryption_materials` method. | ||
:returns: Encryption materials | ||
:rtype: EncryptionMaterials | ||
""" | ||
materials = self.underlying_cmm.get_encryption_materials(request) | ||
if not materials.algorithm.is_signing(): | ||
raise AssertionError( | ||
"Algorithm provided to CustomSigningSuiteOnlyCMM" | ||
+ " is not a supported signing algorithm: " + materials.algorithm | ||
) | ||
return materials | ||
|
||
def decrypt_materials(self, request): | ||
"""Provider decryption materials appropriate for the request. | ||
|
||
:param DecryptionMaterialsRequest request: Request object to provide to a | ||
crypto material manager's `decrypt_materials` method. | ||
""" | ||
if not request.algorithm.is_signing(): | ||
raise AssertionError( | ||
"Algorithm provided to CustomSigningSuiteOnlyCMM" | ||
+ " is not a supported signing algorithm: " + request.algorithm | ||
) | ||
return self.underlying_cmm.decrypt_materials(request) | ||
|
||
|
||
def encrypt_decrypt_with_cmm( | ||
cmm: CryptoMaterialsManager, | ||
source_plaintext: str | ||
): | ||
"""Encrypts and decrypts a string using a CMM. | ||
|
||
:param CryptoMaterialsManager cmm: CMM to use for encryption and decryption | ||
:param bytes source_plaintext: Data to encrypt | ||
""" | ||
# Set up an encryption client with an explicit commitment policy. Note that if you do not explicitly choose a | ||
# commitment policy, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default. | ||
client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT) | ||
|
||
# Encrypt the plaintext source data | ||
ciphertext, encryptor_header = client.encrypt( | ||
source=source_plaintext, | ||
materials_manager=cmm | ||
) | ||
|
||
# Decrypt the ciphertext | ||
cycled_plaintext, decrypted_header = client.decrypt( | ||
source=ciphertext, | ||
materials_manager=cmm | ||
) | ||
|
||
# Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source plaintext | ||
assert cycled_plaintext == source_plaintext | ||
|
||
# Verify that the encryption context used in the decrypt operation includes all key pairs from | ||
# the encrypt operation. (The SDK can add pairs, so don't require an exact match.) | ||
# | ||
# In production, always use a meaningful encryption context. In this sample, we omit the | ||
# encryption context (no key pairs). | ||
assert all( | ||
pair in decrypted_header.encryption_context.items() for pair in encryptor_header.encryption_context.items() | ||
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
"""Test suite for encryption and decryption using custom CMM.""" | ||
|
||
import botocore.session | ||
import pytest | ||
|
||
import aws_encryption_sdk | ||
|
||
from ...src.legacy.custom_cmm_example import CustomSigningSuiteOnlyCMM, encrypt_decrypt_with_cmm | ||
from .examples_test_utils import get_cmk_arn, static_plaintext | ||
from .v3_default_cmm import V3DefaultCryptoMaterialsManager | ||
|
||
pytestmark = [pytest.mark.examples] | ||
|
||
|
||
def test_custom_cmm_example(): | ||
"""Test method for encryption and decryption using V3 default CMM.""" | ||
plaintext = static_plaintext | ||
cmk_arn = get_cmk_arn() | ||
botocore_session = botocore.session.Session() | ||
|
||
# Create a KMS master key provider. | ||
kms_kwargs = dict(key_ids=[cmk_arn]) | ||
if botocore_session is not None: | ||
kms_kwargs["botocore_session"] = botocore_session | ||
master_key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs) | ||
|
||
# Create the V3 default CMM (V3DefaultCryptoMaterialsManager) using the master_key_provider | ||
RitvikKapila marked this conversation as resolved.
Show resolved
Hide resolved
|
||
cmm = CustomSigningSuiteOnlyCMM(master_key_provider=master_key_provider) | ||
|
||
encrypt_decrypt_with_cmm(cmm=cmm, | ||
source_plaintext=plaintext) | ||
|
||
|
||
def test_v3_default_cmm(): | ||
"""Test method for encryption and decryption using V3 default CMM.""" | ||
plaintext = static_plaintext | ||
cmk_arn = get_cmk_arn() | ||
botocore_session = botocore.session.Session() | ||
|
||
# Create a KMS master key provider. | ||
kms_kwargs = dict(key_ids=[cmk_arn]) | ||
if botocore_session is not None: | ||
kms_kwargs["botocore_session"] = botocore_session | ||
master_key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs) | ||
|
||
# Create the V3 default CMM (V3DefaultCryptoMaterialsManager) using the master_key_provider | ||
cmm = V3DefaultCryptoMaterialsManager(master_key_provider=master_key_provider) | ||
|
||
encrypt_decrypt_with_cmm(cmm=cmm, | ||
source_plaintext=plaintext) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
"""Copy-paste of the V3 default CMM.""" | ||
RitvikKapila marked this conversation as resolved.
Show resolved
Hide resolved
|
||
import logging | ||
|
||
import attr | ||
|
||
from aws_encryption_sdk.exceptions import MasterKeyProviderError, SerializationError | ||
from aws_encryption_sdk.identifiers import CommitmentPolicy | ||
from aws_encryption_sdk.internal.crypto.authentication import Signer, Verifier | ||
from aws_encryption_sdk.internal.crypto.elliptic_curve import generate_ecc_signing_key | ||
from aws_encryption_sdk.internal.defaults import ALGORITHM, ALGORITHM_COMMIT_KEY, ENCODED_SIGNER_KEY | ||
from aws_encryption_sdk.internal.str_ops import to_str | ||
from aws_encryption_sdk.internal.utils import prepare_data_keys | ||
from aws_encryption_sdk.internal.utils.commitment import ( | ||
validate_commitment_policy_on_decrypt, | ||
validate_commitment_policy_on_encrypt, | ||
) | ||
from aws_encryption_sdk.key_providers.base import MasterKeyProvider | ||
from aws_encryption_sdk.materials_managers import DecryptionMaterials, EncryptionMaterials | ||
from aws_encryption_sdk.materials_managers.base import CryptoMaterialsManager | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
@attr.s(hash=False) | ||
class V3DefaultCryptoMaterialsManager(CryptoMaterialsManager): | ||
"""Copy of the default crypto material manager for ESDK V3. | ||
|
||
This is a copy-paste of the DefaultCryptoMaterialsManager implementation | ||
from the V3 ESDK commit: 98b5eb7c2bd7d1b2a3380aacfa508e8721c4d8a9 | ||
This CMM is used to explicitly assert that the V3 implementation of | ||
the DefaultCMM is compatible with future version's logic, | ||
which implicitly asserts that custom implementations of V3-compatible CMMs | ||
are also compatible with future version's logic. | ||
|
||
:param master_key_provider: Master key provider to use | ||
:type master_key_provider: aws_encryption_sdk.key_providers.base.MasterKeyProvider | ||
""" | ||
|
||
master_key_provider = attr.ib(validator=attr.validators.instance_of(MasterKeyProvider)) | ||
|
||
# pylint: disable=no-self-use | ||
def _generate_signing_key_and_update_encryption_context(self, algorithm, encryption_context): | ||
"""Generates a signing key based on the provided algorithm. | ||
|
||
:param algorithm: Algorithm for which to generate signing key | ||
:type algorithm: aws_encryption_sdk.identifiers.Algorithm | ||
:param dict encryption_context: Encryption context from request | ||
:returns: Signing key bytes | ||
:rtype: bytes or None | ||
""" | ||
_LOGGER.debug("Generating signing key") | ||
if algorithm.signing_algorithm_info is None: | ||
return None | ||
|
||
signer = Signer(algorithm=algorithm, key=generate_ecc_signing_key(algorithm=algorithm)) | ||
encryption_context[ENCODED_SIGNER_KEY] = to_str(signer.encoded_public_key()) | ||
return signer.key_bytes() | ||
|
||
def get_encryption_materials(self, request): | ||
"""Creates encryption materials using underlying master key provider. | ||
|
||
:param request: encryption materials request | ||
:type request: aws_encryption_sdk.materials_managers.EncryptionMaterialsRequest | ||
:returns: encryption materials | ||
:rtype: aws_encryption_sdk.materials_managers.EncryptionMaterials | ||
:raises MasterKeyProviderError: if no master keys are available from the underlying master key provider | ||
:raises MasterKeyProviderError: if the primary master key provided by the underlying master key provider | ||
is not included in the full set of master keys provided by that provider | ||
:raises ActionNotAllowedError: if the commitment policy in the request is violated by the algorithm being | ||
used | ||
""" | ||
default_algorithm = ALGORITHM | ||
if request.commitment_policy in ( | ||
CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT, | ||
CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT, | ||
): | ||
default_algorithm = ALGORITHM_COMMIT_KEY | ||
algorithm = request.algorithm if request.algorithm is not None else default_algorithm | ||
|
||
validate_commitment_policy_on_encrypt(request.commitment_policy, request.algorithm) | ||
|
||
encryption_context = request.encryption_context.copy() | ||
|
||
signing_key = self._generate_signing_key_and_update_encryption_context(algorithm, encryption_context) | ||
|
||
primary_master_key, master_keys = self.master_key_provider.master_keys_for_encryption( | ||
encryption_context=encryption_context, | ||
plaintext_rostream=request.plaintext_rostream, | ||
plaintext_length=request.plaintext_length, | ||
) | ||
if not master_keys: | ||
raise MasterKeyProviderError("No Master Keys available from Master Key Provider") | ||
if primary_master_key not in master_keys: | ||
raise MasterKeyProviderError("Primary Master Key not in provided Master Keys") | ||
|
||
data_encryption_key, encrypted_data_keys = prepare_data_keys( | ||
primary_master_key=primary_master_key, | ||
master_keys=master_keys, | ||
algorithm=algorithm, | ||
encryption_context=encryption_context, | ||
) | ||
|
||
_LOGGER.debug("Post-encrypt encryption context: %s", encryption_context) | ||
|
||
return EncryptionMaterials( | ||
algorithm=algorithm, | ||
data_encryption_key=data_encryption_key, | ||
encrypted_data_keys=encrypted_data_keys, | ||
encryption_context=encryption_context, | ||
signing_key=signing_key, | ||
) | ||
|
||
# pylint: disable=no-self-use | ||
def _load_verification_key_from_encryption_context(self, algorithm, encryption_context): | ||
"""Loads the verification key from the encryption context if used by algorithm suite. | ||
|
||
:param algorithm: Algorithm for which to generate signing key | ||
:type algorithm: aws_encryption_sdk.identifiers.Algorithm | ||
:param dict encryption_context: Encryption context from request | ||
:returns: Raw verification key | ||
:rtype: bytes | ||
:raises SerializationError: if algorithm suite requires message signing and no verification key is found | ||
""" | ||
encoded_verification_key = encryption_context.get(ENCODED_SIGNER_KEY, None) | ||
|
||
if algorithm.signing_algorithm_info is not None and encoded_verification_key is None: | ||
raise SerializationError("No signature verification key found in header for signed algorithm.") | ||
|
||
if algorithm.signing_algorithm_info is None: | ||
if encoded_verification_key is not None: | ||
raise SerializationError("Signature verification key found in header for non-signed algorithm.") | ||
return None | ||
|
||
verifier = Verifier.from_encoded_point(algorithm=algorithm, encoded_point=encoded_verification_key) | ||
return verifier.key_bytes() | ||
|
||
def decrypt_materials(self, request): | ||
"""Obtains a plaintext data key from one or more encrypted data keys | ||
using underlying master key provider. | ||
|
||
:param request: decrypt materials request | ||
:type request: aws_encryption_sdk.materials_managers.DecryptionMaterialsRequest | ||
:returns: decryption materials | ||
:rtype: aws_encryption_sdk.materials_managers.DecryptionMaterials | ||
""" | ||
validate_commitment_policy_on_decrypt(request.commitment_policy, request.algorithm) | ||
|
||
data_key = self.master_key_provider.decrypt_data_key_from_list( | ||
encrypted_data_keys=request.encrypted_data_keys, | ||
algorithm=request.algorithm, | ||
encryption_context=request.encryption_context, | ||
) | ||
verification_key = self._load_verification_key_from_encryption_context( | ||
algorithm=request.algorithm, encryption_context=request.encryption_context | ||
) | ||
|
||
return DecryptionMaterials(data_key=data_key, verification_key=verification_key) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.