Skip to content

Commit 311050e

Browse files
benjamingrruyadorno
authored andcommitted
stream: add asIndexedPairs
Add the asIndexedPairs method for readable streams. PR-URL: #41681 Refs: https://github.com/tc39/proposal-iterator-helpers#asindexedpairs Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent ae34900 commit 311050e

File tree

3 files changed

+82
-0
lines changed

3 files changed

+82
-0
lines changed

doc/api/stream.md

+24
Original file line numberDiff line numberDiff line change
@@ -2064,6 +2064,30 @@ import { Readable } from 'stream';
20642064
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
20652065
```
20662066

2067+
### `readable.asIndexedPairs([options])`
2068+
2069+
<!-- YAML
2070+
added: REPLACEME
2071+
-->
2072+
2073+
> Stability: 1 - Experimental
2074+
2075+
* `options` {Object}
2076+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2077+
aborted.
2078+
* Returns: {Readable} a stream of indexed pairs.
2079+
2080+
This method returns a new stream with chunks of the underlying stream paired
2081+
with a counter in the form `[index, chunk]`. The first index value is 0 and it
2082+
increases by 1 for each chunk produced.
2083+
2084+
```mjs
2085+
import { Readable } from 'stream';
2086+
2087+
const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
2088+
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
2089+
```
2090+
20672091
### Duplex and transform streams
20682092

20692093
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

+11
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,16 @@ async function * map(fn, options) {
157157
}
158158
}
159159

160+
async function* asIndexedPairs(options) {
161+
let index = 0;
162+
for await (const val of this) {
163+
if (options?.signal?.aborted) {
164+
throw new AbortError({ cause: options.signal.reason });
165+
}
166+
yield [index++, val];
167+
}
168+
}
169+
160170
async function some(fn, options) {
161171
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
162172
// Note that some does short circuit but also closes the iterator if it does
@@ -286,6 +296,7 @@ function take(number, options) {
286296
}
287297

288298
module.exports.streamReturningOperators = {
299+
asIndexedPairs,
289300
drop,
290301
filter,
291302
flatMap,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import '../common/index.mjs';
2+
import { Readable } from 'stream';
3+
import { deepStrictEqual, rejects } from 'assert';
4+
5+
{
6+
// asIndexedPairs with a synchronous stream
7+
const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray();
8+
deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]);
9+
const empty = await Readable.from([]).asIndexedPairs().toArray();
10+
deepStrictEqual(empty, []);
11+
}
12+
13+
{
14+
// asIndexedPairs works an asynchronous streams
15+
const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x);
16+
const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray();
17+
deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]);
18+
const empty = await asyncFrom([]).asIndexedPairs().toArray();
19+
deepStrictEqual(empty, []);
20+
}
21+
22+
{
23+
// Does not enumerate an infinite stream
24+
const infinite = () => Readable.from(async function* () {
25+
while (true) yield 1;
26+
}());
27+
const pairs = await infinite().asIndexedPairs().take(3).toArray();
28+
deepStrictEqual(pairs, [[0, 1], [1, 1], [2, 1]]);
29+
const empty = await infinite().asIndexedPairs().take(0).toArray();
30+
deepStrictEqual(empty, []);
31+
}
32+
33+
{
34+
// AbortSignal
35+
await rejects(async () => {
36+
const ac = new AbortController();
37+
const { signal } = ac;
38+
const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
39+
ac.abort();
40+
await p;
41+
}, { name: 'AbortError' });
42+
43+
await rejects(async () => {
44+
const signal = AbortSignal.abort();
45+
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
46+
}, /AbortError/);
47+
}

0 commit comments

Comments
 (0)