1
+ from __future__ import annotations
2
+
1
3
import base64
2
- from typing import Any , Callable , Dict , List , Optional , Union
4
+ from typing import Any , Callable , Dict , List
3
5
4
6
import botocore
5
7
from aws_encryption_sdk import (
@@ -26,43 +28,97 @@ def __init__(self, key):
26
28
27
29
class AwsEncryptionSdkProvider (BaseProvider ):
28
30
"""
29
- The AwsEncryptionSdkProvider is to be used as a Provider for the Datamasking class.
30
-
31
- Example
32
- -------
33
- >>> data_masker = DataMasking(provider=AwsEncryptionSdkProvider(keys=[keyARN1, keyARN2,...,]))
34
- >>> encrypted_data = data_masker.encrypt("a string")
35
- "encrptedBase64String"
36
- >>> decrypted_data = data_masker.decrypt(encrypted_data)
37
- "a string"
38
- """
31
+ The AwsEncryptionSdkProvider is used as a provider for the DataMasking class.
32
+
33
+ This provider allows you to perform data masking using the AWS Encryption SDK
34
+ for encryption and decryption. It integrates with the DataMasking class to
35
+ securely encrypt and decrypt sensitive data.
36
+
37
+ Usage Example:
38
+ ```
39
+ from aws_lambda_powertools.utilities.data_masking import DataMasking
40
+ from aws_lambda_powertools.utilities.data_masking.providers.kms.aws_encryption_sdk import (
41
+ AwsEncryptionSdkProvider,
42
+ )
43
+
44
+
45
+ def lambda_handler(event, context):
46
+ provider = AwsEncryptionSdkProvider(["arn:aws:kms:us-east-1:0123456789012:key/key-id"])
47
+ masker = DataMasking(provider=provider)
48
+
49
+ data = {
50
+ "project": "powertools",
51
+ "sensitive": "xxxxxxxxxx"
52
+ }
53
+
54
+ masked = masker.encrypt(data,fields=["sensitive"])
39
55
40
- session = botocore .session .Session ()
41
- register_feature_to_botocore_session (session , "data-masking" )
56
+ return masked
57
+
58
+ ```
59
+ """
42
60
43
61
def __init__ (
44
62
self ,
45
63
keys : List [str ],
46
- client : Optional [ EncryptionSDKClient ] = None ,
64
+ key_provider = None ,
47
65
local_cache_capacity : int = CACHE_CAPACITY ,
48
66
max_cache_age_seconds : float = MAX_CACHE_AGE_SECONDS ,
49
67
max_messages_encrypted : int = MAX_MESSAGES_ENCRYPTED ,
50
- json_serializer : Optional [ Callable [[ Dict ], str ]] = None ,
51
- json_deserializer : Optional [ Callable [[ Union [ Dict , str , bool , int , float ]], str ]] = None ,
68
+ json_serializer : Callable | None = None ,
69
+ json_deserializer : Callable | None = None ,
52
70
):
53
71
super ().__init__ (json_serializer = json_serializer , json_deserializer = json_deserializer )
54
- self .client = client or EncryptionSDKClient ()
72
+
73
+ self ._key_provider = key_provider or KMSKeyProvider (
74
+ keys = keys ,
75
+ local_cache_capacity = local_cache_capacity ,
76
+ max_cache_age_seconds = max_cache_age_seconds ,
77
+ max_messages_encrypted = max_messages_encrypted ,
78
+ json_serializer = self .json_serializer ,
79
+ json_deserializer = self .json_deserializer ,
80
+ )
81
+
82
+ def encrypt (self , data : bytes | str | Dict | int , ** provider_options ) -> str :
83
+ return self ._key_provider .encrypt (data = data , ** provider_options )
84
+
85
+ def decrypt (self , data : str , ** provider_options ) -> Any :
86
+ return self ._key_provider .decrypt (data = data , ** provider_options )
87
+
88
+
89
+ class KMSKeyProvider :
90
+
91
+ """
92
+ The KMSKeyProvider is responsible for assembling an AWS Key Management Service (KMS)
93
+ client, a caching mechanism, and a keyring for secure key management and data encryption.
94
+ """
95
+
96
+ def __init__ (
97
+ self ,
98
+ keys : List [str ],
99
+ json_serializer : Callable ,
100
+ json_deserializer : Callable ,
101
+ local_cache_capacity : int = CACHE_CAPACITY ,
102
+ max_cache_age_seconds : float = MAX_CACHE_AGE_SECONDS ,
103
+ max_messages_encrypted : int = MAX_MESSAGES_ENCRYPTED ,
104
+ ):
105
+ session = botocore .session .Session ()
106
+ register_feature_to_botocore_session (session , "data-masking" )
107
+
108
+ self .json_serializer = json_serializer
109
+ self .json_deserializer = json_deserializer
110
+ self .client = EncryptionSDKClient ()
55
111
self .keys = keys
56
112
self .cache = LocalCryptoMaterialsCache (local_cache_capacity )
57
- self .key_provider = StrictAwsKmsMasterKeyProvider (key_ids = self .keys , botocore_session = self . session )
113
+ self .key_provider = StrictAwsKmsMasterKeyProvider (key_ids = self .keys , botocore_session = session )
58
114
self .cache_cmm = CachingCryptoMaterialsManager (
59
115
master_key_provider = self .key_provider ,
60
116
cache = self .cache ,
61
117
max_age = max_cache_age_seconds ,
62
118
max_messages_encrypted = max_messages_encrypted ,
63
119
)
64
120
65
- def encrypt (self , data : Union [ bytes , str ] , ** provider_options ) -> bytes :
121
+ def encrypt (self , data : bytes | str | Dict | float , ** provider_options ) -> str :
66
122
"""
67
123
Encrypt data using the AwsEncryptionSdkProvider.
68
124
@@ -78,8 +134,12 @@ def encrypt(self, data: Union[bytes, str], **provider_options) -> bytes:
78
134
ciphertext : str
79
135
The encrypted data, as a base64-encoded string.
80
136
"""
81
- data = self .json_serializer (data )
82
- ciphertext , _ = self .client .encrypt (source = data , materials_manager = self .cache_cmm , ** provider_options )
137
+ data_encoded = self .json_serializer (data )
138
+ ciphertext , _ = self .client .encrypt (
139
+ source = data_encoded ,
140
+ materials_manager = self .cache_cmm ,
141
+ ** provider_options ,
142
+ )
83
143
ciphertext = base64 .b64encode (ciphertext ).decode ()
84
144
return ciphertext
85
145
0 commit comments