forked from aws/aws-encryption-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdocs_examples_bytes.py
96 lines (77 loc) · 3.62 KB
/
docs_examples_bytes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""
Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except
in compliance with the License. A copy of the License is located at
https://aws.amazon.com/apache-2-0/
or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import filecmp
import os
import aws_encryption_sdk
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
class StaticRandomMasterKeyProvider(RawMasterKeyProvider):
    """Master key provider that hands out one random 256-bit key per key ID.

    The first request for a key ID generates 32 random bytes with ``os.urandom``;
    the bytes are cached on the instance, so later requests for the same key ID
    are answered with the same key material.
    """

    provider_id = 'static-random'

    def __init__(self, **kwargs):
        # NOTE(review): ``kwargs`` is accepted but never read in this method;
        # presumably the base class consumes configuration elsewhere -- confirm.
        # Cache of raw key bytes, indexed by key ID.
        self._static_keys = {}

    def _get_raw_key(self, key_id):
        """Look up (or lazily create) the static symmetric key for a key ID.

        :param str key_id: Key ID
        :returns: Wrapping key built around the static key bytes for ``key_id``
        :rtype: :class:`aws_encryption_sdk.internal.crypto.WrappingKey`
        """
        # Generate key material only the first time this key ID is seen.
        if key_id not in self._static_keys:
            self._static_keys[key_id] = os.urandom(32)
        key_bytes = self._static_keys[key_id]

        return WrappingKey(
            wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
            wrapping_key=key_bytes,
            wrapping_key_type=EncryptionKeyType.SYMMETRIC,
        )
def cycle_file(source_plaintext_filename):
    """Encrypts and then decrypts a file under a custom static master key provider.

    The ciphertext is written to ``<source>.encrypted`` and the decrypted result to
    ``<source>.decrypted``. After the round trip, the decrypted file is compared
    byte-for-byte with the source, and the encryption context seen on decrypt is
    checked against the one used on encrypt.

    :param str source_plaintext_filename: Filename of file to encrypt
    :returns: Filenames of the ciphertext file and of the cycled (decrypted) plaintext file
    :rtype: tuple
    :raises AssertionError: if the cycled plaintext differs from the source, or the decrypt
        encryption context is missing a pair from the encrypt operation (only when
        assertions are enabled)
    """
    # Create a static random master key provider
    key_id = os.urandom(8)
    master_key_provider = StaticRandomMasterKeyProvider()
    master_key_provider.add_master_key(key_id)

    ciphertext_filename = source_plaintext_filename + '.encrypted'
    cycled_plaintext_filename = source_plaintext_filename + '.decrypted'

    # Encrypt the plaintext source data
    with open(source_plaintext_filename, 'rb') as plaintext, open(ciphertext_filename, 'wb') as ciphertext:
        with aws_encryption_sdk.stream(
            mode='e',
            source=plaintext,
            key_provider=master_key_provider
        ) as encryptor:
            for chunk in encryptor:
                ciphertext.write(chunk)

    # Decrypt the ciphertext
    with open(ciphertext_filename, 'rb') as ciphertext, open(cycled_plaintext_filename, 'wb') as plaintext:
        with aws_encryption_sdk.stream(
            mode='d',
            source=ciphertext,
            key_provider=master_key_provider
        ) as decryptor:
            for chunk in decryptor:
                plaintext.write(chunk)

    # Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source
    # plaintext.
    #
    # shallow=False forces a comparison of the file contents. With the default shallow=True,
    # filecmp.cmp reports equality as soon as the os.stat signatures (type, size, mtime) match,
    # and a round-tripped file always has the same size as its source.
    assert filecmp.cmp(source_plaintext_filename, cycled_plaintext_filename, shallow=False)

    # Verify that the encryption context used in the decrypt operation includes all key pairs from
    # the encrypt operation
    #
    # In production, always use a meaningful encryption context. In this sample, we omit the
    # encryption context (no key pairs).
    assert all(
        pair in decryptor.header.encryption_context.items()
        for pair in encryptor.header.encryption_context.items()
    )
    return ciphertext_filename, cycled_plaintext_filename