-
Notifications
You must be signed in to change notification settings - Fork 86
/
Copy pathbasic_file_encryption_with_raw_key_provider.py
78 lines (63 loc) · 3.23 KB
/
basic_file_encryption_with_raw_key_provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Example showing creation and use of a RawMasterKeyProvider."""
import filecmp
import os
import aws_encryption_sdk
from aws_encryption_sdk.identifiers import EncryptionKeyType, WrappingAlgorithm
from aws_encryption_sdk.internal.crypto.wrapping_keys import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
class StaticRandomMasterKeyProvider(RawMasterKeyProvider):
"""Randomly generates 256-bit keys for each unique key ID."""
provider_id = "static-random"
def __init__(self, **kwargs): # pylint: disable=unused-argument
"""Initialize empty map of keys."""
self._static_keys = {}
def _get_raw_key(self, key_id):
"""Returns a static, randomly-generated symmetric key for the specified key ID.
:param str key_id: Key ID
:returns: Wrapping key that contains the specified static key
:rtype: :class:`aws_encryption_sdk.internal.crypto.WrappingKey`
"""
try:
static_key = self._static_keys[key_id]
except KeyError:
static_key = os.urandom(32)
self._static_keys[key_id] = static_key
return WrappingKey(
wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
wrapping_key=static_key,
wrapping_key_type=EncryptionKeyType.SYMMETRIC,
)
def run(source_plaintext_filename):
"""Encrypts and then decrypts a file under a custom static master key provider.
:param str source_plaintext_filename: Filename of file to encrypt
"""
# Create a static random master key provider
key_id = os.urandom(8)
master_key_provider = StaticRandomMasterKeyProvider()
master_key_provider.add_master_key(key_id)
ciphertext_filename = source_plaintext_filename + ".encrypted"
cycled_plaintext_filename = source_plaintext_filename + ".decrypted"
# Encrypt the plaintext source data
with open(source_plaintext_filename, "rb") as plaintext, open(ciphertext_filename, "wb") as ciphertext:
with aws_encryption_sdk.stream(mode="e", source=plaintext, key_provider=master_key_provider) as encryptor:
for chunk in encryptor:
ciphertext.write(chunk)
# Decrypt the ciphertext
with open(ciphertext_filename, "rb") as ciphertext, open(cycled_plaintext_filename, "wb") as plaintext:
with aws_encryption_sdk.stream(mode="d", source=ciphertext, key_provider=master_key_provider) as decryptor:
for chunk in decryptor:
plaintext.write(chunk)
# Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source
# plaintext
assert filecmp.cmp(source_plaintext_filename, cycled_plaintext_filename)
# Verify that the encryption context used in the decrypt operation includes all key pairs from
# the encrypt operation
#
# In production, always use a meaningful encryption context. In this sample, we omit the
# encryption context (no key pairs).
assert all(
pair in decryptor.header.encryption_context.items() for pair in encryptor.header.encryption_context.items()
)
return ciphertext_filename, cycled_plaintext_filename