Skip to content

Commit e11a079

Browse files
iMosesruyadorno
authored andcommitted
stream: use synchronous error validation & validate abort signal option
made sure top level methods aren't async/generators so that validation errors could be caught synchronously also added validation for the abort signal option PR-URL: #41777 Reviewed-By: Benjamin Gruenbaum <[email protected]> Reviewed-By: Nitzan Uziely <[email protected]>
1 parent 4db343b commit e11a079

8 files changed

+98
-11
lines changed

lib/internal/streams/operators.js

+58-10
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ const {
1010
},
1111
AbortError,
1212
} = require('internal/errors');
13-
const { validateInteger } = require('internal/validators');
13+
const {
14+
validateAbortSignal,
15+
validateInteger,
16+
} = require('internal/validators');
1417
const { kWeakHandler } = require('internal/event_target');
1518
const { finished } = require('internal/streams/end-of-stream');
1619

@@ -33,10 +36,12 @@ function map(fn, options) {
3336
throw new ERR_INVALID_ARG_TYPE(
3437
'fn', ['Function', 'AsyncFunction'], fn);
3538
}
36-
3739
if (options != null && typeof options !== 'object') {
3840
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
3941
}
42+
if (options?.signal != null) {
43+
validateAbortSignal(options.signal, 'options.signal');
44+
}
4045

4146
let concurrency = 1;
4247
if (options?.concurrency != null) {
@@ -161,17 +166,33 @@ function map(fn, options) {
161166
}.call(this);
162167
}
163168

