Skip to content

chore(performance_tests): added hierarchy keyring and caching cmm tests #686

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Performance tests for the hierarchy keyring."""

# noqa pylint: disable=wrong-import-order
from typing import Dict

import aws_encryption_sdk
import boto3
from aws_cryptographic_materialproviders.keystore import KeyStore
from aws_cryptographic_materialproviders.keystore.config import KeyStoreConfig
from aws_cryptographic_materialproviders.keystore.models import KMSConfigurationKmsKeyArn
from aws_cryptographic_materialproviders.mpl import AwsCryptographicMaterialProviders
from aws_cryptographic_materialproviders.mpl.config import MaterialProvidersConfig
from aws_cryptographic_materialproviders.mpl.models import (
CacheTypeDefault,
CreateAwsKmsHierarchicalKeyringInput,
DefaultCache,
GetBranchKeyIdInput,
GetBranchKeyIdOutput,
)
from aws_cryptographic_materialproviders.mpl.references import IBranchKeyIdSupplier, IKeyring

from ..utils.util import PerfTestUtils


class ExampleBranchKeyIdSupplier(IBranchKeyIdSupplier):
"""Example implementation of a branch key ID supplier."""

branch_key_id_for_tenant_a: str
branch_key_id_for_tenant_b: str

def __init__(self, tenant_1_id, tenant_2_id):
"""Example constructor for a branch key ID supplier."""
self.branch_key_id_for_tenant_a = tenant_1_id
self.branch_key_id_for_tenant_b = tenant_2_id

def get_branch_key_id(
self,
param: GetBranchKeyIdInput
) -> GetBranchKeyIdOutput:
"""Returns branch key ID from the tenant ID in input's encryption context."""
encryption_context: Dict[str, str] = param.encryption_context

if b"tenant" not in encryption_context:
raise ValueError("EncryptionContext invalid, does not contain expected tenant key value pair.")

tenant_key_id: str = encryption_context.get(b"tenant")
branch_key_id: str

if tenant_key_id == b"TenantA":
branch_key_id = self.branch_key_id_for_tenant_a
elif tenant_key_id == b"TenantB":
branch_key_id = self.branch_key_id_for_tenant_b
else:
raise ValueError(f"Item does not contain valid tenant ID: {tenant_key_id=}")

return GetBranchKeyIdOutput(branch_key_id=branch_key_id)


def create_keyring(
key_store_table_name: str,
logical_key_store_name: str,
kms_key_id: str
):
"""Demonstrate how to create a hierarchy keyring.

Usage: create_keyring(key_store_table_name, logical_key_store_name, kms_key_id)
:param key_store_table_name: Name of the KeyStore DynamoDB table.
:type key_store_table_name: string
:param logical_key_store_name: Logical name of the KeyStore.
:type logical_key_store_name: string
:param kms_key_id: KMS Key identifier for the KMS key you want to use.
:type kms_key_id: string

For more information on KMS Key identifiers, see
https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#key-id
"""
# Create boto3 clients for DynamoDB and KMS.
ddb_client = boto3.client('dynamodb', region_name="us-west-2")
kms_client = boto3.client('kms', region_name="us-west-2")

# Configure your KeyStore resource.
# This SHOULD be the same configuration that you used
# to initially create and populate your KeyStore.
keystore: KeyStore = KeyStore(
config=KeyStoreConfig(
ddb_client=ddb_client,
ddb_table_name=key_store_table_name,
logical_key_store_name=logical_key_store_name,
kms_client=kms_client,
kms_configuration=KMSConfigurationKmsKeyArn(
value=kms_key_id
),
)
)

# Call CreateKey to create two new active branch keys
branch_key_id_a: str = PerfTestUtils.DEFAULT_BRANCH_KEY_ID_A
branch_key_id_b: str = PerfTestUtils.DEFAULT_BRANCH_KEY_ID_B

# Create a branch key supplier that maps the branch key id to a more readable format
branch_key_id_supplier: IBranchKeyIdSupplier = ExampleBranchKeyIdSupplier(
tenant_1_id=branch_key_id_a,
tenant_2_id=branch_key_id_b,
)

# Create the Hierarchical Keyring.
mat_prov: AwsCryptographicMaterialProviders = AwsCryptographicMaterialProviders(
config=MaterialProvidersConfig()
)

keyring_input: CreateAwsKmsHierarchicalKeyringInput = CreateAwsKmsHierarchicalKeyringInput(
key_store=keystore,
branch_key_id_supplier=branch_key_id_supplier,
ttl_seconds=600,
cache=CacheTypeDefault(
value=DefaultCache(
entry_capacity=100
)
),
)

keyring: IKeyring = mat_prov.create_aws_kms_hierarchical_keyring(
input=keyring_input
)

