-
Notifications
You must be signed in to change notification settings - Fork 86
/
Copy pathbasic_file_encryption_with_multiple_providers.py
123 lines (104 loc) · 5.93 KB
/
basic_file_encryption_with_multiple_providers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Example showing creation of a RawMasterKeyProvider, how to use multiple
master key providers to encrypt, and demonstrating that each master key
provider can then be used independently to decrypt the same encrypted message.
"""
import filecmp
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import aws_encryption_sdk
from aws_encryption_sdk.identifiers import EncryptionKeyType, WrappingAlgorithm
from aws_encryption_sdk.internal.crypto.wrapping_keys import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
class StaticRandomMasterKeyProvider(RawMasterKeyProvider):
"""Randomly generates and provides 4096-bit RSA keys consistently per unique key id."""
provider_id = "static-random"
def __init__(self, **kwargs): # pylint: disable=unused-argument
"""Initialize empty map of keys."""
self._static_keys = {}
def _get_raw_key(self, key_id):
"""Retrieves a static, randomly generated, RSA key for the specified key id.
:param str key_id: User-defined ID for the static key
:returns: Wrapping key that contains the specified static key
:rtype: :class:`aws_encryption_sdk.internal.crypto.WrappingKey`
"""
try:
static_key = self._static_keys[key_id]
except KeyError:
private_key = rsa.generate_private_key(public_exponent=65537, key_size=4096, backend=default_backend())
static_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
self._static_keys[key_id] = static_key
return WrappingKey(
wrapping_algorithm=WrappingAlgorithm.RSA_OAEP_SHA1_MGF1,
wrapping_key=static_key,
wrapping_key_type=EncryptionKeyType.PRIVATE,
)
def run(aws_kms_cmk, source_plaintext_filename, botocore_session=None):
"""Encrypts and then decrypts a file using a KMS master key provider and a custom static master
key provider. Both master key providers are used to encrypt the plaintext file, so either one alone
can decrypt it.
:param str aws_kms_cmk: Amazon Resource Name (ARN) of the KMS Customer Master Key (CMK)
(http://docs.aws.amazon.com/kms/latest/developerguide/viewing-keys.html)
:param str source_plaintext_filename: Filename of file to encrypt
:param botocore_session: existing botocore session instance
:type botocore_session: botocore.session.Session
"""
# "Cycled" means encrypted and then decrypted
ciphertext_filename = source_plaintext_filename + ".encrypted"
cycled_kms_plaintext_filename = source_plaintext_filename + ".kms.decrypted"
cycled_static_plaintext_filename = source_plaintext_filename + ".static.decrypted"
# Create a KMS master key provider
kms_kwargs = dict(key_ids=[aws_kms_cmk])
if botocore_session is not None:
kms_kwargs["botocore_session"] = botocore_session
kms_master_key_provider = aws_encryption_sdk.KMSMasterKeyProvider(**kms_kwargs)
# Create a static master key provider and add a master key to it
static_key_id = os.urandom(8)
static_master_key_provider = StaticRandomMasterKeyProvider()
static_master_key_provider.add_master_key(static_key_id)
# Add the static master key provider to the KMS master key provider
# The resulting master key provider uses KMS master keys to generate (and encrypt)
# data keys and static master keys to create an additional encrypted copy of each data key.
kms_master_key_provider.add_master_key_provider(static_master_key_provider)
# Encrypt plaintext with both KMS and static master keys
with open(source_plaintext_filename, "rb") as plaintext, open(ciphertext_filename, "wb") as ciphertext:
with aws_encryption_sdk.stream(source=plaintext, mode="e", key_provider=kms_master_key_provider) as encryptor:
for chunk in encryptor:
ciphertext.write(chunk)
# Decrypt the ciphertext with only the KMS master key
with open(ciphertext_filename, "rb") as ciphertext, open(cycled_kms_plaintext_filename, "wb") as plaintext:
with aws_encryption_sdk.stream(
source=ciphertext, mode="d", key_provider=aws_encryption_sdk.KMSMasterKeyProvider(**kms_kwargs)
) as kms_decryptor:
for chunk in kms_decryptor:
plaintext.write(chunk)
# Decrypt the ciphertext with only the static master key
with open(ciphertext_filename, "rb") as ciphertext, open(cycled_static_plaintext_filename, "wb") as plaintext:
with aws_encryption_sdk.stream(
source=ciphertext, mode="d", key_provider=static_master_key_provider
) as static_decryptor:
for chunk in static_decryptor:
plaintext.write(chunk)
# Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source plaintext
assert filecmp.cmp(source_plaintext_filename, cycled_kms_plaintext_filename)
assert filecmp.cmp(source_plaintext_filename, cycled_static_plaintext_filename)
# Verify that the encryption context in the decrypt operation includes all key pairs from the
# encrypt operation.
#
# In production, always use a meaningful encryption context. In this sample, we omit the
# encryption context (no key pairs).
assert all(
pair in kms_decryptor.header.encryption_context.items() for pair in encryptor.header.encryption_context.items()
)
assert all(
pair in static_decryptor.header.encryption_context.items()
for pair in encryptor.header.encryption_context.items()
)
return (ciphertext_filename, cycled_kms_plaintext_filename, cycled_static_plaintext_filename)