-
Notifications
You must be signed in to change notification settings - Fork 86
/
Copy pathencryption_context.py
160 lines (133 loc) · 6.33 KB
/
encryption_context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Components for handling serialization and deserialization of
encryption context data in AWS Encryption SDK messages.
"""
import codecs
import logging
import struct
import aws_encryption_sdk.identifiers
import aws_encryption_sdk.internal.defaults
import aws_encryption_sdk.internal.str_ops
from aws_encryption_sdk.exceptions import SerializationError
_LOGGER = logging.getLogger(__name__)
def assemble_content_aad(message_id, aad_content_string, seq_num, length):
"""Assembles the Body AAD string for a message body structure.
:param message_id: Message ID
:type message_id: str
:param aad_content_string: ContentAADString object for frame type
:type aad_content_string: aws_encryption_sdk.identifiers.ContentAADString
:param seq_num: Sequence number of frame
:type seq_num: int
:param length: Content Length
:type length: int
:returns: Properly formatted AAD bytes for message body structure.
:rtype: bytes
:raises SerializationError: if aad_content_string is not known
"""
if not isinstance(aad_content_string, aws_encryption_sdk.identifiers.ContentAADString):
raise SerializationError("Unknown aad_content_string")
fmt = ">{}s{}sIQ".format(len(message_id), len(aad_content_string.value))
return struct.pack(fmt, message_id, aad_content_string.value, seq_num, length)
def serialize_encryption_context(encryption_context):
"""Serializes the contents of a dictionary into a byte string.
:param dict encryption_context: Dictionary of encrytion context keys/values.
:returns: Serialized encryption context
:rtype: bytes
"""
if not encryption_context:
return bytes()
serialized_context = bytearray()
dict_size = len(encryption_context)
if dict_size > aws_encryption_sdk.internal.defaults.MAX_BYTE_ARRAY_SIZE:
raise SerializationError("The encryption context contains too many elements.")
serialized_context.extend(struct.pack(">H", dict_size))
# Encode strings first to catch bad values.
encryption_context_list = []
for key, value in encryption_context.items():
try:
if isinstance(key, bytes):
key = codecs.decode(key)
if isinstance(value, bytes):
value = codecs.decode(value)
encryption_context_list.append(
(aws_encryption_sdk.internal.str_ops.to_bytes(key), aws_encryption_sdk.internal.str_ops.to_bytes(value))
)
except Exception:
raise SerializationError(
"Cannot encode dictionary key or value using {}.".format(aws_encryption_sdk.internal.defaults.ENCODING)
)
for key, value in sorted(encryption_context_list, key=lambda x: x[0]):
serialized_context.extend(
struct.pack(
">H{key_size}sH{value_size}s".format(key_size=len(key), value_size=len(value)),
len(key),
key,
len(value),
value,
)
)
if len(serialized_context) > aws_encryption_sdk.internal.defaults.MAX_BYTE_ARRAY_SIZE:
raise SerializationError("The serialized context is too large.")
return bytes(serialized_context)
def read_short(source, offset):
"""Reads a number from a byte array.
:param bytes source: Source byte string
:param int offset: Point in byte string to start reading
:returns: Read number and offset at point after read data
:rtype: tuple of ints
:raises: SerializationError if unable to unpack
"""
try:
(short,) = struct.unpack_from(">H", source, offset)
return short, offset + struct.calcsize(">H")
except struct.error:
raise SerializationError("Bad format of serialized context.")
def read_string(source, offset, length):
"""Reads a string from a byte string.
:param bytes source: Source byte string
:param int offset: Point in byte string to start reading
:param int length: Length of string to read
:returns: Read string and offset at point after read data
:rtype: tuple of str and int
:raises SerializationError: if unable to unpack
"""
end = offset + length
try:
return (codecs.decode(source[offset:end], aws_encryption_sdk.internal.defaults.ENCODING), end)
except Exception:
raise SerializationError("Bad format of serialized context.")
def deserialize_encryption_context(serialized_encryption_context):
"""Deserializes the contents of a byte string into a dictionary.
:param bytes serialized_encryption_context: Source byte string containing serialized dictionary
:returns: Deserialized encryption context
:rtype: dict
:raises SerializationError: if serialized encryption context is too large
:raises SerializationError: if duplicate key found in serialized encryption context
:raises SerializationError: if malformed data found in serialized encryption context
"""
if len(serialized_encryption_context) > aws_encryption_sdk.internal.defaults.MAX_BYTE_ARRAY_SIZE:
raise SerializationError("Serialized context is too long.")
if serialized_encryption_context == b"":
_LOGGER.debug("No encryption context data found")
return {}
deserialized_size = 0
encryption_context = {}
dict_size, deserialized_size = read_short(source=serialized_encryption_context, offset=deserialized_size)
_LOGGER.debug("Found %d keys", dict_size)
for _ in range(dict_size):
key_size, deserialized_size = read_short(source=serialized_encryption_context, offset=deserialized_size)
key, deserialized_size = read_string(
source=serialized_encryption_context, offset=deserialized_size, length=key_size
)
value_size, deserialized_size = read_short(source=serialized_encryption_context, offset=deserialized_size)
value, deserialized_size = read_string(
source=serialized_encryption_context, offset=deserialized_size, length=value_size
)
if key in encryption_context:
raise SerializationError("Duplicate key in serialized context.")
encryption_context[key] = value
if deserialized_size != len(serialized_encryption_context):
raise SerializationError("Formatting error: Extra data in serialized context.")
return encryption_context