return keyring


def encrypt_using_keyring(
plaintext_data: bytes,
keyring: IKeyring
):
"""Demonstrate how to encrypt plaintext data using a hierarchy keyring.

Usage: encrypt_using_keyring(plaintext_data, keyring)
:param plaintext_data: plaintext data you want to encrypt
:type: bytes
:param keyring: Keyring to use for encryption.
:type keyring: IKeyring
"""
client = aws_encryption_sdk.EncryptionSDKClient()

ciphertext_data, _ = client.encrypt(
source=plaintext_data,
keyring=keyring,
encryption_context=PerfTestUtils.DEFAULT_ENCRYPTION_CONTEXT
)

return ciphertext_data


def decrypt_using_keyring(
ciphertext_data: bytes,
keyring: IKeyring
):
"""Demonstrate how to decrypt ciphertext data using a hierarchy keyring.

Usage: decrypt_using_keyring(ciphertext_data, keyring)
:param ciphertext_data: ciphertext data you want to decrypt
:type: bytes
:param keyring: Keyring to use for decryption.
:type keyring: IKeyring
"""
client = aws_encryption_sdk.EncryptionSDKClient()

decrypted_plaintext_data, _ = client.decrypt(
source=ciphertext_data,
keyring=keyring,
encryption_context=PerfTestUtils.DEFAULT_ENCRYPTION_CONTEXT
)

return decrypted_plaintext_data
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def create_keyring():
key_name_space = "Some managed raw keys"
key_name = "My 256-bit AES wrapping key"

# Here, the input to secrets.token_bytes() = 32 bytes = 256 bits
# We fix the static key in order to make the test deterministic
static_key = PerfTestUtils.DEFAULT_AES_256_STATIC_KEY

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
def create_key_provider(
kms_key_id: str
):
"""Demonstrate how to create an AWS KMS master key-provider.
"""Demonstrate how to create an AWS KMS master key provider.

Usage: create_key_provider(kms_key_id)
:param kms_key_id: KMS Key identifier for the KMS key you want to use.
Expand All @@ -17,7 +17,7 @@ def create_key_provider(
For more information on KMS Key identifiers, see
https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#key-id
"""
# Create a KMS master key-provider.
# Create a KMS master key provider.
key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[
kms_key_id,
])
Expand All @@ -29,7 +29,7 @@ def encrypt_using_key_provider(
plaintext_data: bytes,
key_provider: aws_encryption_sdk.key_providers.base.MasterKeyProvider
):
"""Demonstrate how to encrypt plaintext data using an AWS KMS master key-provider.
"""Demonstrate how to encrypt plaintext data using an AWS KMS master key provider.

Usage: encrypt_using_key_provider(plaintext_data, key_provider)
:param plaintext_data: plaintext data you want to encrypt
Expand All @@ -51,7 +51,7 @@ def decrypt_using_key_provider(
ciphertext_data: bytes,
key_provider: aws_encryption_sdk.key_providers.base.MasterKeyProvider
):
"""Demonstrate how to decrypt ciphertext data using an AWS KMS master key-provider.
"""Demonstrate how to decrypt ciphertext data using an AWS KMS master key provider.

Usage: decrypt_using_key_provider(ciphertext_data, key_provider)
:param ciphertext_data: ciphertext data you want to decrypt
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Performance tests for the Caching Cryptographic Materials Manager (CMM)."""

import aws_encryption_sdk


def create_cmm(
kms_key_id: str,
max_age_in_cache: float,
cache_capacity: int
):
"""Demonstrate how to create a Caching CMM.

Usage: create_cmm(kms_key_id, max_age_in_cache, cache_capacity)
:param kms_key_id: Amazon Resource Name (ARN) of the KMS customer master key
:type kms_key_id: str
:param max_age_in_cache: Maximum time in seconds that a cached entry can be used
:type max_age_in_cache: float
:param cache_capacity: Maximum number of entries to retain in cache at once
:type cache_capacity: int
"""
# Security thresholds
# Max messages (or max bytes per) data key are optional
max_entry_messages = 100

# Create a master key provider for the KMS customer master key (CMK)
key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[kms_key_id])

# Create a local cache
cache = aws_encryption_sdk.LocalCryptoMaterialsCache(cache_capacity)

# Create a caching CMM
caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
master_key_provider=key_provider,
cache=cache,
max_age=max_age_in_cache,
max_messages_encrypted=max_entry_messages,
)

return caching_cmm


