@@ -26,7 +26,7 @@ const {
26
26
const kEmpty = Symbol ( 'kEmpty' ) ;
27
27
const kEof = Symbol ( 'kEof' ) ;
28
28
29
- async function * map ( fn , options ) {
29
+ function map ( fn , options ) {
30
30
if ( typeof fn !== 'function' ) {
31
31
throw new ERR_INVALID_ARG_TYPE (
32
32
'fn' , [ 'Function' , 'AsyncFunction' ] , fn ) ;
@@ -43,118 +43,120 @@ async function * map(fn, options) {
43
43
44
44
validateInteger ( concurrency , 'concurrency' , 1 ) ;
45
45
46
- const ac = new AbortController ( ) ;
47
- const stream = this ;
48
- const queue = [ ] ;
49
- const signal = ac . signal ;
50
- const signalOpt = { signal } ;
51
-
52
- const abort = ( ) => ac . abort ( ) ;
53
- if ( options ?. signal ?. aborted ) {
54
- abort ( ) ;
55
- }
56
-
57
- options ?. signal ?. addEventListener ( 'abort' , abort ) ;
58
-
59
- let next ;
60
- let resume ;
61
- let done = false ;
62
-
63
- function onDone ( ) {
64
- done = true ;
65
- }
46
+ return async function * map ( ) {
47
+ const ac = new AbortController ( ) ;
48
+ const stream = this ;
49
+ const queue = [ ] ;
50
+ const signal = ac . signal ;
51
+ const signalOpt = { signal } ;
66
52
67
- async function pump ( ) {
68
- try {
69
- for await ( let val of stream ) {
70
- if ( done ) {
71
- return ;
72
- }
53
+ const abort = ( ) => ac . abort ( ) ;
54
+ if ( options ?. signal ?. aborted ) {
55
+ abort ( ) ;
56
+ }
73
57
74
- if ( signal . aborted ) {
75
- throw new AbortError ( ) ;
76
- }
58
+ options ?. signal ?. addEventListener ( 'abort' , abort ) ;
77
59
78
- try {
79
- val = fn ( val , signalOpt ) ;
80
- } catch ( err ) {
81
- val = PromiseReject ( err ) ;
82
- }
60
+ let next ;
61
+ let resume ;
62
+ let done = false ;
83
63
84
- if ( val === kEmpty ) {
85
- continue ;
86
- }
64
+ function onDone ( ) {
65
+ done = true ;
66
+ }
87
67
88
- if ( typeof val ?. catch === 'function' ) {
89
- val . catch ( onDone ) ;
68
+ async function pump ( ) {
69
+ try {
70
+ for await ( let val of stream ) {
71
+ if ( done ) {
72
+ return ;
73
+ }
74
+
75
+ if ( signal . aborted ) {
76
+ throw new AbortError ( ) ;
77
+ }
78
+
79
+ try {
80
+ val = fn ( val , signalOpt ) ;
81
+ } catch ( err ) {
82
+ val = PromiseReject ( err ) ;
83
+ }
84
+
85
+ if ( val === kEmpty ) {
86
+ continue ;
87
+ }
88
+
89
+ if ( typeof val ?. catch === 'function' ) {
90
+ val . catch ( onDone ) ;
91
+ }
92
+
93
+ queue . push ( val ) ;
94
+ if ( next ) {
95
+ next ( ) ;
96
+ next = null ;
97
+ }
98
+
99
+ if ( ! done && queue . length && queue . length >= concurrency ) {
100
+ await new Promise ( ( resolve ) => {
101
+ resume = resolve ;
102
+ } ) ;
103
+ }
90
104
}
91
-
105
+ queue . push ( kEof ) ;
106
+ } catch ( err ) {
107
+ const val = PromiseReject ( err ) ;
108
+ PromisePrototypeCatch ( val , onDone ) ;
92
109
queue . push ( val ) ;
110
+ } finally {
111
+ done = true ;
93
112
if ( next ) {
94
113
next ( ) ;
95
114
next = null ;
96
115
}
97
-
98
- if ( ! done && queue . length && queue . length >= concurrency ) {
99
- await new Promise ( ( resolve ) => {
100
- resume = resolve ;
101
- } ) ;
102
- }
103
- }
104
- queue . push ( kEof ) ;
105
- } catch ( err ) {
106
- const val = PromiseReject ( err ) ;
107
- PromisePrototypeCatch ( val , onDone ) ;
108
- queue . push ( val ) ;
109
- } finally {
110
- done = true ;
111
- if ( next ) {
112
- next ( ) ;
113
- next = null ;
116
+ options ?. signal ?. removeEventListener ( 'abort' , abort ) ;
114
117
}
115
- options ?. signal ?. removeEventListener ( 'abort' , abort ) ;
116
118
}
117
- }
118
-
119
- pump ( ) ;
120
-
121
- try {
122
- while ( true ) {
123
- while ( queue . length > 0 ) {
124
- const val = await queue [ 0 ] ;
125
-
126
- if ( val === kEof ) {
127
- return ;
128
- }
129
119
130
- if ( signal . aborted ) {
131
- throw new AbortError ( ) ;
132
- }
120
+ pump ( ) ;
133
121
134
- if ( val !== kEmpty ) {
135
- yield val ;
122
+ try {
123
+ while ( true ) {
124
+ while ( queue . length > 0 ) {
125
+ const val = await queue [ 0 ] ;
126
+
127
+ if ( val === kEof ) {
128
+ return ;
129
+ }
130
+
131
+ if ( signal . aborted ) {
132
+ throw new AbortError ( ) ;
133
+ }
134
+
135
+ if ( val !== kEmpty ) {
136
+ yield val ;
137
+ }
138
+
139
+ queue . shift ( ) ;
140
+ if ( resume ) {
141
+ resume ( ) ;
142
+ resume = null ;
143
+ }
136
144
}
137
145
138
- queue . shift ( ) ;
139
- if ( resume ) {
140
- resume ( ) ;
141
- resume = null ;
142
- }
146
+ await new Promise ( ( resolve ) => {
147
+ next = resolve ;
148
+ } ) ;
143
149
}
150
+ } finally {
151
+ ac . abort ( ) ;
144
152
145
- await new Promise ( ( resolve ) => {
146
- next = resolve ;
147
- } ) ;
148
- }
149
- } finally {
150
- ac . abort ( ) ;
151
-
152
- done = true ;
153
- if ( resume ) {
154
- resume ( ) ;
155
- resume = null ;
153
+ done = true ;
154
+ if ( resume ) {
155
+ resume ( ) ;
156
+ resume = null ;
157
+ }
156
158
}
157
- }
159
+ } . call ( this ) ;
158
160
}
159
161
160
162
async function * asIndexedPairs ( options ) {
@@ -214,7 +216,7 @@ async function forEach(fn, options) {
214
216
for await ( const unused of this . map ( forEachFn , options ) ) ;
215
217
}
216
218
217
- async function * filter ( fn , options ) {
219
+ function filter ( fn , options ) {
218
220
if ( typeof fn !== 'function' ) {
219
221
throw new ERR_INVALID_ARG_TYPE (
220
222
'fn' , [ 'Function' , 'AsyncFunction' ] , fn ) ;
@@ -225,7 +227,7 @@ async function * filter(fn, options) {
225
227
}
226
228
return kEmpty ;
227
229
}
228
- yield * this . map ( filterFn , options ) ;
230
+ return this . map ( filterFn , options ) ;
229
231
}
230
232
231
233
async function toArray ( options ) {
@@ -239,10 +241,13 @@ async function toArray(options) {
239
241
return result ;
240
242
}
241
243
242
- async function * flatMap ( fn , options ) {
243
- for await ( const val of this . map ( fn , options ) ) {
244
- yield * val ;
245
- }
244
+ function flatMap ( fn , options ) {
245
+ const values = this . map ( fn , options ) ;
246
+ return async function * flatMap ( ) {
247
+ for await ( const val of values ) {
248
+ yield * val ;
249
+ }
250
+ } . call ( this ) ;
246
251
}
247
252
248
253
function toIntegerOrInfinity ( number ) {
0 commit comments