forked from aws/aws-encryption-sdk-javascript
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathframed_encrypt_stream.ts
211 lines (182 loc) · 7.61 KB
/
framed_encrypt_stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
* this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
serializeFactory, aadFactory,
MessageHeader, // eslint-disable-line no-unused-vars
Maximum
} from '@aws-crypto/serialize'
// @ts-ignore
import { Transform as PortableTransform } from 'readable-stream'
import { Transform } from 'stream' // eslint-disable-line no-unused-vars
import {
GetCipher, // eslint-disable-line no-unused-vars
AwsEsdkJsCipherGCM, // eslint-disable-line no-unused-vars
needs
} from '@aws-crypto/material-management-node'
/* Encodes a string as UTF-8 bytes.
 * Handed to the serialization factories below so they can turn strings into binary.
 */
function fromUtf8 (input: string): Buffer {
  return Buffer.from(input, 'utf8')
}
/* Serialization helpers built on the UTF-8 encoder. */
const serialize = serializeFactory(fromUtf8)
/* The two frame header serializers are used directly when building body headers. */
const finalFrameHeader = serialize.finalFrameHeader
const frameHeader = serialize.frameHeader
/* Helper used to build the additional authenticated data (AAD) for each frame. */
const aadUtility = aadFactory(fromUtf8)
/**
 * Plaintext collected so far for the frame that is currently being filled.
 */
interface AccumulatingFrame {
  /** Position of this frame in the message; the stream starts at 1 and adds 1 per completed frame. */
  sequenceNumber: number
  /** Plaintext chunks, in arrival order, that make up the frame so far. */
  content: Buffer[]
  /** Total number of bytes held in `content`. */
  contentLength: number
}
/**
 * Everything required to encrypt one frame and write it to the output.
 */
interface EncryptFrame {
  /** Serialized frame header; it is pushed to the output before any ciphertext. */
  bodyHeader: Buffer
  /** Plaintext chunks of the frame; each one is passed through `cipher.update`. */
  content: Buffer[]
  /** Cipher created for this frame's IV, with the frame AAD already set. */
  cipher: AwsEsdkJsCipherGCM
  /** True for the last frame of the message; the stream ends after it is written. */
  isFinalFrame: boolean
  /** NOTE(review): not read or written anywhere in the visible code - confirm whether it is still needed. */
  headerSent?: boolean
}
/* Resolves on a later turn of the event loop (scheduled with setImmediate),
 * which lets pending I/O run between pieces of synchronous work.
 */
function ioTick () {
  return new Promise(resolve => setImmediate(resolve))
}
/* Placeholder callback that intentionally does nothing. */
function noop () {}
/* Node-style completion callback: called with an Error on failure, or with nothing on success. */
type ErrBack = (err?: Error) => void
/**
 * Builds a Transform stream that splits incoming plaintext into frames of
 * `messageHeader.frameLength` bytes, encrypts each frame with a cipher obtained
 * from `getCipher`, and pushes the serialized frames downstream.
 *
 * @param getCipher - Factory returning a fresh cipher for a given frame IV.
 * @param messageHeader - Message header; supplies `frameLength` here and is forwarded to `getEncryptFrame`.
 * @param dispose - Called when the stream is destroyed.
 * @param plaintextLength - Optional MAXIMUM number of plaintext bytes that may be encrypted.
 *   A falsy value (`undefined` or `0`) means that no limit is enforced.
 * @returns The Transform stream.
 * @throws If `plaintextLength` is out of bounds (`needs` precondition).
 */