def encrypt_using_cmm(
plaintext_data: bytes,
caching_cmm: aws_encryption_sdk.materials_managers.base.CryptoMaterialsManager
):
"""Demonstrate how to encrypt plaintext data using a Caching CMM.

Usage: encrypt_using_cmm(plaintext_data, caching_cmm)
:param plaintext_data: plaintext data you want to encrypt
:type: bytes
:param caching_cmm: Crypto Materials Manager to use for encryption.
:type caching_cmm: aws_encryption_sdk.materials_managers.base.CryptoMaterialsManager
"""
client = aws_encryption_sdk.EncryptionSDKClient()

ciphertext_data, _ = client.encrypt(
source=plaintext_data,
materials_manager=caching_cmm
)

return ciphertext_data


def decrypt_using_cmm(
ciphertext_data: bytes,
caching_cmm: aws_encryption_sdk.materials_managers.base.CryptoMaterialsManager
):
"""Demonstrate how to decrypt ciphertext data using a Caching CMM.

Usage: decrypt_using_cmm(ciphertext_data, caching_cmm)
:param ciphertext_data: ciphertext data you want to decrypt
:type: bytes
:param caching_cmm: Crypto Materials Manager to use for encryption.
:type caching_cmm: aws_encryption_sdk.materials_managers.base.CryptoMaterialsManager
"""
client = aws_encryption_sdk.EncryptionSDKClient()

decrypted_plaintext_data, _ = client.decrypt(
source=ciphertext_data,
materials_manager=caching_cmm
)

return decrypted_plaintext_data
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ def _get_raw_key(self, key_id):


def create_key_provider():
"""Demonstrate how to create a Raw AES master key-provider.
"""Demonstrate how to create a Raw AES master key provider.

Usage: create_key_provider()
"""
# Create a Raw AES master key-provider.
# Create a Raw AES master key provider.

# The Key ID field in the JceMasterKey and RawMasterKey is equivalent to key name in the Raw keyrings
# The Key ID field in the JceMasterKey and RawMasterKey is equivalent to key name in the Raw keyrings
key_id = "My 256-bit AES wrapping key"
key_provider = StaticRandomMasterKeyProvider()
key_provider.add_master_key(key_id)
Expand All @@ -61,7 +61,7 @@ def encrypt_using_key_provider(
plaintext_data: bytes,
key_provider: aws_encryption_sdk.key_providers.base.MasterKeyProvider
):
"""Demonstrate how to encrypt plaintext data using a Raw AES master key-provider.
"""Demonstrate how to encrypt plaintext data using a Raw AES master key provider.

Usage: encrypt_using_key_provider(plaintext_data, key_provider)
:param plaintext_data: plaintext data you want to encrypt
Expand All @@ -83,7 +83,7 @@ def decrypt_using_key_provider(
ciphertext_data: bytes,
key_provider: aws_encryption_sdk.key_providers.base.MasterKeyProvider
):
"""Demonstrate how to decrypt ciphertext data using a Raw AES master key-provider.
"""Demonstrate how to decrypt ciphertext data using a Raw AES master key provider.

Usage: decrypt_using_key_provider(ciphertext_data, key_provider)
:param ciphertext_data: ciphertext data you want to decrypt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ def _get_raw_key(self, key_id):


def create_key_provider():
"""Demonstrate how to create a Raw RSA master key-provider.
"""Demonstrate how to create a Raw RSA master key provider.

Usage: create_key_provider()
"""
# Create a Raw RSA master key-provider.
# Create a Raw RSA master key provider.

# The Key ID field in the JceMasterKey and RawMasterKey is equivalent to key name in the Raw keyrings
# The Key ID field in the JceMasterKey and RawMasterKey is equivalent to key name in the Raw keyrings
key_id = "My 4096-bit RSA wrapping key"
key_provider = StaticRandomMasterKeyProvider()
key_provider.add_master_key(key_id)
Expand All @@ -61,7 +61,7 @@ def encrypt_using_key_provider(
plaintext_data: bytes,
key_provider: aws_encryption_sdk.key_providers.base.MasterKeyProvider
):
"""Demonstrate how to encrypt plaintext data using a Raw RSA master key-provider.
"""Demonstrate how to encrypt plaintext data using a Raw RSA master key provider.

Usage: encrypt_using_key_provider(plaintext_data, key_provider)
:param plaintext_data: plaintext data you want to encrypt
Expand All @@ -83,7 +83,7 @@ def decrypt_using_key_provider(
ciphertext_data: bytes,
key_provider: aws_encryption_sdk.key_providers.base.MasterKeyProvider
):
"""Demonstrate how to decrypt ciphertext data using a Raw RSA master key-provider.
"""Demonstrate how to decrypt ciphertext data using a Raw RSA master key provider.

Usage: decrypt_using_key_provider(ciphertext_data, key_provider)
:param ciphertext_data: ciphertext data you want to decrypt
Expand Down
Loading
Loading