Skip to content

Commit f4e0bd9

Browse files
authored
feat(util-stream): create checksum stream adapters (#1409)
* feat(util-stream): create checksum stream adapters * add bundler metadata * move TransformStream checksum to flush event * improve uniformity of node/web checksumstream api * alphabetization * use class inheritance * inheritance issue in jest * add karma test for checksum stream * separate files
1 parent 536fb7f commit f4e0bd9

10 files changed

+505
-1
lines changed

.changeset/red-cameras-repair.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@smithy/util-stream": minor
3+
---
4+
5+
create checksum stream adapter

packages/util-stream/karma.conf.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,15 @@
33
module.exports = function (config) {
44
config.set({
55
frameworks: ["jasmine", "karma-typescript"],
6-
files: ["src/getAwsChunkedEncodingStream.browser.ts", "src/getAwsChunkedEncodingStream.browser.spec.ts"],
6+
files: [
7+
"src/checksum/createChecksumStream.browser.spec.ts",
8+
"src/checksum/createChecksumStream.browser.ts",
9+
"src/checksum/ChecksumStream.browser.ts",
10+
"src/getAwsChunkedEncodingStream.browser.spec.ts",
11+
"src/getAwsChunkedEncodingStream.browser.ts",
12+
"src/headStream.browser.ts",
13+
"src/stream-type-check.ts",
14+
],
715
exclude: ["**/*.d.ts"],
816
preprocessors: {
917
"**/*.ts": "karma-typescript",

packages/util-stream/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,19 @@
5555
"dist-*/**"
5656
],
5757
"browser": {
58+
"./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser",
5859
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
5960
"./dist-es/headStream": "./dist-es/headStream.browser",
6061
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
6162
"./dist-es/splitStream": "./dist-es/splitStream.browser"
6263
},
6364
"react-native": {
65+
"./dist-es/checksum/createChecksumStream": "./dist-es/checksum/createChecksumStream.browser",
6466
"./dist-es/getAwsChunkedEncodingStream": "./dist-es/getAwsChunkedEncodingStream.browser",
6567
"./dist-es/sdk-stream-mixin": "./dist-es/sdk-stream-mixin.browser",
6668
"./dist-es/headStream": "./dist-es/headStream.browser",
6769
"./dist-es/splitStream": "./dist-es/splitStream.browser",
70+
"./dist-cjs/checksum/createChecksumStream": "./dist-cjs/checksum/createChecksumStream.browser",
6871
"./dist-cjs/getAwsChunkedEncodingStream": "./dist-cjs/getAwsChunkedEncodingStream.browser",
6972
"./dist-cjs/sdk-stream-mixin": "./dist-cjs/sdk-stream-mixin.browser",
7073
"./dist-cjs/headStream": "./dist-cjs/headStream.browser",
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { Checksum, Encoder } from "@smithy/types";
2+
3+
/**
4+
* @internal
5+
*/
6+
export interface ChecksumStreamInit {
7+
/**
8+
* Base64 value of the expected checksum.
9+
*/
10+
expectedChecksum: string;
11+
/**
12+
* For error messaging, the location from which the checksum value was read.
13+
*/
14+
checksumSourceLocation: string;
15+
/**
16+
* The checksum calculator.
17+
*/
18+
checksum: Checksum;
19+
/**
20+
* The stream to be checked.
21+
*/
22+
source: ReadableStream;
23+
24+
/**
25+
* Optional base 64 encoder if calling from a request context.
26+
*/
27+
base64Encoder?: Encoder;
28+
}
29+
30+
const ReadableStreamRef = typeof ReadableStream === "function" ? ReadableStream : function (): void {};
31+
32+
/**
33+
* This stub exists so that the readable returned by createChecksumStream
34+
* identifies as "ChecksumStream" in alignment with the Node.js
35+
* implementation.
36+
*
37+
* @extends ReadableStream
38+
*/
39+
export class ChecksumStream extends (ReadableStreamRef as any) {}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import { Checksum, Encoder } from "@smithy/types";
2+
import { toBase64 } from "@smithy/util-base64";
3+
import { Duplex, Readable } from "stream";
4+
5+
/**
6+
* @internal
7+
*/
8+
export interface ChecksumStreamInit<T extends Readable | ReadableStream> {
9+
/**
10+
* Base64 value of the expected checksum.
11+
*/
12+
expectedChecksum: string;
13+
/**
14+
* For error messaging, the location from which the checksum value was read.
15+
*/
16+
checksumSourceLocation: string;
17+
/**
18+
* The checksum calculator.
19+
*/
20+
checksum: Checksum;
21+
/**
22+
* The stream to be checked.
23+
*/
24+
source: T;
25+
26+
/**
27+
* Optional base 64 encoder if calling from a request context.
28+
*/
29+
base64Encoder?: Encoder;
30+
}
31+
32+
/**
33+
* @internal
34+
*
35+
* Wrapper for throwing checksum errors for streams without
36+
* buffering the stream.
37+
*
38+
*/
39+
export class ChecksumStream extends Duplex {
40+
private expectedChecksum: string;
41+
private checksumSourceLocation: string;
42+
private checksum: Checksum;
43+
private source?: Readable;
44+
private base64Encoder: Encoder;
45+
46+
public constructor({
47+
expectedChecksum,
48+
checksum,
49+
source,
50+
checksumSourceLocation,
51+
base64Encoder,
52+
}: ChecksumStreamInit<Readable>) {
53+
super();
54+
if (typeof (source as Readable).pipe === "function") {
55+
this.source = source as Readable;
56+
} else {
57+
throw new Error(
58+
`@smithy/util-stream: unsupported source type ${source?.constructor?.name ?? source} in ChecksumStream.`
59+
);
60+
}
61+
62+
this.base64Encoder = base64Encoder ?? toBase64;
63+
this.expectedChecksum = expectedChecksum;
64+
this.checksum = checksum;
65+
this.checksumSourceLocation = checksumSourceLocation;
66+
67+
// connect this stream to the end of the source stream.
68+
this.source.pipe(this);
69+
}
70+
71+
/**
72+
* @internal do not call this directly.
73+
*/
74+
public _read(
75+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
76+
size: number
77+
): void {}
78+
79+
/**
80+
* @internal do not call this directly.
81+
*
82+
* When the upstream source flows data to this stream,
83+
* calculate a step update of the checksum.
84+
*/
85+
public _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void {
86+
try {
87+
this.checksum.update(chunk);
88+
this.push(chunk);
89+
} catch (e: unknown) {
90+
return callback(e as Error);
91+
}
92+
return callback();
93+
}
94+
95+
/**
96+
* @internal do not call this directly.
97+
*
98+
* When the upstream source finishes, perform the checksum comparison.
99+
*/
100+
public async _final(callback: (err?: Error) => void): Promise<void> {
101+
try {
102+
const digest: Uint8Array = await this.checksum.digest();
103+
const received = this.base64Encoder(digest);
104+
if (this.expectedChecksum !== received) {
105+
return callback(
106+
new Error(
107+
`Checksum mismatch: expected "${this.expectedChecksum}" but received "${received}"` +
108+
` in response header "${this.checksumSourceLocation}".`
109+
)
110+
);
111+
}
112+
} catch (e: unknown) {
113+
return callback(e as Error);
114+
}
115+
this.push(null);
116+
return callback();
117+
}
118+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { Checksum } from "@smithy/types";
2+
import { toBase64 } from "@smithy/util-base64";
3+
import { toUtf8 } from "@smithy/util-utf8";
4+
5+
import { headStream } from "../headStream.browser";
6+
import { ChecksumStream as ChecksumStreamWeb } from "./ChecksumStream.browser";
7+
import { createChecksumStream } from "./createChecksumStream.browser";
8+
9+
describe("Checksum streams", () => {
10+
/**
11+
* Hash "algorithm" that appends all data together.
12+
*/
13+
class Appender implements Checksum {
14+
public hash = "";
15+
async digest(): Promise<Uint8Array> {
16+
return Buffer.from(this.hash);
17+
}
18+
reset(): void {
19+
throw new Error("Function not implemented.");
20+
}
21+
update(chunk: Uint8Array): void {
22+
this.hash += toUtf8(chunk);
23+
}
24+
}
25+
26+
const canonicalData = new Uint8Array("abcdefghijklmnopqrstuvwxyz".split("").map((_) => _.charCodeAt(0)));
27+
28+
const canonicalUtf8 = toUtf8(canonicalData);
29+
const canonicalBase64 = toBase64(canonicalUtf8);
30+
31+
describe(createChecksumStream.name + " webstreams API", () => {
32+
if (typeof ReadableStream !== "function") {
33+
// test not applicable to Node.js 16.
34+
return;
35+
}
36+
37+
const makeStream = () => {
38+
return new ReadableStream({
39+
start(controller) {
40+
canonicalData.forEach((byte) => {
41+
controller.enqueue(new Uint8Array([byte]));
42+
});
43+
controller.close();
44+
},
45+
});
46+
};
47+
48+
it("should extend a ReadableStream", async () => {
49+
const stream = makeStream();
50+
const checksumStream = createChecksumStream({
51+
expectedChecksum: canonicalBase64,
52+
checksum: new Appender(),
53+
checksumSourceLocation: "my-header",
54+
source: stream,
55+
});
56+
57+
expect(checksumStream).toBeInstanceOf(ReadableStream);
58+
expect(checksumStream).toBeInstanceOf(ChecksumStreamWeb);
59+
60+
const collected = toUtf8(await headStream(checksumStream, Infinity));
61+
expect(collected).toEqual(canonicalUtf8);
62+
expect(stream.locked).toEqual(true);
63+
64+
// expectation is that it is resolved.
65+
expect(await checksumStream.getReader().closed);
66+
});
67+
68+
it("should throw during stream read if the checksum does not match", async () => {
69+
const stream = makeStream();
70+
const checksumStream = createChecksumStream({
71+
expectedChecksum: "different-expected-checksum",
72+
checksum: new Appender(),
73+
checksumSourceLocation: "my-header",
74+
source: stream,
75+
});
76+
77+
try {
78+
toUtf8(await headStream(checksumStream, Infinity));
79+
throw new Error("stream was read successfully");
80+
} catch (e: unknown) {
81+
expect(String(e)).toEqual(
82+
`Error: Checksum mismatch: expected "different-expected-checksum" but` +
83+
` received "${canonicalBase64}"` +
84+
` in response header "my-header".`
85+
);
86+
}
87+
});
88+
});
89+
});
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import { toBase64 } from "@smithy/util-base64";
2+
3+
import { isReadableStream } from "../stream-type-check";
4+
import { ChecksumStream, ChecksumStreamInit } from "./ChecksumStream.browser";
5+
6+
/**
7+
* @internal
8+
* Alias prevents compiler from turning
9+
* ReadableStream into ReadableStream<any>, which is incompatible
10+
* with the NodeJS.ReadableStream global type.
11+
*/
12+
export type ReadableStreamType = ReadableStream;
13+
14+
/**
15+
* This is a local copy of
16+
* https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController
17+
* in case users do not have this type.
18+
*/
19+
interface TransformStreamDefaultController {
20+
enqueue(chunk: any): void;
21+
error(error: unknown): void;
22+
terminate(): void;
23+
}
24+
25+
/**
26+
* @internal
27+
*
28+
* Creates a stream adapter for throwing checksum errors for streams without
29+
* buffering the stream.
30+
*/
31+
export const createChecksumStream = ({
32+
expectedChecksum,
33+
checksum,
34+
source,
35+
checksumSourceLocation,
36+
base64Encoder,
37+
}: ChecksumStreamInit): ReadableStreamType => {
38+
if (!isReadableStream(source)) {
39+
throw new Error(
40+
`@smithy/util-stream: unsupported source type ${(source as any)?.constructor?.name ?? source} in ChecksumStream.`
41+
);
42+
}
43+
44+
const encoder = base64Encoder ?? toBase64;
45+
46+
if (typeof TransformStream !== "function") {
47+
throw new Error(
48+
"@smithy/util-stream: unable to instantiate ChecksumStream because API unavailable: ReadableStream/TransformStream."
49+
);
50+
}
51+
52+
const transform = new TransformStream({
53+
start() {},
54+
async transform(chunk: any, controller: TransformStreamDefaultController) {
55+
/**
56+
* When the upstream source flows data to this stream,
57+
* calculate a step update of the checksum.
58+
*/
59+
checksum.update(chunk);
60+
controller.enqueue(chunk);
61+
},
62+
async flush(controller: TransformStreamDefaultController) {
63+
const digest: Uint8Array = await checksum.digest();
64+
const received = encoder(digest);
65+
66+
if (expectedChecksum !== received) {
67+
const error = new Error(
68+
`Checksum mismatch: expected "${expectedChecksum}" but received "${received}"` +
69+
` in response header "${checksumSourceLocation}".`
70+
);
71+
controller.error(error);
72+
} else {
73+
controller.terminate();
74+
}
75+
},
76+
});
77+
78+
source.pipeThrough(transform);
79+
const readable = transform.readable;
80+
Object.setPrototypeOf(readable, ChecksumStream.prototype);
81+
return readable;
82+
};

0 commit comments

Comments
 (0)