Skip to content

Commit 4cdc08a

Browse files
authored
fix(lib-storage): chunk from readable only when defined (#1886)
* fix: fix readable chunking to work with node 12+ readable interface * fix: adding new line * chore(lib-storage): style updates to readable-helper
1 parent 741bb99 commit 4cdc08a

File tree

1 file changed

+17
-21
lines changed

1 file changed

+17
-21
lines changed

Diff for: lib/storage/src/data-chunk/readable-helper.ts

+17-21
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,25 @@ import { Readable } from "stream";
44
import { DEFAULT } from "../upload/defaults";
55
import { DataPart } from "./yield-chunk";
66

7-
interface StreamChunk {
8-
Body: Buffer;
9-
ended: boolean;
10-
}
11-
127
export async function* chunkFromReadable(reader: Readable, chunkSize: number): AsyncGenerator<DataPart, void, unknown> {
138
let partNumber = DEFAULT.MIN_PART_NUMBER;
149
let oldBuffer = Buffer.from("");
1510
while (partNumber < DEFAULT.MAX_PART_NUMBER) {
16-
reader.resume();
17-
const result = await _chunkFromStream(reader, chunkSize, oldBuffer);
18-
reader.pause();
11+
let currentBuffer = oldBuffer;
12+
if (reader.readable) {
13+
reader.resume();
14+
currentBuffer = await _chunkFromStream(reader, chunkSize, oldBuffer);
15+
reader.pause();
16+
}
1917

2018
yield {
21-
Body: result.Body.slice(0, chunkSize),
19+
Body: currentBuffer.slice(0, chunkSize),
2220
PartNumber: partNumber,
2321
};
24-
oldBuffer = result.Body.slice(chunkSize) as Buffer;
22+
oldBuffer = currentBuffer.slice(chunkSize) as Buffer;
2523
partNumber += 1;
2624

27-
if (result.ended && oldBuffer.length == 0) {
25+
if (!reader.readable && oldBuffer.length == 0) {
2826
return;
2927
}
3028
}
@@ -33,7 +31,11 @@ export async function* chunkFromReadable(reader: Readable, chunkSize: number): A
3331
}
3432
}
3533

36-
function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer): Promise<StreamChunk> {
34+
function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer): Promise<Buffer> {
35+
if (!stream.readable) {
36+
return Promise.resolve(oldBuffer);
37+
}
38+
3739
let currentChunk = oldBuffer;
3840
return new Promise((resolve, reject) => {
3941
const cleanupListeners = () => {
@@ -44,12 +46,9 @@ function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer
4446

4547
stream.on("data", (chunk) => {
4648
currentChunk = Buffer.concat([currentChunk, Buffer.from(chunk)]);
47-
if (currentChunk.length >= chunkSize) {
49+
if (currentChunk.length >= chunkSize || !stream.readable) {
4850
cleanupListeners();
49-
resolve({
50-
Body: currentChunk,
51-
ended: false,
52-
});
51+
resolve(currentChunk);
5352
}
5453
});
5554
stream.on("error", (err) => {
@@ -58,10 +57,7 @@ function _chunkFromStream(stream: Readable, chunkSize: number, oldBuffer: Buffer
5857
});
5958
stream.on("end", () => {
6059
cleanupListeners();
61-
resolve({
62-
Body: currentChunk,
63-
ended: true,
64-
});
60+
resolve(currentChunk);
6561
});
6662
});
6763
}

0 commit comments

Comments
 (0)