164-
async function* asIndexedPairs(options) {
165-
let index = 0;
166-
for await (const val of this) {
167-
if (options?.signal?.aborted) {
168-
throw new AbortError({ cause: options.signal.reason });
169-
}
170-
yield [index++, val];
169+
function asIndexedPairs(options) {
170+
if (options != null && typeof options !== 'object') {
171+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
172+
}
173+
if (options?.signal != null) {
174+
validateAbortSignal(options.signal, 'options.signal');
171175
}
176+
177+
return async function* asIndexedPairs() {
178+
let index = 0;
179+
for await (const val of this) {
180+
if (options?.signal?.aborted) {
181+
throw new AbortError({ cause: options.signal.reason });
182+
}
183+
yield [index++, val];
184+
}
185+
}.call(this);
172186
}
173187

174188
async function some(fn, options) {
189+
if (options != null && typeof options !== 'object') {
190+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
191+
}
192+
if (options?.signal != null) {
193+
validateAbortSignal(options.signal, 'options.signal');
194+
}
195+
175196
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
176197
// Note that some does short circuit but also closes the iterator if it does
177198
const ac = new AbortController();
@@ -246,6 +267,13 @@ async function reduce(reducer, initialValue, options) {
246267
throw new ERR_INVALID_ARG_TYPE(
247268
'reducer', ['Function', 'AsyncFunction'], reducer);
248269
}
270+
if (options != null && typeof options !== 'object') {
271+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
272+
}
273+
if (options?.signal != null) {
274+
validateAbortSignal(options.signal, 'options.signal');
275+
}
276+
249277
let hasInitialValue = arguments.length > 1;
250278
if (options?.signal?.aborted) {
251279
const err = new AbortError(undefined, { cause: options.signal.reason });
@@ -283,6 +311,13 @@ async function reduce(reducer, initialValue, options) {
283311
}
284312

285313
async function toArray(options) {
314+
if (options != null && typeof options !== 'object') {
315+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
316+
}
317+
if (options?.signal != null) {
318+
validateAbortSignal(options.signal, 'options.signal');
319+
}
320+
286321
const result = [];
287322
for await (const val of this) {
288323
if (options?.signal?.aborted) {
@@ -316,6 +351,13 @@ function toIntegerOrInfinity(number) {
316351
}
317352

318353
function drop(number, options) {
354+
if (options != null && typeof options !== 'object') {
355+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
356+
}
357+
if (options?.signal != null) {
358+
validateAbortSignal(options.signal, 'options.signal');
359+
}
360+
319361
number = toIntegerOrInfinity(number);
320362
return async function* drop() {
321363
if (options?.signal?.aborted) {
@@ -332,8 +374,14 @@ function drop(number, options) {
332374
}.call(this);
333375
}
334376

335-
336377
function take(number, options) {
378+
if (options != null && typeof options !== 'object') {
379+
throw new ERR_INVALID_ARG_TYPE('options', ['Object']);
380+
}
381+
if (options?.signal != null) {
382+
validateAbortSignal(options.signal, 'options.signal');
383+
}
384+
337385
number = toIntegerOrInfinity(number);
338386
return async function* take() {
339387
if (options?.signal?.aborted) {

test/parallel/test-stream-asIndexedPairs.mjs

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import '../common/index.mjs';
22
import { Readable } from 'stream';
3-
import { deepStrictEqual, rejects } from 'assert';
3+
import { deepStrictEqual, rejects, throws } from 'assert';
44

55
{
66
// asIndexedPairs with a synchronous stream
@@ -45,3 +45,9 @@ import { deepStrictEqual, rejects } from 'assert';
4545
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
4646
}, /AbortError/);
4747
}
48+
49+
{
50+
// Error cases
51+
throws(() => Readable.from([1]).asIndexedPairs(1), /ERR_INVALID_ARG_TYPE/);
52+
throws(() => Readable.from([1]).asIndexedPairs({ signal: true }), /ERR_INVALID_ARG_TYPE/);
53+
}

test/parallel/test-stream-drop-take.js

+6
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,10 @@ const naturals = () => from(async function*() {
9393
for (const example of invalidArgs) {
9494
throws(() => from([]).take(example).toArray(), /ERR_OUT_OF_RANGE/);
9595
}
96+
97+
throws(() => Readable.from([1]).drop(1, 1), /ERR_INVALID_ARG_TYPE/);
98+
throws(() => Readable.from([1]).drop(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
99+
100+
throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
101+
throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
96102
}

test/parallel/test-stream-flatMap.js

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ function oneTo5() {
114114
concurrency: 'Foo'
115115
}), /ERR_OUT_OF_RANGE/);
116116
assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
117+
assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
117118
}
118119
{
119120
// Test result is a Readable

test/parallel/test-stream-map.js

+1
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ const { setTimeout } = require('timers/promises');
180180
concurrency: 'Foo'
181181
}), /ERR_OUT_OF_RANGE/);
182182
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
183+
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
183184
}
184185
{
185186
// Test result is a Readable

test/parallel/test-stream-reduce.js

+2
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,8 @@ function sum(p, c) {
121121
// Error cases
122122
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
123123
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
124+
assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, 1), /ERR_INVALID_ARG_TYPE/);
125+
assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }), /ERR_INVALID_ARG_TYPE/);
124126
}
125127

126128
{

test/parallel/test-stream-some-every.js

+11
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,17 @@ function oneTo5Async() {
8787
assert.rejects(async () => {
8888
await Readable.from([1]).every(1);
8989
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
90+
91+
assert.rejects(async () => {
92+
await Readable.from([1]).every((x) => x, 1);
93+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
94+
95+
assert.rejects(async () => {
96+
await Readable.from([1]).every((x) => x, {
97+
signal: true
98+
});
99+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
100+
90101
assert.rejects(async () => {
91102
await Readable.from([1]).every((x) => x, {
92103
concurrency: 'Foo'

test/parallel/test-stream-toArray.js

+12
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,15 @@ const assert = require('assert');
7979
const result = Readable.from([1, 2, 3, 4, 5]).toArray();
8080
assert.strictEqual(result instanceof Promise, true);
8181
}
82+
{
83+
// Error cases
84+
assert.rejects(async () => {
85+
await Readable.from([1]).toArray(1);
86+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
87+
88+
assert.rejects(async () => {
89+
await Readable.from([1]).toArray({
90+
signal: true
91+
});
92+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
93+
}

0 commit comments

Comments
 (0)