11
11
# ANY KIND, either express or implied. See the License for the specific
12
12
# language governing permissions and limitations under the License.
13
13
"""Utility functions to handle common test framework functions."""
14
+ import base64
14
15
import copy
15
16
import io
16
17
import itertools
17
18
import os
18
19
20
+ import attr
21
+ from attr .validators import instance_of
19
22
from cryptography .hazmat .backends import default_backend
20
23
from cryptography .hazmat .primitives import serialization
21
24
from cryptography .hazmat .primitives .asymmetric import rsa
22
25
23
26
from aws_encryption_sdk .exceptions import DecryptKeyError
24
- from aws_encryption_sdk .identifiers import Algorithm , EncryptionKeyType , KeyringTraceFlag , WrappingAlgorithm
27
+ from aws_encryption_sdk .identifiers import AlgorithmSuite , EncryptionKeyType , KeyringTraceFlag , WrappingAlgorithm
25
28
from aws_encryption_sdk .internal .crypto .wrapping_keys import WrappingKey
26
29
from aws_encryption_sdk .internal .utils .streams import InsistentReaderBytesIO
27
30
from aws_encryption_sdk .key_providers .base import MasterKeyProvider , MasterKeyProviderConfig
33
36
from aws_encryption_sdk .structures import EncryptedDataKey , KeyringTrace , MasterKeyInfo , RawDataKey
34
37
35
38
try : # Python 3.5.0 and 3.5.1 have incompatible typing modules
36
- from typing import Iterable , Optional # noqa pylint: disable=unused-import
39
+ from typing import Dict , Iterable , Optional # noqa pylint: disable=unused-import
37
40
except ImportError : # pragma: no cover
38
41
# We only actually need these imports when running the mypy checks
39
42
pass
@@ -120,7 +123,7 @@ def on_decrypt(self, decryption_materials, encrypted_data_keys):
120
123
121
124
def get_encryption_materials_with_data_key ():
122
125
return EncryptionMaterials (
123
- algorithm = Algorithm .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
126
+ algorithm = AlgorithmSuite .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
124
127
data_encryption_key = RawDataKey (
125
128
key_provider = MasterKeyInfo (provider_id = _PROVIDER_ID , key_info = _KEY_ID ),
126
129
data_key = b'*!\xa1 "^-(\xf3 \x10 5\x05 i@B\xc2 \xa2 \xb7 \xdd \xd5 \xd5 \xa9 \xdd m\xfa e\xa8 \\ $\xf9 d\x1e (' ,
@@ -138,7 +141,7 @@ def get_encryption_materials_with_data_key():
138
141
139
142
def get_encryption_materials_with_data_encryption_key ():
140
143
return EncryptionMaterials (
141
- algorithm = Algorithm .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
144
+ algorithm = AlgorithmSuite .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
142
145
data_encryption_key = RawDataKey (
143
146
key_provider = MasterKeyInfo (provider_id = _PROVIDER_ID , key_info = b"5430b043-5843-4629-869c-64794af77ada" ),
144
147
data_key = b'*!\xa1 "^-(\xf3 \x10 5\x05 i@B\xc2 \xa2 \xb7 \xdd \xd5 \xd5 \xa9 \xdd m\xfa e\xa8 \\ $\xf9 d\x1e (' ,
@@ -156,15 +159,15 @@ def get_encryption_materials_with_data_encryption_key():
156
159
157
160
def get_encryption_materials_without_data_key ():
158
161
return EncryptionMaterials (
159
- algorithm = Algorithm .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
162
+ algorithm = AlgorithmSuite .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
160
163
encryption_context = _ENCRYPTION_CONTEXT ,
161
164
signing_key = _SIGNING_KEY ,
162
165
)
163
166
164
167
165
168
def get_encryption_materials_with_encrypted_data_key ():
166
169
return EncryptionMaterials (
167
- algorithm = Algorithm .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
170
+ algorithm = AlgorithmSuite .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
168
171
data_encryption_key = RawDataKey (
169
172
key_provider = MasterKeyInfo (provider_id = _PROVIDER_ID , key_info = _KEY_ID ),
170
173
data_key = b'*!\xa1 "^-(\xf3 \x10 5\x05 i@B\xc2 \xa2 \xb7 \xdd \xd5 \xd5 \xa9 \xdd m\xfa e\xa8 \\ $\xf9 d\x1e (' ,
@@ -192,7 +195,7 @@ def get_encryption_materials_with_encrypted_data_key():
192
195
193
196
def get_encryption_materials_with_encrypted_data_key_aes ():
194
197
return EncryptionMaterials (
195
- algorithm = Algorithm .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
198
+ algorithm = AlgorithmSuite .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
196
199
data_encryption_key = RawDataKey (
197
200
key_provider = MasterKeyInfo (provider_id = _PROVIDER_ID , key_info = _KEY_ID ),
198
201
data_key = b'*!\xa1 "^-(\xf3 \x10 5\x05 i@B\xc2 \xa2 \xb7 \xdd \xd5 \xd5 \xa9 \xdd m\xfa e\xa8 \\ $\xf9 d\x1e (' ,
@@ -214,23 +217,23 @@ def get_encryption_materials_with_encrypted_data_key_aes():
214
217
215
218
def get_encryption_materials_without_data_encryption_key ():
216
219
return EncryptionMaterials (
217
- algorithm = Algorithm .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
220
+ algorithm = AlgorithmSuite .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
218
221
encryption_context = _ENCRYPTION_CONTEXT ,
219
222
signing_key = _SIGNING_KEY ,
220
223
)
221
224
222
225
223
226
def get_decryption_materials_without_data_encryption_key ():
224
227
return DecryptionMaterials (
225
- algorithm = Algorithm .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
228
+ algorithm = AlgorithmSuite .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
226
229
verification_key = b"ex_verification_key" ,
227
230
encryption_context = _ENCRYPTION_CONTEXT ,
228
231
)
229
232
230
233
231
234
def get_decryption_materials_with_data_key ():
232
235
return DecryptionMaterials (
233
- algorithm = Algorithm .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
236
+ algorithm = AlgorithmSuite .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
234
237
data_encryption_key = RawDataKey (
235
238
key_provider = MasterKeyInfo (provider_id = _PROVIDER_ID , key_info = _KEY_ID ),
236
239
data_key = b'*!\xa1 "^-(\xf3 \x10 5\x05 i@B\xc2 \xa2 \xb7 \xdd \xd5 \xd5 \xa9 \xdd m\xfa e\xa8 \\ $\xf9 d\x1e (' ,
@@ -248,7 +251,7 @@ def get_decryption_materials_with_data_key():
248
251
249
252
def get_decryption_materials_with_data_encryption_key ():
250
253
return DecryptionMaterials (
251
- algorithm = Algorithm .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
254
+ algorithm = AlgorithmSuite .AES_256_GCM_IV12_TAG16_HKDF_SHA384_ECDSA_P384 ,
252
255
data_encryption_key = RawDataKey (
253
256
key_provider = MasterKeyInfo (provider_id = _PROVIDER_ID , key_info = b"5430b043-5843-4629-869c-64794af77ada" ),
254
257
data_key = b'*!\xa1 "^-(\xf3 \x10 5\x05 i@B\xc2 \xa2 \xb7 \xdd \xd5 \xd5 \xa9 \xdd m\xfa e\xa8 \\ $\xf9 d\x1e (' ,
@@ -509,7 +512,10 @@ class EphemeralRawMasterKeyProvider(RawMasterKeyProvider):
509
512
provider_id = "fake"
510
513
511
514
def __init__ (self ):
512
- self .__keys = {b"aes-256" : ephemeral_raw_aes_master_key (256 ), b"rsa-4096" : ephemeral_raw_rsa_master_key (4096 )}
515
+ self .__keys = {
516
+ b"aes-256" : ephemeral_raw_aes_master_key (WrappingAlgorithm .AES_256_GCM_IV12_TAG16_NO_PADDING ),
517
+ b"rsa-4096" : ephemeral_raw_rsa_master_key (4096 ),
518
+ }
513
519
514
520
def _get_raw_key (self , key_id ):
515
521
return self .__keys [key_id ].config .wrapping_key
@@ -548,3 +554,110 @@ class FailingDecryptMasterKeyProvider(EphemeralRawMasterKeyProvider):
548
554
549
555
def decrypt_data_key (self , encrypted_data_key , algorithm , encryption_context ):
550
556
raise DecryptKeyError ("FailingDecryptMasterKeyProvider cannot decrypt!" )
557
+
558
+
559
+ @attr .s
560
+ class BrokenKeyring (Keyring ):
561
+ """Keyring that wraps another keyring and selectively breaks the returned values."""
562
+
563
+ _inner_keyring = attr .ib (validator = instance_of (Keyring ))
564
+ _break_algorithm = attr .ib (default = False , validator = instance_of (bool ))
565
+ _break_encryption_context = attr .ib (default = False , validator = instance_of (bool ))
566
+ _break_signing = attr .ib (default = False , validator = instance_of (bool ))
567
+
568
+ @staticmethod
569
+ def _random_string (bytes_len ):
570
+ # type: (int) -> str
571
+ return base64 .b64encode (os .urandom (bytes_len )).decode ("utf-8" )
572
+
573
+ def _broken_algorithm (self , algorithm ):
574
+ # type: (AlgorithmSuite) -> AlgorithmSuite
575
+ if not self ._break_algorithm :
576
+ return algorithm
577
+
578
+ # We want to make sure that we return something different,
579
+ # so find this suite in all suites and grab the next one,
580
+ # whatever that is.
581
+ all_suites = list (AlgorithmSuite )
582
+ suite_index = all_suites .index (algorithm )
583
+ next_index = (suite_index + 1 ) % (len (all_suites ) - 1 )
584
+
585
+ return all_suites [next_index ]
586
+
587
+ def _broken_encryption_context (self , encryption_context ):
588
+ # type: (Dict[str, str]) -> Dict[str, str]
589
+ broken_ec = encryption_context .copy ()
590
+
591
+ if not self ._break_encryption_context :
592
+ return broken_ec
593
+
594
+ # Remove a random value
595
+ try :
596
+ broken_ec .popitem ()
597
+ except KeyError :
598
+ pass
599
+
600
+ # add a random value
601
+ broken_ec [self ._random_string (5 )] = self ._random_string (10 )
602
+
603
+ return broken_ec
604
+
605
+ def _broken_key (self , key ):
606
+ # type: (bytes) -> bytes
607
+ if not self ._break_signing :
608
+ return key
609
+
610
+ return self ._random_string (32 ).encode ("utf-8" )
611
+
612
+ def _break_encryption_materials (self , encryption_materials ):
613
+ # type: (EncryptionMaterials) -> EncryptionMaterials
614
+ return EncryptionMaterials (
615
+ algorithm = self ._broken_algorithm (encryption_materials .algorithm ),
616
+ data_encryption_key = encryption_materials .data_encryption_key ,
617
+ encrypted_data_keys = encryption_materials .encrypted_data_keys ,
618
+ encryption_context = self ._broken_encryption_context (encryption_materials .encryption_context ),
619
+ signing_key = self ._broken_key (encryption_materials .signing_key ),
620
+ keyring_trace = encryption_materials .keyring_trace ,
621
+ )
622
+
623
+ def _break_decryption_materials (self , decryption_materials ):
624
+ # type: (DecryptionMaterials) -> DecryptionMaterials
625
+ return DecryptionMaterials (
626
+ algorithm = self ._broken_algorithm (decryption_materials .algorithm ),
627
+ data_encryption_key = decryption_materials .data_encryption_key ,
628
+ encryption_context = self ._broken_encryption_context (decryption_materials .encryption_context ),
629
+ verification_key = self ._broken_key (decryption_materials .verification_key ),
630
+ keyring_trace = decryption_materials .keyring_trace ,
631
+ )
632
+
633
+ def on_encrypt (self , encryption_materials ):
634
+ # type: (EncryptionMaterials) -> EncryptionMaterials
635
+ return self ._break_encryption_materials (self ._inner_keyring .on_encrypt (encryption_materials ))
636
+
637
+ def on_decrypt (self , decryption_materials , encrypted_data_keys ):
638
+ # type: (DecryptionMaterials, Iterable[EncryptedDataKey]) -> DecryptionMaterials
639
+ return self ._break_decryption_materials (
640
+ self ._inner_keyring .on_decrypt (decryption_materials , encrypted_data_keys )
641
+ )
642
+
643
+
644
+ @attr .s
645
+ class OnlyGenerateKeyring (Keyring ):
646
+ """Keyring that wraps another keyring and removes any encrypted data keys."""
647
+
648
+ _inner_keyring = attr .ib (validator = instance_of (Keyring ))
649
+
650
+ def on_encrypt (self , encryption_materials ):
651
+ # type: (EncryptionMaterials) -> EncryptionMaterials
652
+ materials = self ._inner_keyring .on_encrypt (encryption_materials )
653
+ return EncryptionMaterials (
654
+ algorithm = materials .algorithm ,
655
+ data_encryption_key = materials .data_encryption_key ,
656
+ encryption_context = materials .encryption_context ,
657
+ signing_key = materials .signing_key ,
658
+ keyring_trace = materials .keyring_trace ,
659
+ )
660
+
661
+ def on_decrypt (self , decryption_materials , encrypted_data_keys ):
662
+ # type: (DecryptionMaterials, Iterable[EncryptedDataKey]) -> DecryptionMaterials
663
+ return self ._inner_keyring .on_decrypt (decryption_materials , encrypted_data_keys )
0 commit comments