export function getFramedEncryptStream (getCipher: GetCipher, messageHeader: MessageHeader, dispose: Function, plaintextLength?: number) {
  let accumulatingFrame: AccumulatingFrame = { contentLength: 0, content: [], sequenceNumber: 1 }
  let pathologicalDrain: () => void = noop
  const { frameLength } = messageHeader

  /* Precondition: plaintextLength must be within bounds.
   * The Maximum.BYTES_PER_MESSAGE is set to be within Number.MAX_SAFE_INTEGER
   * See serialize/identifiers.ts enum Maximum for more details.
   */
  needs(!plaintextLength || (plaintextLength >= 0 && Maximum.BYTES_PER_MESSAGE >= plaintextLength), 'plaintextLength out of bounds.')

  /* Whether a limit applies is decided once, up front.
   * The remaining byte budget is tracked in its own variable:
   * if a single variable did both jobs, the limit would silently
   * switch off as soon as the budget reached exactly 0.
   */
  const enforcePlaintextLength = !!plaintextLength
  let plaintextRemaining = plaintextLength || 0

  /* Keeping the messageHeader, accumulatingFrame and pathologicalDrain private is the intention here.
   * It is already unlikely that these values could be touched in the current composition of streams,
   * but a different composition may change this.
   * Since we are handling the plain text here, it seems prudent to take extra measures.
   */
  return new (class FramedEncryptStream extends (<new (...args: any[]) => Transform>PortableTransform) {
    /* Entry point for every chunk written to the stream.
     * The plaintext budget is charged here, exactly once per written chunk.
     */
    _transform (chunk: Buffer, _encoding: string, callback: ErrBack) {
      /* Precondition: Must not process more than plaintextLength.
       * The plaintextLength is the MAXIMUM value that can be encrypted.
       */
      needs(!enforcePlaintextLength || (plaintextRemaining -= chunk.length) >= 0, 'Encrypted data exceeded plaintextLength.')

      this._accumulateChunk(chunk, callback)
    }

    /* Adds `chunk` to the frame being accumulated.
     * Each time a frame fills up it is encrypted and flushed,
     * and the rest of the chunk is fed back into this method.
     * This method must not charge the plaintext budget:
     * the bytes in a tail were already counted by _transform.
     */
    _accumulateChunk (chunk: Buffer, callback: ErrBack) {
      const contentLeft = frameLength - accumulatingFrame.contentLength

      /* Check for early return (Postcondition): Have not accumulated a frame. */
      if (contentLeft > chunk.length) {
        // eat more
        accumulatingFrame.contentLength += chunk.length
        accumulatingFrame.content.push(chunk)
        return callback()
      }

      accumulatingFrame.contentLength += contentLeft
      accumulatingFrame.content.push(chunk.slice(0, contentLeft))

      // grab the tail
      const tail = chunk.slice(contentLeft)

      const encryptFrame = getEncryptFrame({
        pendingFrame: accumulatingFrame,
        messageHeader,
        getCipher,
        isFinalFrame: false
      })

      // Reset frame state for next frame
      const { sequenceNumber } = accumulatingFrame
      accumulatingFrame = {
        contentLength: 0,
        content: [],
        sequenceNumber: sequenceNumber + 1
      }

      this._flushEncryptFrame(encryptFrame)
        .then(() => this._accumulateChunk(tail, callback))
        .catch(callback)
    }

    /* Called when the writable side ends:
     * whatever has been accumulated is written as the final frame.
     */
    _flush (callback: ErrBack) {
      const encryptFrame = getEncryptFrame({
        pendingFrame: accumulatingFrame,
        messageHeader,
        getCipher,
        isFinalFrame: true
      })

      this._flushEncryptFrame(encryptFrame)
        .then(() => callback())
        .catch(callback)
    }

    /* Releases resources through `dispose` and then reports back to the stream machinery.
     * The callback must be invoked for destruction to complete;
     * the error that triggered the destroy (if any) is forwarded unchanged.
     * Both arguments are optional so that a direct call without arguments still works.
     */
    _destroy (err?: Error | null, callback?: (error: Error | null) => void) {
      dispose()
      if (typeof callback === 'function') callback(err || null)
    }

    _read (size: number) {
      super._read(size)
      /* The _flushEncryptFrame encrypts and pushes the frame.
       * If this.push returns false then this stream
       * should wait until the destination stream calls read.
       * This means that _flushEncryptFrame needs to wait for some
       * indeterminate time. A closure is created around
       * the resolution function for a promise that
       * is created in _flushEncryptFrame. This way
       * here in _read (the implementation of read)
       * if a frame is being pushed, we can release
       * it.
       */
      pathologicalDrain()
      pathologicalDrain = noop
    }

    /* Encrypts one frame and pushes header, ciphertext and auth tag downstream.
     * The returned promise resolves once the whole frame has been pushed;
     * for the final frame the readable side is ended as well.
     */
    async _flushEncryptFrame (encryptingFrame: EncryptFrame): Promise<void> {
      const { content, cipher, bodyHeader, isFinalFrame } = encryptingFrame

      this.push(bodyHeader)

      let frameSize = 0
      const cipherContent: Buffer[] = []
      for (const clearChunk of content) {
        const cipherText = cipher.update(clearChunk)
        frameSize += cipherText.length
        cipherContent.push(cipherText)
        // Yield to the event loop between chunks.
        await ioTick()
      }

      /* Finalize the cipher and handle any tail. */
      const tail = cipher.final()
      frameSize += tail.length
      cipherContent.push(tail)
      /* Push the authTag onto the end. It is not cipher text, but it travels in the same list. */
      cipherContent.push(cipher.getAuthTag())

      /* The auth tag is not counted in frameSize, so the cipher text
       * must be exactly one frame long, or shorter for the final frame.
       */
      needs(frameSize === frameLength || (isFinalFrame && frameLength >= frameSize), 'Malformed frame')

      for (const cipherText of cipherContent) {
        if (!this.push(cipherText)) {
          /* back pressure: if push returns false, wait until _read
           * has been called.
           */
          await new Promise<void>(resolve => { pathologicalDrain = () => resolve() })
        }
      }

      if (isFinalFrame) this.push(null)
    }
  })()
}
/**
 * Arguments accepted by `getEncryptFrame`.
 */
