Skip to content

Commit a05b832

Browse files
iMosesruyadorno
authored andcommitted
stream: use synchronous error validation on iteration helpers
is no longer a generator function, instead it returns a called generator so that validation can be synchronous and not wait for the first iteration Fixes: #41648 PR-URL: #41652 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Robert Nagy <[email protected]>
1 parent 351ebf2 commit a05b832

File tree

4 files changed

+119
-141
lines changed

4 files changed

+119
-141
lines changed

lib/internal/streams/operators.js

+104-99
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const {
2626
const kEmpty = Symbol('kEmpty');
2727
const kEof = Symbol('kEof');
2828

29-
async function * map(fn, options) {
29+
function map(fn, options) {
3030
if (typeof fn !== 'function') {
3131
throw new ERR_INVALID_ARG_TYPE(
3232
'fn', ['Function', 'AsyncFunction'], fn);
@@ -43,118 +43,120 @@ async function * map(fn, options) {
4343

4444
validateInteger(concurrency, 'concurrency', 1);
4545

46-
const ac = new AbortController();
47-
const stream = this;
48-
const queue = [];
49-
const signal = ac.signal;
50-
const signalOpt = { signal };
51-
52-
const abort = () => ac.abort();
53-
if (options?.signal?.aborted) {
54-
abort();
55-
}
56-
57-
options?.signal?.addEventListener('abort', abort);
58-
59-
let next;
60-
let resume;
61-
let done = false;
62-
63-
function onDone() {
64-
done = true;
65-
}
46+
return async function* map() {
47+
const ac = new AbortController();
48+
const stream = this;
49+
const queue = [];
50+
const signal = ac.signal;
51+
const signalOpt = { signal };
6652

67-
async function pump() {
68-
try {
69-
for await (let val of stream) {
70-
if (done) {
71-
return;
72-
}
53+
const abort = () => ac.abort();
54+
if (options?.signal?.aborted) {
55+
abort();
56+
}
7357

74-
if (signal.aborted) {
75-
throw new AbortError();
76-
}
58+
options?.signal?.addEventListener('abort', abort);
7759

78-
try {
79-
val = fn(val, signalOpt);
80-
} catch (err) {
81-
val = PromiseReject(err);
82-
}
60+
let next;
61+
let resume;
62+
let done = false;
8363

84-
if (val === kEmpty) {
85-
continue;
86-
}
64+
function onDone() {
65+
done = true;
66+
}
8767

88-
if (typeof val?.catch === 'function') {
89-
val.catch(onDone);
68+
async function pump() {
69+
try {
70+
for await (let val of stream) {
71+
if (done) {
72+
return;
73+
}
74+
75+
if (signal.aborted) {
76+
throw new AbortError();
77+
}
78+
79+
try {
80+
val = fn(val, signalOpt);
81+
} catch (err) {
82+
val = PromiseReject(err);
83+
}
84+
85+
if (val === kEmpty) {
86+
continue;
87+
}
88+
89+
if (typeof val?.catch === 'function') {
90+
val.catch(onDone);
91+
}
92+
93+
queue.push(val);
94+
if (next) {
95+
next();
96+
next = null;
97+
}
98+
99+
if (!done && queue.length && queue.length >= concurrency) {
100+
await new Promise((resolve) => {
101+
resume = resolve;
102+
});
103+
}
90104
}
91-
105+
queue.push(kEof);
106+
} catch (err) {
107+
const val = PromiseReject(err);
108+
PromisePrototypeCatch(val, onDone);
92109
queue.push(val);
110+
} finally {
111+
done = true;
93112
if (next) {
94113
next();
95114
next = null;
96115
}
97-
98-
if (!done && queue.length && queue.length >= concurrency) {
99-
await new Promise((resolve) => {
100-
resume = resolve;
101-
});
102-
}
103-
}
104-
queue.push(kEof);
105-
} catch (err) {
106-
const val = PromiseReject(err);
107-
PromisePrototypeCatch(val, onDone);
108-
queue.push(val);
109-
} finally {
110-
done = true;
111-
if (next) {
112-
next();
113-
next = null;
116+
options?.signal?.removeEventListener('abort', abort);
114117
}
115-
options?.signal?.removeEventListener('abort', abort);
116118
}
117-
}
118-
119-
pump();
120-
121-
try {
122-
while (true) {
123-
while (queue.length > 0) {
124-
const val = await queue[0];
125-
126-
if (val === kEof) {
127-
return;
128-
}
129119

130-
if (signal.aborted) {
131-
throw new AbortError();
132-
}
120+
pump();
133121

134-
if (val !== kEmpty) {
135-
yield val;
122+
try {
123+
while (true) {
124+
while (queue.length > 0) {
125+
const val = await queue[0];
126+
127+
if (val === kEof) {
128+
return;
129+
}
130+
131+
if (signal.aborted) {
132+
throw new AbortError();
133+
}
134+
135+
if (val !== kEmpty) {
136+
yield val;
137+
}
138+
139+
queue.shift();
140+
if (resume) {
141+
resume();
142+
resume = null;
143+
}
136144
}
137145

138-
queue.shift();
139-
if (resume) {
140-
resume();
141-
resume = null;
142-
}
146+
await new Promise((resolve) => {
147+
next = resolve;
148+
});
143149
}
150+
} finally {
151+
ac.abort();
144152

145-
await new Promise((resolve) => {
146-
next = resolve;
147-
});
148-
}
149-
} finally {
150-
ac.abort();
151-
152-
done = true;
153-
if (resume) {
154-
resume();
155-
resume = null;
153+
done = true;
154+
if (resume) {
155+
resume();
156+
resume = null;
157+
}
156158
}
157-
}
159+
}.call(this);
158160
}
159161

160162
async function* asIndexedPairs(options) {
@@ -214,7 +216,7 @@ async function forEach(fn, options) {
214216
for await (const unused of this.map(forEachFn, options));
215217
}
216218

217-
async function * filter(fn, options) {
219+
function filter(fn, options) {
218220
if (typeof fn !== 'function') {
219221
throw new ERR_INVALID_ARG_TYPE(
220222
'fn', ['Function', 'AsyncFunction'], fn);
@@ -225,7 +227,7 @@ async function * filter(fn, options) {
225227
}
226228
return kEmpty;
227229
}
228-
yield* this.map(filterFn, options);
230+
return this.map(filterFn, options);
229231
}
230232

231233
async function toArray(options) {
@@ -239,10 +241,13 @@ async function toArray(options) {
239241
return result;
240242
}
241243

242-
async function* flatMap(fn, options) {
243-
for await (const val of this.map(fn, options)) {
244-
yield* val;
245-
}
244+
function flatMap(fn, options) {
245+
const values = this.map(fn, options);
246+
return async function* flatMap() {
247+
for await (const val of values) {
248+
yield* val;
249+
}
250+
}.call(this);
246251
}
247252

248253
function toIntegerOrInfinity(number) {

test/parallel/test-stream-filter.js

+5-14
Original file line numberDiff line numberDiff line change
@@ -87,20 +87,11 @@ const { setTimeout } = require('timers/promises');
8787

8888
{
8989
// Error cases
90-
assert.rejects(async () => {
91-
// eslint-disable-next-line no-unused-vars
92-
for await (const unused of Readable.from([1]).filter(1));
93-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
94-
assert.rejects(async () => {
95-
// eslint-disable-next-line no-unused-vars
96-
for await (const _ of Readable.from([1]).filter((x) => x, {
97-
concurrency: 'Foo'
98-
}));
99-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
100-
assert.rejects(async () => {
101-
// eslint-disable-next-line no-unused-vars
102-
for await (const _ of Readable.from([1]).filter((x) => x, 1));
103-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
90+
assert.throws(() => Readable.from([1]).filter(1), /ERR_INVALID_ARG_TYPE/);
91+
assert.throws(() => Readable.from([1]).filter((x) => x, {
92+
concurrency: 'Foo'
93+
}), /ERR_OUT_OF_RANGE/);
94+
assert.throws(() => Readable.from([1]).filter((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
10495
}
10596
{
10697
// Test result is a Readable

test/parallel/test-stream-flatMap.js

+5-14
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,11 @@ function oneTo5() {
109109

110110
{
111111
// Error cases
112-
assert.rejects(async () => {
113-
// eslint-disable-next-line no-unused-vars
114-
for await (const unused of Readable.from([1]).flatMap(1));
115-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
116-
assert.rejects(async () => {
117-
// eslint-disable-next-line no-unused-vars
118-
for await (const _ of Readable.from([1]).flatMap((x) => x, {
119-
concurrency: 'Foo'
120-
}));
121-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
122-
assert.rejects(async () => {
123-
// eslint-disable-next-line no-unused-vars
124-
for await (const _ of Readable.from([1]).flatMap((x) => x, 1));
125-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
112+
assert.throws(() => Readable.from([1]).flatMap(1), /ERR_INVALID_ARG_TYPE/);
113+
assert.throws(() => Readable.from([1]).flatMap((x) => x, {
114+
concurrency: 'Foo'
115+
}), /ERR_OUT_OF_RANGE/);
116+
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
126117
}
127118
{
128119
// Test result is a Readable

test/parallel/test-stream-map.js

+5-14
Original file line numberDiff line numberDiff line change
@@ -175,20 +175,11 @@ const { setTimeout } = require('timers/promises');
175175

176176
{
177177
// Error cases
178-
assert.rejects(async () => {
179-
// eslint-disable-next-line no-unused-vars
180-
for await (const unused of Readable.from([1]).map(1));
181-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
182-
assert.rejects(async () => {
183-
// eslint-disable-next-line no-unused-vars
184-
for await (const _ of Readable.from([1]).map((x) => x, {
185-
concurrency: 'Foo'
186-
}));
187-
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
188-
assert.rejects(async () => {
189-
// eslint-disable-next-line no-unused-vars
190-
for await (const _ of Readable.from([1]).map((x) => x, 1));
191-
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
178+
assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
179+
assert.throws(() => Readable.from([1]).map((x) => x, {
180+
concurrency: 'Foo'
181+
}), /ERR_OUT_OF_RANGE/);
182+
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
192183
}
193184
{
194185
// Test result is a Readable

0 commit comments

Comments
 (0)