-
Notifications
You must be signed in to change notification settings - Fork 86
/
Copy pathbasic_file_encryption_with_multiple_providers.py
125 lines (105 loc) · 6.36 KB
/
basic_file_encryption_with_multiple_providers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Example showing creation of a RawMasterKeyProvider, how to use multiple
master key providers to encrypt, and how each master key
provider can then be used independently to decrypt the same encrypted message.
"""
import filecmp
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import aws_encryption_sdk
from aws_encryption_sdk.identifiers import CommitmentPolicy, EncryptionKeyType, WrappingAlgorithm
from aws_encryption_sdk.internal.crypto.wrapping_keys import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
class StaticRandomMasterKeyProvider(RawMasterKeyProvider):
    """Master key provider that lazily creates one 4096-bit RSA private key per key id.

    Each generated key is cached as unencrypted PEM/PKCS8 bytes, so repeated requests for the
    same key id are served the same key material for the lifetime of this provider instance.
    """

    provider_id = "static-random"

    def __init__(self, **kwargs):  # pylint: disable=unused-argument
        """Start with an empty cache mapping key ids to PEM-encoded private keys."""
        self._static_keys = {}

    def _get_raw_key(self, key_id):
        """Return the wrapping key for ``key_id``, generating its RSA key on first request.

        :param str key_id: User-defined ID for the static key
        :returns: Wrapping key that contains the specified static key
        :rtype: :class:`aws_encryption_sdk.internal.crypto.WrappingKey`
        """
        # Only generate key material the first time a key id is seen; afterwards reuse the cache.
        if key_id not in self._static_keys:
            generated_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=4096,
                backend=default_backend(),
            )
            self._static_keys[key_id] = generated_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            )

        return WrappingKey(
            wrapping_algorithm=WrappingAlgorithm.RSA_OAEP_SHA1_MGF1,
            wrapping_key=self._static_keys[key_id],
            wrapping_key_type=EncryptionKeyType.PRIVATE,
        )
def cycle_file(key_arn, source_plaintext_filename, botocore_session=None):
    """Round-trip a file through encryption under two master key providers.

    The plaintext file is encrypted once under a KMS master key provider combined with a custom
    static master key provider, then decrypted twice -- once with each provider on its own -- to
    show that either provider alone can recover the plaintext.

    :param str key_arn: Amazon Resource Name (ARN) of the KMS Customer Master Key (CMK)
        (http://docs.aws.amazon.com/kms/latest/developerguide/viewing-keys.html)
    :param str source_plaintext_filename: Filename of file to encrypt
    :param botocore_session: existing botocore session instance
    :type botocore_session: botocore.session.Session
    :returns: Filenames of the ciphertext, the KMS-decrypted plaintext, and the static-decrypted plaintext
    :rtype: tuple
    """
    # Output files; "cycled" means encrypted and then decrypted.
    encrypted_path = source_plaintext_filename + ".encrypted"
    kms_cycled_path = source_plaintext_filename + ".kms.decrypted"
    static_cycled_path = source_plaintext_filename + ".static.decrypted"

    # Build the encryption client with an explicit commitment policy. If no commitment policy is
    # chosen explicitly, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default.
    client = aws_encryption_sdk.EncryptionSDKClient(
        commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT
    )

    # KMS master key provider; only pass a botocore session when the caller supplied one.
    kms_kwargs = {"key_ids": [key_arn]}
    if botocore_session is not None:
        kms_kwargs["botocore_session"] = botocore_session
    combined_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs)

    # Static master key provider holding a single master key under a random key id.
    static_provider = StaticRandomMasterKeyProvider()
    random_key_id = os.urandom(8)
    static_provider.add_master_key(random_key_id)

    # Attach the static provider to the KMS provider. The combined provider uses AWS KMS master
    # keys to generate (and encrypt) data keys, and the static master keys to create an additional
    # encrypted copy of each data key.
    combined_provider.add_master_key_provider(static_provider)

    # Encrypt the plaintext under both the AWS KMS and the static master keys, chunk by chunk.
    with open(source_plaintext_filename, "rb") as source_file, open(encrypted_path, "wb") as encrypted_file:
        with client.stream(source=source_file, mode="e", key_provider=combined_provider) as encryptor:
            for encrypted_chunk in encryptor:
                encrypted_file.write(encrypted_chunk)

    # Decrypt using only a fresh AWS KMS master key provider.
    # The whole message is read into memory before anything is written to disk so that the
    # signature is verified first.
    with open(encrypted_path, "rb") as encrypted_file, open(kms_cycled_path, "wb") as kms_output:
        with client.stream(
            source=encrypted_file,
            mode="d",
            key_provider=aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs),
        ) as kms_decryptor:
            kms_plaintext = kms_decryptor.read()
            kms_output.write(kms_plaintext)

    # Decrypt using only the static master key provider.
    # The whole message is read into memory before anything is written to disk so that the
    # signature is verified first.
    with open(encrypted_path, "rb") as encrypted_file, open(static_cycled_path, "wb") as static_output:
        with client.stream(source=encrypted_file, mode="d", key_provider=static_provider) as static_decryptor:
            static_plaintext = static_decryptor.read()
            static_output.write(static_plaintext)

    # Both "cycled" (encrypted, then decrypted) files must be identical to the source plaintext.
    for cycled_path in (kms_cycled_path, static_cycled_path):
        assert filecmp.cmp(source_plaintext_filename, cycled_path)

    # Every key pair of the encrypt operation's encryption context must also be present in the
    # encryption context seen by each decrypt operation.
    #
    # In production, always use a meaningful encryption context. This sample omits the
    # encryption context (no key pairs).
    encrypt_context = encryptor.header.encryption_context
    for decryptor in (kms_decryptor, static_decryptor):
        assert all(pair in decryptor.header.encryption_context.items() for pair in encrypt_context.items())

    return (encrypted_path, kms_cycled_path, static_cycled_path)