type EncryptFrameInput = {
  /** The accumulated plaintext and sequence number of the frame to encrypt. */
  pendingFrame: AccumulatingFrame
  /** Message header; supplies frameLength, contentType, messageId and headerIvLength. */
  messageHeader: MessageHeader
  /** Factory that returns a cipher for the frame IV. */
  getCipher: GetCipher
  /** Whether this frame is the last frame of the message. */
  isFinalFrame: boolean
}
/**
 * Prepares a single frame for encryption: validates its length, serializes the
 * frame header, and creates a cipher for the frame IV with the frame AAD set.
 *
 * @param input - The pending frame, message header, cipher factory and final-frame flag.
 * @returns The frame content together with its serialized header and ready-to-use cipher.
 * @throws If the content length does not match the frame length (`needs` precondition).
 */
export function getEncryptFrame (input: EncryptFrameInput): EncryptFrame {
  const frame = input.pendingFrame
  const header = input.messageHeader
  const makeCipher = input.getCipher
  const isFinalFrame = input.isFinalFrame

  const sequenceNumber = frame.sequenceNumber
  const contentLength = frame.contentLength
  const content = frame.content

  const frameLength = header.frameLength
  const contentType = header.contentType
  const messageId = header.messageId
  const headerIvLength = header.headerIvLength

  /* Precondition: The content length MUST correlate with the frameLength.
   * A regular frame has to be exactly frameLength bytes long.
   * Only the final frame is allowed to be shorter; it can never be longer.
   */
  const isExactFrame = frameLength === contentLength
  const isValidFinalFrame = isFinalFrame && frameLength >= contentLength
  const lengthDetails = JSON.stringify({ frameLength, contentLength, isFinalFrame })
  needs(isExactFrame || isValidFinalFrame, `Malformed frame length and content length: ${lengthDetails}`)

  // The IV is derived from the sequence number.
  const frameIv = serialize.frameIv(headerIvLength, sequenceNumber)

  // The final frame header also carries the content length.
  const headerBytes = isFinalFrame
    ? finalFrameHeader(sequenceNumber, frameIv, contentLength)
    : frameHeader(sequenceNumber, frameIv)
  const bodyHeader = Buffer.from(headerBytes)

  // Build the additional authenticated data for this frame.
  const contentString = aadUtility.messageAADContentString({ contentType, isFinalFrame })
  const aad = aadUtility.messageAAD(messageId, contentString, sequenceNumber, contentLength)
  const aadBuffer = aad.buffer
  const aadOffset = aad.byteOffset
  const aadLength = aad.byteLength

  const cipher = makeCipher(frameIv)
  // Buffer.from with an offset and length gives a view of the AAD bytes, not a copy.
  cipher.setAAD(Buffer.from(aadBuffer, aadOffset, aadLength))

  return { content, cipher, bodyHeader, isFinalFrame }
}