forked from aws/aws-encryption-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbasic_file_encryption_with_multiple_providers.py
133 lines (114 loc) · 6.37 KB
/
basic_file_encryption_with_multiple_providers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Example showing creation of a RawMasterKeyProvider, how to use multiple
master key providers to encrypt, and demonstrating that each master key
provider can then be used independently to decrypt the same encrypted message.
"""
import filecmp
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
import aws_encryption_sdk
from aws_encryption_sdk.identifiers import EncryptionKeyType, WrappingAlgorithm
from aws_encryption_sdk.internal.crypto.wrapping_keys import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
class StaticRandomMasterKeyProvider(RawMasterKeyProvider):
"""Randomly generates and provides 4096-bit RSA keys consistently per unique key id."""
provider_id = "static-random"
def __init__(self, **kwargs): # pylint: disable=unused-argument
"""Initialize empty map of keys."""
self._static_keys = {}
def _get_raw_key(self, key_id):
"""Retrieves a static, randomly generated, RSA key for the specified key id.
:param str key_id: User-defined ID for the static key
:returns: Wrapping key that contains the specified static key
:rtype: :class:`aws_encryption_sdk.internal.crypto.WrappingKey`
"""
try:
static_key = self._static_keys[key_id]
except KeyError:
private_key = rsa.generate_private_key(public_exponent=65537, key_size=4096, backend=default_backend())
static_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
self._static_keys[key_id] = static_key
return WrappingKey(
wrapping_algorithm=WrappingAlgorithm.RSA_OAEP_SHA1_MGF1,
wrapping_key=static_key,
wrapping_key_type=EncryptionKeyType.PRIVATE,
)
def cycle_file(key_arn, source_plaintext_filename, botocore_session=None):
"""Encrypts and then decrypts a file using a KMS master key provider and a custom static master
key provider. Both master key providers are used to encrypt the plaintext file, so either one alone
can decrypt it.
:param str key_arn: Amazon Resource Name (ARN) of the KMS Customer Master Key (CMK)
(http://docs.aws.amazon.com/kms/latest/developerguide/viewing-keys.html)
:param str source_plaintext_filename: Filename of file to encrypt
:param botocore_session: existing botocore session instance
:type botocore_session: botocore.session.Session
"""
# "Cycled" means encrypted and then decrypted
ciphertext_filename = source_plaintext_filename + ".encrypted"
cycled_kms_plaintext_filename = source_plaintext_filename + ".kms.decrypted"
cycled_static_plaintext_filename = source_plaintext_filename + ".static.decrypted"
# Create a KMS master key provider
kms_kwargs = dict(key_ids=[key_arn])
if botocore_session is not None:
kms_kwargs["botocore_session"] = botocore_session
kms_master_key_provider = aws_encryption_sdk.KMSMasterKeyProvider(**kms_kwargs)
# Create a static master key provider and add a master key to it
static_key_id = os.urandom(8)
static_master_key_provider = StaticRandomMasterKeyProvider()
static_master_key_provider.add_master_key(static_key_id)
# Add the static master key provider to the KMS master key provider
# The resulting master key provider uses KMS master keys to generate (and encrypt)
# data keys and static master keys to create an additional encrypted copy of each data key.
kms_master_key_provider.add_master_key_provider(static_master_key_provider)
# Encrypt plaintext with both KMS and static master keys
with open(source_plaintext_filename, "rb") as plaintext, open(ciphertext_filename, "wb") as ciphertext:
with aws_encryption_sdk.stream(source=plaintext, mode="e", key_provider=kms_master_key_provider) as encryptor:
for chunk in encryptor:
ciphertext.write(chunk)
# Decrypt the ciphertext with only the KMS master key
with open(ciphertext_filename, "rb") as ciphertext, open(cycled_kms_plaintext_filename, "wb") as plaintext:
with aws_encryption_sdk.stream(
source=ciphertext, mode="d", key_provider=aws_encryption_sdk.KMSMasterKeyProvider(**kms_kwargs)
) as kms_decryptor:
for chunk in kms_decryptor:
plaintext.write(chunk)
# Decrypt the ciphertext with only the static master key
with open(ciphertext_filename, "rb") as ciphertext, open(cycled_static_plaintext_filename, "wb") as plaintext:
with aws_encryption_sdk.stream(
source=ciphertext, mode="d", key_provider=static_master_key_provider
) as static_decryptor:
for chunk in static_decryptor:
plaintext.write(chunk)
# Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source plaintext
assert filecmp.cmp(source_plaintext_filename, cycled_kms_plaintext_filename)
assert filecmp.cmp(source_plaintext_filename, cycled_static_plaintext_filename)
# Verify that the encryption context in the decrypt operation includes all key pairs from the
# encrypt operation.
#
# In production, always use a meaningful encryption context. In this sample, we omit the
# encryption context (no key pairs).
assert all(
pair in kms_decryptor.header.encryption_context.items() for pair in encryptor.header.encryption_context.items()
)
assert all(
pair in static_decryptor.header.encryption_context.items()
for pair in encryptor.header.encryption_context.items()
)
return (ciphertext_filename, cycled_kms_plaintext_filename, cycled_static_plaintext_filename)