@@ -26,7 +26,7 @@ const {
2626const kEmpty = Symbol ( 'kEmpty' ) ;
2727const kEof = Symbol ( 'kEof' ) ;
2828
29- async function * map ( fn , options ) {
29+ function map ( fn , options ) {
3030 if ( typeof fn !== 'function' ) {
3131 throw new ERR_INVALID_ARG_TYPE (
3232 'fn' , [ 'Function' , 'AsyncFunction' ] , fn ) ;
@@ -43,118 +43,120 @@ async function * map(fn, options) {
4343
4444 validateInteger ( concurrency , 'concurrency' , 1 ) ;
4545
46- const ac = new AbortController ( ) ;
47- const stream = this ;
48- const queue = [ ] ;
49- const signal = ac . signal ;
50- const signalOpt = { signal } ;
51-
52- const abort = ( ) => ac . abort ( ) ;
53- if ( options ?. signal ?. aborted ) {
54- abort ( ) ;
55- }
56-
57- options ?. signal ?. addEventListener ( 'abort' , abort ) ;
58-
59- let next ;
60- let resume ;
61- let done = false ;
62-
63- function onDone ( ) {
64- done = true ;
65- }
46+ return async function * map ( ) {
47+ const ac = new AbortController ( ) ;
48+ const stream = this ;
49+ const queue = [ ] ;
50+ const signal = ac . signal ;
51+ const signalOpt = { signal } ;
6652
67- async function pump ( ) {
68- try {
69- for await ( let val of stream ) {
70- if ( done ) {
71- return ;
72- }
53+ const abort = ( ) => ac . abort ( ) ;
54+ if ( options ?. signal ?. aborted ) {
55+ abort ( ) ;
56+ }
7357
74- if ( signal . aborted ) {
75- throw new AbortError ( ) ;
76- }
58+ options ?. signal ?. addEventListener ( 'abort' , abort ) ;
7759
78- try {
79- val = fn ( val , signalOpt ) ;
80- } catch ( err ) {
81- val = PromiseReject ( err ) ;
82- }
60+ let next ;
61+ let resume ;
62+ let done = false ;
8363
84- if ( val === kEmpty ) {
85- continue ;
86- }
64+ function onDone ( ) {
65+ done = true ;
66+ }
8767
88- if ( typeof val ?. catch === 'function' ) {
89- val . catch ( onDone ) ;
68+ async function pump ( ) {
69+ try {
70+ for await ( let val of stream ) {
71+ if ( done ) {
72+ return ;
73+ }
74+
75+ if ( signal . aborted ) {
76+ throw new AbortError ( ) ;
77+ }
78+
79+ try {
80+ val = fn ( val , signalOpt ) ;
81+ } catch ( err ) {
82+ val = PromiseReject ( err ) ;
83+ }
84+
85+ if ( val === kEmpty ) {
86+ continue ;
87+ }
88+
89+ if ( typeof val ?. catch === 'function' ) {
90+ val . catch ( onDone ) ;
91+ }
92+
93+ queue . push ( val ) ;
94+ if ( next ) {
95+ next ( ) ;
96+ next = null ;
97+ }
98+
99+ if ( ! done && queue . length && queue . length >= concurrency ) {
100+ await new Promise ( ( resolve ) => {
101+ resume = resolve ;
102+ } ) ;
103+ }
90104 }
91-
105+ queue . push ( kEof ) ;
106+ } catch ( err ) {
107+ const val = PromiseReject ( err ) ;
108+ PromisePrototypeCatch ( val , onDone ) ;
92109 queue . push ( val ) ;
110+ } finally {
111+ done = true ;
93112 if ( next ) {
94113 next ( ) ;
95114 next = null ;
96115 }
97-
98- if ( ! done && queue . length && queue . length >= concurrency ) {
99- await new Promise ( ( resolve ) => {
100- resume = resolve ;
101- } ) ;
102- }
103- }
104- queue . push ( kEof ) ;
105- } catch ( err ) {
106- const val = PromiseReject ( err ) ;
107- PromisePrototypeCatch ( val , onDone ) ;
108- queue . push ( val ) ;
109- } finally {
110- done = true ;
111- if ( next ) {
112- next ( ) ;
113- next = null ;
116+ options ?. signal ?. removeEventListener ( 'abort' , abort ) ;
114117 }
115- options ?. signal ?. removeEventListener ( 'abort' , abort ) ;
116118 }
117- }
118-
119- pump ( ) ;
120-
121- try {
122- while ( true ) {
123- while ( queue . length > 0 ) {
124- const val = await queue [ 0 ] ;
125-
126- if ( val === kEof ) {
127- return ;
128- }
129119
130- if ( signal . aborted ) {
131- throw new AbortError ( ) ;
132- }
120+ pump ( ) ;
133121
134- if ( val !== kEmpty ) {
135- yield val ;
122+ try {
123+ while ( true ) {
124+ while ( queue . length > 0 ) {
125+ const val = await queue [ 0 ] ;
126+
127+ if ( val === kEof ) {
128+ return ;
129+ }
130+
131+ if ( signal . aborted ) {
132+ throw new AbortError ( ) ;
133+ }
134+
135+ if ( val !== kEmpty ) {
136+ yield val ;
137+ }
138+
139+ queue . shift ( ) ;
140+ if ( resume ) {
141+ resume ( ) ;
142+ resume = null ;
143+ }
136144 }
137145
138- queue . shift ( ) ;
139- if ( resume ) {
140- resume ( ) ;
141- resume = null ;
142- }
146+ await new Promise ( ( resolve ) => {
147+ next = resolve ;
148+ } ) ;
143149 }
150+ } finally {
151+ ac . abort ( ) ;
144152
145- await new Promise ( ( resolve ) => {
146- next = resolve ;
147- } ) ;
148- }
149- } finally {
150- ac . abort ( ) ;
151-
152- done = true ;
153- if ( resume ) {
154- resume ( ) ;
155- resume = null ;
153+ done = true ;
154+ if ( resume ) {
155+ resume ( ) ;
156+ resume = null ;
157+ }
156158 }
157- }
159+ } . call ( this ) ;
158160}
159161
160162async function * asIndexedPairs ( options ) {
@@ -214,7 +216,7 @@ async function forEach(fn, options) {
214216 for await ( const unused of this . map ( forEachFn , options ) ) ;
215217}
216218
217- async function * filter ( fn , options ) {
219+ function filter ( fn , options ) {
218220 if ( typeof fn !== 'function' ) {
219221 throw new ERR_INVALID_ARG_TYPE (
220222 'fn' , [ 'Function' , 'AsyncFunction' ] , fn ) ;
@@ -225,7 +227,7 @@ async function * filter(fn, options) {
225227 }
226228 return kEmpty ;
227229 }
228- yield * this . map ( filterFn , options ) ;
230+ return this . map ( filterFn , options ) ;
229231}
230232
231233async function toArray ( options ) {
@@ -239,10 +241,13 @@ async function toArray(options) {
239241 return result ;
240242}
241243
242- async function * flatMap ( fn , options ) {
243- for await ( const val of this . map ( fn , options ) ) {
244- yield * val ;
245- }
244+ function flatMap ( fn , options ) {
245+ const values = this . map ( fn , options ) ;
246+ return async function * flatMap ( ) {
247+ for await ( const val of values ) {
248+ yield * val ;
249+ }
250+ } . call ( this ) ;
246251}
247252
248253function toIntegerOrInfinity ( number ) {
0 commit comments