Skip to content

Commit 62e1a68

Browse files
benjamingrronag
authored andcommitted
stream: add toArray
Add the toArray method from the TC39 iterator helper proposal to Readable streams. This also enables a common-use case of converting a stream to an array. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41553 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent 6de8e51 commit 62e1a68

File tree

3 files changed

+136
-0
lines changed

3 files changed

+136
-0
lines changed

doc/api/stream.md

+40
Original file line numberDiff line numberDiff line change
@@ -1835,6 +1835,46 @@ await dnsResults.forEach((result) => {
18351835
console.log('done'); // Stream has finished
18361836
```
18371837

1838+
### `readable.toArray([options])`
1839+
1840+
<!-- YAML
1841+
added: REPLACEME
1842+
-->
1843+
1844+
> Stability: 1 - Experimental
1845+
1846+
* `options` {Object}
1847+
* `signal` {AbortSignal} allows cancelling the toArray operation if the
1848+
signal is aborted.
1849+
* Returns: {Promise} a promise containing an array (if the stream is in
1850+
object mode) or Buffer with the contents of the stream.
1851+
1852+
This method allows easily obtaining the contents of a stream. If the
1853+
stream is in [object mode][object-mode] an array of its contents is returned.
1854+
If the stream is not in object mode a Buffer containing its data is returned.
1855+
1856+
As this method reads the entire stream into memory, it negates the benefits of
1857+
streams. It's intended for interoperability and convenience, not as the primary
1858+
way to consume streams.
1859+
1860+
```mjs
1861+
import { Readable } from 'stream';
1862+
import { Resolver } from 'dns/promises';
1863+
1864+
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
1865+
1866+
// Make dns queries concurrently using .map and collect
1867+
// the results into an aray using toArray
1868+
const dnsResults = await Readable.from([
1869+
'nodejs.org',
1870+
'openjsf.org',
1871+
'www.linuxfoundation.org',
1872+
]).map(async (domain) => {
1873+
const { address } = await resolver.resolve4(domain, { ttl: true });
1874+
return address;
1875+
}, { concurrency: 2 }).toArray();
1876+
```
1877+
18381878
### Duplex and transform streams
18391879

18401880
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

+17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
'use strict';
22

33
const { AbortController } = require('internal/abort_controller');
4+
const { Buffer } = require('buffer');
5+
46
const {
57
codes: {
68
ERR_INVALID_ARG_TYPE,
@@ -10,6 +12,7 @@ const {
1012
const { validateInteger } = require('internal/validators');
1113

1214
const {
15+
ArrayPrototypePush,
1316
MathFloor,
1417
Promise,
1518
PromiseReject,
@@ -174,11 +177,25 @@ async function * filter(fn, options) {
174177
yield* this.map(filterFn, options);
175178
}
176179

180+
async function toArray(options) {
181+
const result = [];
182+
for await (const val of this) {
183+
if (options?.signal?.aborted) {
184+
throw new AbortError({ cause: options.signal.reason });
185+
}
186+
ArrayPrototypePush(result, val);
187+
}
188+
if (!this.readableObjectMode) {
189+
return Buffer.concat(result);
190+
}
191+
return result;
192+
}
177193
module.exports.streamReturningOperators = {
178194
filter,
179195
map,
180196
};
181197

182198
module.exports.promiseReturningOperators = {
183199
forEach,
200+
toArray,
184201
};

test/parallel/test-stream-toArray.js

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
{
10+
// Works on a synchronous stream
11+
(async () => {
12+
const tests = [
13+
[],
14+
[1],
15+
[1, 2, 3],
16+
Array(100).fill().map((_, i) => i),
17+
];
18+
for (const test of tests) {
19+
const stream = Readable.from(test);
20+
const result = await stream.toArray();
21+
assert.deepStrictEqual(result, test);
22+
}
23+
})().then(common.mustCall());
24+
}
25+
26+
{
27+
// Works on a non-object-mode stream and flattens it
28+
(async () => {
29+
const stream = Readable.from(
30+
[Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])]
31+
, { objectMode: false });
32+
const result = await stream.toArray();
33+
assert.strictEqual(Buffer.isBuffer(result), true);
34+
assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]);
35+
})().then(common.mustCall());
36+
}
37+
38+
{
39+
// Works on an asynchronous stream
40+
(async () => {
41+
const tests = [
42+
[],
43+
[1],
44+
[1, 2, 3],
45+
Array(100).fill().map((_, i) => i),
46+
];
47+
for (const test of tests) {
48+
const stream = Readable.from(test).map((x) => Promise.resolve(x));
49+
const result = await stream.toArray();
50+
assert.deepStrictEqual(result, test);
51+
}
52+
})().then(common.mustCall());
53+
}
54+
55+
{
56+
// Support for AbortSignal
57+
const ac = new AbortController();
58+
let stream;
59+
assert.rejects(async () => {
60+
stream = Readable.from([1, 2, 3]).map(async (x) => {
61+
if (x === 3) {
62+
await new Promise(() => {}); // Explicitly do not pass signal here
63+
}
64+
return Promise.resolve(x);
65+
});
66+
await stream.toArray({ signal: ac.signal });
67+
}, {
68+
name: 'AbortError',
69+
}).then(common.mustCall(() => {
70+
// Only stops toArray, does not destory the stream
71+
assert(stream.destroyed, false);
72+
}));
73+
ac.abort();
74+
}
75+
{
76+
// Test result is a Promise
77+
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
78+
assert.strictEqual(result instanceof Promise, true);
79+
}

0 commit comments

Comments
 (0)