
Commit 5c60662

fix: addressing PR feedback
1 parent 3ce86b0 commit 5c60662


9 files changed (+56, -38 lines)


lib/storage/src/Upload.ts

Lines changed: 23 additions & 11 deletions

@@ -58,19 +58,11 @@ export class Upload extends EventEmitter {
     this.leavePartsOnError = options.leavePartsOnError || this.leavePartsOnError;
     this.tags = options.tags || this.tags;
 
-    if (this.partSize < MIN_PART_SIZE) {
-      throw new Error(
-        `EntityTooSmall: Your proposed upload partsize [${this.partSize}] is smaller than the minimum allowed size [${MIN_PART_SIZE}] (5MB)`
-      );
-    }
-
-    if (this.queueSize < 1) {
-      throw new Error(`Queue size: Must have atleast one uploading queue.`);
-    }
-
     this.client = options.client;
     this.params = options.params;
 
+    this.__validateInput();
+
     // set progress defaults
     this.totalBytes = byteLength(this.params.Body);
     this.bytesUploadedSoFar = 0;

@@ -94,7 +86,7 @@ export class Upload extends EventEmitter {
     super.on(event, listener);
   }
 
-  async __doConcurrentUpload(dataFeeder: AsyncGenerator<RawDataPart>): Promise<void> {
+  async __doConcurrentUpload(dataFeeder: AsyncGenerator<RawDataPart, void, undefined>): Promise<void> {
     for await (const dataPart of dataFeeder) {
       if (this.uploadedParts.length > this.MAX_PARTS) {
         throw new Error(

@@ -199,4 +191,24 @@ export class Upload extends EventEmitter {
       };
     });
   }
+
+  __validateInput() {
+    if (!this.params) {
+      throw new Error(`InputError: Upload requires params to be passed to upload.`);
+    }
+
+    if (!this.client) {
+      throw new Error(`InputError: Upload requires a AWS client to do uploads with.`);
+    }
+
+    if (this.partSize < MIN_PART_SIZE) {
+      throw new Error(
+        `EntityTooSmall: Your proposed upload partsize [${this.partSize}] is smaller than the minimum allowed size [${MIN_PART_SIZE}] (5MB)`
+      );
+    }
+
+    if (this.queueSize < 1) {
+      throw new Error(`Queue size: Must have at least one uploading queue.`);
+    }
+  }
 }
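With validation consolidated into __validateInput(), bad configuration now fails at construction time, before any network traffic. A minimal usage sketch, assuming this library is consumed as @aws-sdk/lib-storage with an S3 client; the bucket, key, and body values are placeholders:

import { S3Client } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";

try {
  // partSize is far below MIN_PART_SIZE (5 MB), so __validateInput()
  // throws synchronously from the constructor instead of mid-upload.
  new Upload({
    client: new S3Client({}),
    params: { Bucket: "example-bucket", Key: "example-key", Body: "hello" },
    partSize: 1024,
  });
} catch (err) {
  console.error(err); // EntityTooSmall: Your proposed upload partsize [1024] ...
}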

lib/storage/src/chunker.ts

Lines changed: 8 additions & 8 deletions

@@ -2,25 +2,25 @@ import stream from "stream";
 import { Buffer } from "buffer";
 
 import { BodyDataTypes } from "./types";
-import { getChunk as getChunkBuffer } from "./chunks/getChunkBuffer";
-import { getChunk as getChunkStream } from "./chunks/getChunkStream";
-import { getNextData as getNextDataReadableStream } from "./chunks/getDataReadableStream";
-import { getNextData as getNextDataReadable } from "./chunks/getDataReadable";
+import { getChunkBuffer } from "./chunks/getChunkBuffer";
+import { getChunkStream } from "./chunks/getChunkStream";
+import { getDataReadableStream } from "./chunks/getDataReadableStream";
+import { getDataReadable } from "./chunks/getDataReadable";
 
 export const getChunk = (data: BodyDataTypes, partSize: number) => {
   if (data instanceof Buffer) {
     return getChunkBuffer(data, partSize);
   } else if (data instanceof stream.Readable) {
-    return getChunkStream<stream.Readable>(data, partSize, getNextDataReadable);
-  } else if (data instanceof String || typeof data == "string" || data instanceof Uint8Array) {
+    return getChunkStream<stream.Readable>(data, partSize, getDataReadable);
+  } else if (data instanceof String || typeof data === "string" || data instanceof Uint8Array) {
     // chunk Strings, Uint8Array.
     return getChunkBuffer(Buffer.from(data), partSize);
   }
   if (typeof (data as any).stream === "function") {
     // approximate support for Blobs.
-    return getChunkStream<ReadableStream>((data as any).stream(), partSize, getNextDataReadableStream);
+    return getChunkStream<ReadableStream>((data as any).stream(), partSize, getDataReadableStream);
   } else if (data instanceof ReadableStream) {
-    return getChunkStream<ReadableStream>(data, partSize, getNextDataReadableStream);
+    return getChunkStream<ReadableStream>(data, partSize, getDataReadableStream);
   } else {
     throw new Error(
       "Body Data is unsupported format, expected data to be one of: string | Uint8Array | Buffer | Readable | ReadableStream | Blob;."

lib/storage/src/chunks/getChunkBuffer.ts

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 import { RawDataPart } from "../Upload";
 
-export async function* getChunk(data: Buffer, partSize: number): AsyncGenerator<RawDataPart> {
+export async function* getChunkBuffer(data: Buffer, partSize: number): AsyncGenerator<RawDataPart, void, undefined> {
   let partNumber = 1;
   let startByte = 0;
   let endByte = startByte + partSize;
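Only the signature changed here; the loop these locals drive is outside the hunk. A hypothetical reconstruction of how such a generator typically continues, for illustration only (lastPart is an assumed field name, not shown in this commit):

// Hypothetical continuation -- not part of this commit:
while (endByte < data.byteLength) {
  yield { partNumber, data: data.slice(startByte, endByte) };
  partNumber += 1;
  startByte = endByte;
  endByte = startByte + partSize;
}
// Final, possibly short, part.
yield { partNumber, data: data.slice(startByte), lastPart: true };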

lib/storage/src/chunks/getChunkStream.ts

Lines changed: 8 additions & 4 deletions

@@ -6,11 +6,11 @@ interface Buffers {
   length: number;
 }
 
-export async function* getChunk<T>(
+export async function* getChunkStream<T>(
   data: T,
   partSize: number,
   getNextData: (data: T) => AsyncGenerator<Buffer>
-): AsyncGenerator<RawDataPart> {
+): AsyncGenerator<RawDataPart, void, undefined> {
   let partNumber = 1;
   const currentBuffer: Buffers = { chunks: [], length: 0 };

@@ -19,9 +19,13 @@ export async function* getChunk<T>(
     currentBuffer.length += datum.length;
 
     while (currentBuffer.length >= partSize) {
-      // Concat all the buffers together once.
       let dataChunk: Buffer = currentBuffer.chunks[0];
-      if(currentBuffer.chunks.length) {
+
+      /**
+       * Concat all the buffers together once if there is more than one to concat. Attempt
+       * to minimize concats as Buffer.Concat is an extremely expensive operation.
+       */
+      if (currentBuffer.chunks.length > 1) {
         dataChunk = Buffer.concat(currentBuffer.chunks);
       }
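Beyond the rename, the tightened guard matters: currentBuffer.chunks.length is always truthy inside this loop, so the old code called Buffer.concat even when a single chunk was buffered; with > 1 that per-part copy is skipped and chunks[0] is reused directly. For the rename itself, a hedged wiring sketch that mirrors what chunker.ts passes in (buffer sizes are illustrative):

import { Buffer } from "buffer";
import { Readable } from "stream";
import { getChunkStream } from "./chunks/getChunkStream";
import { getDataReadable } from "./chunks/getDataReadable";

async function demo(): Promise<void> {
  // Two 4 MB reads are buffered, then re-cut into ~5 MB parts.
  const source = Readable.from([Buffer.alloc(4 * 1024 * 1024), Buffer.alloc(4 * 1024 * 1024)]);
  for await (const part of getChunkStream<Readable>(source, 5 * 1024 * 1024, getDataReadable)) {
    console.log(part.partNumber);
  }
}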

lib/storage/src/chunks/getDataReadable.ts

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 import Stream from "stream";
 import { Buffer } from "buffer";
 
-export async function* getNextData(data: Stream.Readable): AsyncGenerator<Buffer> {
+export async function* getDataReadable(data: Stream.Readable): AsyncGenerator<Buffer> {
   for await (const chunk of data) {
     yield Buffer.from(chunk);
   }
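Because each chunk passes through Buffer.from, consumers receive Buffers even when the Readable emits strings. A quick hedged sketch (the Readable.from input is illustrative):

import { Buffer } from "buffer";
import { Readable } from "stream";
import { getDataReadable } from "./chunks/getDataReadable";

async function demo(): Promise<void> {
  // Readable.from emits strings here; getDataReadable normalizes them.
  for await (const buf of getDataReadable(Readable.from(["abc", "def"]))) {
    console.log(Buffer.isBuffer(buf), buf.length); // true 3, twice
  }
}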

lib/storage/src/chunks/getDataReadableStream.ts

Lines changed: 3 additions & 1 deletion

@@ -1,6 +1,6 @@
 import { Buffer } from "buffer";
 
-export async function* getNextData(data: ReadableStream): AsyncGenerator<Buffer> {
+export async function* getDataReadableStream(data: ReadableStream): AsyncGenerator<Buffer> {
   // Get a lock on the stream.
   const reader = data.getReader();
 

@@ -13,6 +13,8 @@ export async function* getNextData(data: ReadableStream): AsyncGenerator<Buffer>
       // Else yield the chunk.
       yield Buffer.from(value);
     }
+  } catch(e) {
+    throw e;
   } finally {
     // release the lock for reading from this stream.
     reader.releaseLock();
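The added catch only rethrows, so behavior is unchanged; the finally block is what guarantees the reader's lock is released even when read() rejects. A hedged sketch of those lock semantics, using the same web-streams-polyfill as the tests:

import { ReadableStream } from "web-streams-polyfill";

const stream = new ReadableStream();
const reader = stream.getReader();
console.log(stream.locked); // true -- no second reader can attach
reader.releaseLock();
console.log(stream.locked); // false -- the stream may be read elsewhere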

lib/storage/test/data-chunk/buffer-chunk.spec.ts

Lines changed: 5 additions & 5 deletions

@@ -1,15 +1,15 @@
 import { byteLength } from "../../src/bytelength";
-import { getChunk as getChunkFromBuffer } from "../../src/chunks/getChunkBuffer";
+import { getChunkBuffer } from "../../src/chunks/getChunkBuffer";
 
-describe.only(getChunkFromBuffer.name, () => {
+describe.only(getChunkBuffer.name, () => {
   const getBuffer = (size: number) => Buffer.from("#".repeat(size));
 
   describe("Buffer chunking", () => {
     it("should come back with small sub buffers", async (done) => {
       const chunklength = 100;
       const totalLength = 1000;
       const buffer = getBuffer(totalLength);
-      const chunker = getChunkFromBuffer(buffer, chunklength);
+      const chunker = getChunkBuffer(buffer, chunklength);
 
       let chunkNum = 0;
       for await (const chunk of chunker) {

@@ -27,7 +27,7 @@ describe.only(getChunkFromBuffer.name, () => {
       const totalLength = 2200;
       const buffer = getBuffer(totalLength);
 
-      const chunker = getChunkFromBuffer(buffer, chunklength);
+      const chunker = getChunkBuffer(buffer, chunklength);
       const chunks = [];
       for await (const chunk of chunker) {
         chunks.push(chunk);

@@ -45,7 +45,7 @@ describe.only(getChunkFromBuffer.name, () => {
       const totalLength = 200;
       const buffer = getBuffer(totalLength);
 
-      const chunker = getChunkFromBuffer(buffer, chunklength);
+      const chunker = getChunkBuffer(buffer, chunklength);
       const chunks = [];
       for await (const chunk of chunker) {
         chunks.push(chunk);

lib/storage/test/data-chunk/readable-chunk.spec.ts

Lines changed: 4 additions & 4 deletions

@@ -1,7 +1,7 @@
 import { Readable } from "stream";
 import { byteLength } from "../../src/bytelength";
-import { getChunk as chunkFromReadable } from "../../src/chunks/getChunkStream";
-import { getNextData } from "../../src/chunks/getDataReadable";
+import { getChunkStream as chunkFromReadable } from "../../src/chunks/getChunkStream";
+import { getDataReadable } from "../../src/chunks/getDataReadable";
 import { RawDataPart as DataPart } from "../../src/Upload";
 
 const fs = require("fs");

@@ -23,7 +23,7 @@ describe(chunkFromReadable.name, () => {
   ): Promise<DataPart[]> => {
     const stream = Readable.from(getUnknownEnding(streamYieldSize, streamYieldCount));
     const chunks: DataPart[] = [];
-    const chunker = chunkFromReadable<Readable>(stream, partsize, getNextData);
+    const chunker = chunkFromReadable<Readable>(stream, partsize, getDataReadable);
 
     for await (const chunk of chunker) {
       chunks.push(chunk);

@@ -49,7 +49,7 @@ describe(chunkFromReadable.name, () => {
   it("should properly chunk a file", async (done) => {
     const fileStream = fs.createReadStream(__dirname + "/sample.file");
     const chunks: DataPart[] = [];
-    const chunker = chunkFromReadable<Readable>(fileStream, _6MB, getNextData);
+    const chunker = chunkFromReadable<Readable>(fileStream, _6MB, getDataReadable);
     for await (const chunk of chunker) {
       chunks.push(chunk);
     }

lib/storage/test/data-chunk/readable-stream-chunk.spec.ts

Lines changed: 3 additions & 3 deletions

@@ -2,8 +2,8 @@
 import { ReadableStream } from "web-streams-polyfill";
 
 import { byteLength } from "../../src/bytelength";
-import { getChunk as chunkFromReadable } from "../../src/chunks/getChunkStream";
-import { getNextData } from "../../src/chunks/getDataReadableStream";
+import { getChunkStream as chunkFromReadable } from "../../src/chunks/getChunkStream";
+import { getDataReadableStream } from "../../src/chunks/getDataReadableStream";
 import { RawDataPart as DataPart } from "../../src/Upload";
 
 describe("chunkFromReadable.name", () => {

@@ -27,7 +27,7 @@ describe("chunkFromReadable.name", () => {
   ): Promise<DataPart[]> => {
     const stream = getStreamOfUnknownlength(streamYieldSize, streamYieldCount);
     const chunks: DataPart[] = [];
-    const chunker = chunkFromReadable<ReadableStream>(stream, partsize, getNextData);
+    const chunker = chunkFromReadable<ReadableStream>(stream, partsize, getDataReadableStream);
 
     for await (const chunk of chunker) {
       chunks.push(chunk);
