1
1
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2
2
# SPDX-License-Identifier: Apache-2.0
3
3
import copy
4
- import random
5
- import secrets
6
- import string
7
4
8
- import boto3
9
5
import pytest
10
- from aws_cryptographic_materialproviders .keystore import KeyStore
11
- from aws_cryptographic_materialproviders .keystore .config import KeyStoreConfig
12
- from aws_cryptographic_materialproviders .keystore .models import KMSConfigurationKmsKeyArn
13
6
from aws_cryptographic_materialproviders .mpl import AwsCryptographicMaterialProviders
14
7
from aws_cryptographic_materialproviders .mpl .config import MaterialProvidersConfig
15
8
from aws_cryptographic_materialproviders .mpl .models import (
16
- AesWrappingAlg ,
17
- CacheTypeDefault ,
18
- CreateAwsKmsHierarchicalKeyringInput ,
19
- CreateAwsKmsKeyringInput ,
20
9
CreateDefaultCryptographicMaterialsManagerInput ,
21
- CreateMultiKeyringInput ,
22
- CreateRawAesKeyringInput ,
23
- CreateRawRsaKeyringInput ,
24
10
CreateRequiredEncryptionContextCMMInput ,
25
- DefaultCache ,
26
- PaddingScheme ,
27
11
)
28
- from aws_cryptographic_materialproviders .mpl .references import IKeyring
29
- from cryptography .hazmat .backends import default_backend as crypto_default_backend
30
- from cryptography .hazmat .primitives import serialization as crypto_serialization
31
- from cryptography .hazmat .primitives .asymmetric import rsa
32
12
33
13
import aws_encryption_sdk
34
14
from aws_encryption_sdk .exceptions import AWSEncryptionSDKClientError
35
15
16
+ from ..utils import TestEncryptionContexts , TestKeyringCreator
17
+
36
18
pytestmark = [pytest .mark .integ ]
37
19
38
20
# ESDK client. Used to encrypt/decrypt in each test.
44
26
)
45
27
46
28
47
- # Private raw AES keyring creator.
48
- # Lifted from raw AES keyring example.
49
- def _create_raw_aes_keyring ():
50
- static_key = secrets .token_bytes (32 )
51
-
52
- keyring_input : CreateRawAesKeyringInput = CreateRawAesKeyringInput (
53
- key_namespace = "some_key_namespace" ,
54
- key_name = "some_key_name" ,
55
- wrapping_key = static_key ,
56
- wrapping_alg = AesWrappingAlg .ALG_AES256_GCM_IV12_TAG16
57
- )
58
-
59
- raw_aes_keyring : IKeyring = mpl_client .create_raw_aes_keyring (
60
- input = keyring_input
61
- )
62
-
63
- return raw_aes_keyring
64
-
65
-
66
- # Private raw RSA keyring creator.
67
- # Lifted from raw RSA keyring example.
68
- def _create_raw_rsa_keyring ():
69
- ssh_rsa_exponent = 65537
70
- bit_strength = 4096
71
- key = rsa .generate_private_key (
72
- backend = crypto_default_backend (),
73
- public_exponent = ssh_rsa_exponent ,
74
- key_size = bit_strength
75
- )
76
-
77
- public_key = key .public_key ().public_bytes (
78
- encoding = crypto_serialization .Encoding .PEM ,
79
- format = crypto_serialization .PublicFormat .SubjectPublicKeyInfo
80
- )
81
- private_key = key .private_bytes (
82
- encoding = crypto_serialization .Encoding .PEM ,
83
- format = crypto_serialization .PrivateFormat .TraditionalOpenSSL ,
84
- encryption_algorithm = crypto_serialization .NoEncryption ()
85
- )
86
-
87
- key_name_space = "some_key_name_space"
88
- key_name = "some_key_name"
89
-
90
- keyring_input : CreateRawRsaKeyringInput = CreateRawRsaKeyringInput (
91
- key_namespace = key_name_space ,
92
- key_name = key_name ,
93
- padding_scheme = PaddingScheme .OAEP_SHA256_MGF1 ,
94
- public_key = public_key ,
95
- private_key = private_key
96
- )
97
-
98
- raw_rsa_keyring : IKeyring = mpl_client .create_raw_rsa_keyring (
99
- input = keyring_input
100
- )
101
-
102
- return raw_rsa_keyring
103
-
104
-
105
- # Private KMS keyring creator.
106
- # Lifted KMS keyring example.
107
- def _create_kms_keyring ():
108
- kms_client = boto3 .client ('kms' , region_name = "us-west-2" )
109
- keyring_input : CreateAwsKmsKeyringInput = CreateAwsKmsKeyringInput (
110
- kms_key_id = "arn:aws:kms:us-west-2:658956600833:key/b3537ef1-d8dc-4780-9f5a-55776cbb2f7f" ,
111
- kms_client = kms_client
112
- )
113
-
114
- kms_keyring : IKeyring = mpl_client .create_aws_kms_keyring (
115
- input = keyring_input
116
- )
117
-
118
- return kms_keyring
119
-
120
-
121
- # Private hierarchical keyring creator.
122
- # Lifted hierarchical keyring example.
123
- def _create_hierarchical_keyring ():
124
- kms_client = boto3 .client ('kms' , region_name = "us-west-2" )
125
- ddb_client = boto3 .client ('dynamodb' , region_name = "us-west-2" )
126
-
127
- keystore : KeyStore = KeyStore (
128
- config = KeyStoreConfig (
129
- ddb_client = ddb_client ,
130
- ddb_table_name = "KeyStoreDdbTable" ,
131
- logical_key_store_name = "KeyStoreDdbTable" ,
132
- kms_client = kms_client ,
133
- kms_configuration = KMSConfigurationKmsKeyArn (
134
- value = 'arn:aws:kms:us-west-2:370957321024:key/9d989aa2-2f9c-438c-a745-cc57d3ad0126'
135
- ),
136
- )
137
- )
138
-
139
- keyring_input : CreateAwsKmsHierarchicalKeyringInput = CreateAwsKmsHierarchicalKeyringInput (
140
- key_store = keystore ,
141
- branch_key_id = 'a52dfaad-7dbd-4430-a1fd-abaa5299da07' ,
142
- ttl_seconds = 600 ,
143
- cache = CacheTypeDefault (
144
- value = DefaultCache (
145
- entry_capacity = 100
146
- )
147
- ),
148
- )
149
-
150
- hierarchical_keyring : IKeyring = mpl_client .create_aws_kms_hierarchical_keyring (
151
- input = keyring_input
152
- )
153
-
154
- return hierarchical_keyring
155
-
156
-
157
- # Private multi-keyring creator.
158
- def _create_multi_keyring (keyrings ):
159
- a = mpl_client .create_multi_keyring (CreateMultiKeyringInput (
160
- generator = keyrings [0 ],
161
- child_keyrings = keyrings [1 :]
162
- ))
163
- return a
164
-
165
-
166
- # Encryption contexts under test
167
- SOME_EMPTY_ENCRYPTION_CONTEXT = {}
168
- SOME_SINGLE_ITEM_ENCRYPTION_CONTEXT = {"some_key" : "some_value" }
169
- SOME_DOUBLE_ITEM_ENCRYPTION_CONTEXT = {"some_key" : "some_value" , "some_other_key" : "some_other_value" }
170
- SOME_MANY_ITEM_ENCRYPTION_CONTEXT = {
171
- '' .join (random .choices (string .ascii_letters , k = 6 ))
172
- : '' .join (random .choices (string .ascii_letters , k = 6 )) for _ in range (20 )
173
- }
174
- ENCRYPTION_CONTEXT_SUITE = [
175
- SOME_EMPTY_ENCRYPTION_CONTEXT ,
176
- SOME_SINGLE_ITEM_ENCRYPTION_CONTEXT ,
177
- SOME_DOUBLE_ITEM_ENCRYPTION_CONTEXT ,
178
- SOME_MANY_ITEM_ENCRYPTION_CONTEXT ,
179
- ]
180
-
181
29
# Keyrings under test
182
- SOME_RSA_KEYRING = _create_raw_rsa_keyring ()
183
- SOME_AES_KEYRING = _create_raw_aes_keyring ()
184
- SOME_KMS_KEYRING = _create_kms_keyring ()
185
- SOME_HIERARCHICAL_KEYRING = _create_hierarchical_keyring ()
30
+ SOME_RSA_KEYRING = TestKeyringCreator . _create_raw_rsa_keyring ()
31
+ SOME_AES_KEYRING = TestKeyringCreator . _create_raw_aes_keyring ()
32
+ SOME_KMS_KEYRING = TestKeyringCreator . _create_kms_keyring ()
33
+ SOME_HIERARCHICAL_KEYRING = TestKeyringCreator . _create_hierarchical_keyring ()
186
34
TEST_KEYRINGS_LIST = [
187
35
SOME_AES_KEYRING ,
188
36
SOME_KMS_KEYRING ,
@@ -194,13 +42,13 @@ def _create_multi_keyring(keyrings):
194
42
# this multi keyring encrypts one message,
195
43
# then the test attempts decryption with all of its component keyrings
196
44
# ("component" = generator + child keyrings).
197
- SOME_MULTI_KEYRING = _create_multi_keyring (TEST_KEYRINGS_LIST )
45
+ SOME_MULTI_KEYRING = TestKeyringCreator . _create_multi_keyring (TEST_KEYRINGS_LIST )
198
46
199
47
200
48
SOME_PLAINTEXT = b"Hello World"
201
49
202
50
203
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
51
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts . ALL_ENCRYPTION_CONTEXTS )
204
52
# HAPPY CASE 1
205
53
# Test supply same encryption context on encrypt and decrypt NO filtering
206
54
def test_GIVEN_same_EC_on_encrypt_and_decrypt_WHEN_encrypt_decrypt_cycle_THEN_decrypt_matches_plaintext (
@@ -226,18 +74,15 @@ def test_GIVEN_same_EC_on_encrypt_and_decrypt_WHEN_encrypt_decrypt_cycle_THEN_de
226
74
assert pt == SOME_PLAINTEXT
227
75
228
76
229
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
77
+ # This test needs >=1 item to supply as required encryption context
78
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts .NONEMPTY_ENCRYPTION_CONTEXTS )
230
79
# HAPPY CASE 2
231
80
# On Encrypt we will only write one encryption context key value to the header
232
81
# we will then supply only what we didn't write wth no required ec cmm,
233
82
# This test case is checking that the default cmm is doing the correct filtering
234
83
def test_GIVEN_RECCMM_with_one_REC_key_on_encrypt_AND_default_CMM_with_valid_reproduced_EC_on_decrypt_WHEN_supply_reproduced_EC_with_REC_key_on_decrypt_THEN_decrypt_matches_plaintext ( # noqa pylint: disable=line-too-long
235
84
encryption_context
236
85
):
237
- # This test needs >1 item to supply as required encryption context
238
- if len (encryption_context ) < 1 :
239
- return
240
-
241
86
# Grab one item from encryption_context to supply as reproduced EC
242
87
one_k , one_v = next (iter (encryption_context .items ()))
243
88
reproduced_ec = {one_k : one_v }
@@ -280,18 +125,15 @@ def test_GIVEN_RECCMM_with_one_REC_key_on_encrypt_AND_default_CMM_with_valid_rep
280
125
assert pt == SOME_PLAINTEXT
281
126
282
127
283
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
128
+ # This test needs >=1 item to supply as required encryption context
129
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts .NONEMPTY_ENCRYPTION_CONTEXTS )
284
130
# HAPPY CASE 3
285
131
# On Encrypt we will only write one encryption context key value to the header
286
132
# we will then supply only what we didn't write but included in the signature while we
287
133
# are configured with the required encryption context cmm
288
134
def test_GIVEN_RECCMM_with_one_REC_key_on_encrypt_AND_RECCMM_with_valid_reproduced_EC_on_decrypt_WHEN_supply_reproduced_EC_with_REC_key_on_decrypt_THEN_decrypt_matches_plaintext ( # noqa pylint: disable=line-too-long
289
135
encryption_context
290
136
):
291
- # This test needs >1 item to supply as required encryption context
292
- if len (encryption_context ) < 1 :
293
- return
294
-
295
137
# Grab one item from encryption_context to supply as reproduced EC
296
138
one_k , one_v = next (iter (encryption_context .items ()))
297
139
reproduced_ec = {one_k : one_v }
@@ -349,7 +191,8 @@ def test_GIVEN_RECCMM_with_one_REC_key_on_encrypt_AND_RECCMM_with_valid_reproduc
349
191
assert pt == SOME_PLAINTEXT
350
192
351
193
352
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
194
+ # This test needs >=1 item to supply as required encryption context
195
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts .NONEMPTY_ENCRYPTION_CONTEXTS )
353
196
# HAPPY CASE 4
354
197
# On Encrypt we write all encryption context
355
198
# as if the message was encrypted before the feature existed.
@@ -358,10 +201,6 @@ def test_GIVEN_RECCMM_with_one_REC_key_on_encrypt_AND_RECCMM_with_valid_reproduc
358
201
def test_GIVEN_default_CMM_on_encrypt_AND_default_CMM_with_valid_reproduced_EC_on_decrypt_WHEN_supply_reproduced_EC_with_REC_key_on_decrypt_THEN_decrypt_matches_plaintext ( # noqa pylint: disable=line-too-long
359
202
encryption_context
360
203
):
361
- # This test needs >1 item to supply as required encryption context
362
- if len (encryption_context ) < 1 :
363
- return
364
-
365
204
# Grab one item from encryption_context to supply as reproduced EC
366
205
one_k , one_v = next (iter (encryption_context .items ()))
367
206
reproduced_ec = {one_k : one_v }
@@ -408,20 +247,15 @@ def test_GIVEN_default_CMM_on_encrypt_AND_default_CMM_with_valid_reproduced_EC_o
408
247
# Then: decrypted plaintext matches original plaintext
409
248
assert pt == SOME_PLAINTEXT
410
249
411
-
412
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
250
+ # This test needs >=2 items in EC so it can swap their key/value pairs
251
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts . AT_LEAST_TWO_ITEMS_ENCRYPTION_CONTEXTS )
413
252
# FAILURE CASE 1
414
253
# Encrypt with and store all encryption context in header
415
254
# On Decrypt supply additional encryption context not stored in the header; this MUST fail
416
255
# On Decrypt supply mismatched encryption context key values; this MUST fail
417
256
def test_GIVEN_default_CMM_with_EC_on_encrypt_AND_default_CMM_with_different_EC_on_decrypt_WHEN_decrypt_THEN_raise_AWSEncryptionSDKClientError ( # noqa pylint: disable=line-too-long
418
257
encryption_context ,
419
258
):
420
- # This test swaps EC key/value pairs around;
421
- # if there isn't a pair to swap, skip this test
422
- if len (encryption_context ) < 2 :
423
- return
424
-
425
259
ct , _ = client .encrypt (
426
260
source = SOME_PLAINTEXT ,
427
261
keyring = SOME_MULTI_KEYRING ,
@@ -463,7 +297,8 @@ def test_GIVEN_default_CMM_with_EC_on_encrypt_AND_default_CMM_with_different_EC_
463
297
)
464
298
465
299
466
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
300
+ # This test needs >=1 item to supply as required encryption context
301
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts .NONEMPTY_ENCRYPTION_CONTEXTS )
467
302
# FAILURE CASE 2
468
303
# Encrypt will not store all Encryption Context, we will drop one entry but it will still get
469
304
# included in the
@@ -472,10 +307,6 @@ def test_GIVEN_default_CMM_with_EC_on_encrypt_AND_default_CMM_with_different_EC_
472
307
def test_GIVEN_RECCCMM_with_one_REC_key_on_encrypt_AND_default_CMM_with_no_EC_on_decrypt_WHEN_decrypt_THEN_raise_AWSEncryptionSDKClientError ( # noqa pylint: disable=line-too-long
473
308
encryption_context ,
474
309
):
475
- # This test needs >1 item to supply as required encryption context
476
- if len (encryption_context ) < 1 :
477
- return
478
-
479
310
# Grab one item from encryption_context to supply as reproduced EC
480
311
one_k , _ = next (iter (encryption_context .items ()))
481
312
# Given: one required encryption context (REC) key
@@ -512,7 +343,8 @@ def test_GIVEN_RECCCMM_with_one_REC_key_on_encrypt_AND_default_CMM_with_no_EC_on
512
343
)
513
344
514
345
515
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
346
+ # This test needs >=1 item to supply as required encryption context
347
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts .NONEMPTY_ENCRYPTION_CONTEXTS )
516
348
# FAILURE CASE 3
517
349
# Encrypt will not store all Encryption Context, we will drop one entry but it will still get
518
350
# included in the
@@ -521,10 +353,6 @@ def test_GIVEN_RECCCMM_with_one_REC_key_on_encrypt_AND_default_CMM_with_no_EC_on
521
353
def test_GIVEN_RECCMM_on_encrypt_AND_EC_with_wrong_value_for_key_on_decrypt_WHEN_decrypt_THEN_raise_AWSEncryptionSDKClientError ( # noqa pylint: disable=line-too-long
522
354
encryption_context ,
523
355
):
524
- # This test needs >1 item to supply as required encryption context
525
- if len (encryption_context ) < 1 :
526
- return
527
-
528
356
# Grab one item from encryption_context to supply as reproduced EC
529
357
one_k , _ = next (iter (encryption_context .items ()))
530
358
# Given: one required encryption context (REC) key
@@ -564,7 +392,8 @@ def test_GIVEN_RECCMM_on_encrypt_AND_EC_with_wrong_value_for_key_on_decrypt_WHEN
564
392
)
565
393
566
394
567
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
395
+ # This test needs >=2 items in EC so it create incorrect reproduced EC scenarios
396
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts .AT_LEAST_TWO_ITEMS_ENCRYPTION_CONTEXTS )
568
397
# FAILURE CASE 4
569
398
# Encrypt will not store all Encryption Context, we will drop one entry but it will still get
570
399
# included in the
@@ -573,10 +402,6 @@ def test_GIVEN_RECCMM_on_encrypt_AND_EC_with_wrong_value_for_key_on_decrypt_WHEN
573
402
def test_GIVEN_RECCMM_on_encrypt_AND_reproduced_EC_missing_REC_key_on_decrypt_WHEN_decrypt_THEN_raise_AWSEncryptionSDKClientError ( # noqa pylint: disable=line-too-long
574
403
encryption_context ,
575
404
):
576
- # This test needs two EC items to have an incorrect reproduced EC
577
- if len (encryption_context ) < 2 :
578
- return
579
-
580
405
# Grab one item from encryption_context to supply as reproduced EC
581
406
ec_iter = iter (encryption_context .items ())
582
407
required_k , _ = next (ec_iter )
@@ -617,7 +442,7 @@ def test_GIVEN_RECCMM_on_encrypt_AND_reproduced_EC_missing_REC_key_on_decrypt_WH
617
442
)
618
443
619
444
620
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
445
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts . ALL_ENCRYPTION_CONTEXTS )
621
446
# FAILURE CASE 5
622
447
# Although we are requesting that we remove a RESERVED key word from the encryption context
623
448
# The CMM instantiation will still succeed because the CMM is meant to work with different
@@ -656,16 +481,13 @@ def test_GIVEN_RECCMM_with_reserved_key_on_encrypt_WHEN_encrypt_THEN_raise_AWSEn
656
481
)
657
482
658
483
659
- @pytest .mark .parametrize ("encryption_context" , ENCRYPTION_CONTEXT_SUITE )
484
+ # This test needs >=2 items in EC so it create invalid reproduced EC scenarios
485
+ @pytest .mark .parametrize ("encryption_context" , TestEncryptionContexts .AT_LEAST_TWO_ITEMS_ENCRYPTION_CONTEXTS )
660
486
@pytest .mark .parametrize ("keyring" , TEST_KEYRINGS_LIST )
661
487
def test_GIVEN_some_keyring_AND_some_EC_WHEN_decrypt_valid_message_with_mutated_EC_THEN_decryption_matches_expected_result ( # noqa pylint: disable=line-too-long
662
488
keyring ,
663
489
encryption_context ,
664
490
):
665
- # This test needs two EC items to have an incorrect reproduced EC
666
- if len (encryption_context ) < 2 :
667
- return
668
-
669
491
# Additional EC
670
492
some_additional_ec = copy .deepcopy (encryption_context )
671
493
some_additional_ec ["some extra key to add" ] = "some extra value added"
0 commit comments