55
66const {
77 ArrayIsArray,
8+ SymbolAsyncIterator,
9+ SymbolIterator
810} = primordials ;
911
1012let eos ;
1113
1214const { once } = require ( 'internal/util' ) ;
1315const {
16+ ERR_INVALID_ARG_TYPE ,
17+ ERR_INVALID_RETURN_VALUE ,
1418 ERR_INVALID_CALLBACK ,
1519 ERR_MISSING_ARGS ,
1620 ERR_STREAM_DESTROYED
1721} = require ( 'internal/errors' ) . codes ;
1822
23+ let EE ;
24+ let PassThrough ;
25+ let createReadableStreamAsyncIterator ;
26+
1927function isRequest ( stream ) {
2028 return stream && stream . setHeader && typeof stream . abort === 'function' ;
2129}
2230
31+ function destroyStream ( stream , err ) {
32+ // request.destroy just do .end - .abort is what we want
33+ if ( isRequest ( stream ) ) return stream . abort ( ) ;
34+ if ( isRequest ( stream . req ) ) return stream . req . abort ( ) ;
35+ if ( typeof stream . destroy === 'function' ) return stream . destroy ( err ) ;
36+ if ( typeof stream . close === 'function' ) return stream . close ( ) ;
37+ }
38+
2339function destroyer ( stream , reading , writing , callback ) {
2440 callback = once ( callback ) ;
2541
@@ -41,19 +57,12 @@ function destroyer(stream, reading, writing, callback) {
4157 if ( destroyed ) return ;
4258 destroyed = true ;
4359
44- // request.destroy just do .end - .abort is what we want
45- if ( isRequest ( stream ) ) return stream . abort ( ) ;
46- if ( isRequest ( stream . req ) ) return stream . req . abort ( ) ;
47- if ( typeof stream . destroy === 'function' ) return stream . destroy ( err ) ;
60+ destroyStream ( stream , err ) ;
4861
4962 callback ( err || new ERR_STREAM_DESTROYED ( 'pipe' ) ) ;
5063 } ;
5164}
5265
53- function pipe ( from , to ) {
54- return from . pipe ( to ) ;
55- }
56-
5766function popCallback ( streams ) {
5867 // Streams should never be an empty array. It should always contain at least
5968 // a single stream. Therefore optimize for the average case instead of
@@ -63,8 +72,89 @@ function popCallback(streams) {
6372 return streams . pop ( ) ;
6473}
6574
75+ function isPromise ( obj ) {
76+ return ! ! ( obj && typeof obj . then === 'function' ) ;
77+ }
78+
79+ function isReadable ( obj ) {
80+ return ! ! ( obj && typeof obj . pipe === 'function' ) ;
81+ }
82+
83+ function isWritable ( obj ) {
84+ return ! ! ( obj && typeof obj . write === 'function' ) ;
85+ }
86+
87+ function isStream ( obj ) {
88+ return isReadable ( obj ) || isWritable ( obj ) ;
89+ }
90+
91+ function isIterable ( obj , isAsync ) {
92+ if ( ! obj ) return false ;
93+ if ( isAsync === true ) return typeof obj [ SymbolAsyncIterator ] === 'function' ;
94+ if ( isAsync === false ) return typeof obj [ SymbolIterator ] === 'function' ;
95+ return typeof obj [ SymbolAsyncIterator ] === 'function' ||
96+ typeof obj [ SymbolIterator ] === 'function' ;
97+ }
98+
99+ function makeAsyncIterable ( val ) {
100+ if ( isIterable ( val ) ) {
101+ return val ;
102+ } else if ( isReadable ( val ) ) {
103+ // Legacy streams are not Iterable.
104+ return _fromReadable ( val ) ;
105+ } else {
106+ throw new ERR_INVALID_ARG_TYPE (
107+ 'val' , [ 'Readable' , 'Iterable' , 'AsyncIterable' ] , val ) ;
108+ }
109+ }
110+
111+ async function * _fromReadable ( val ) {
112+ if ( ! createReadableStreamAsyncIterator ) {
113+ createReadableStreamAsyncIterator =
114+ require ( 'internal/streams/async_iterator' ) ;
115+ }
116+
117+ try {
118+ if ( typeof val . read !== 'function' ) {
119+ // createReadableStreamAsyncIterator does not support
120+ // v1 streams. Convert it into a v2 stream.
121+
122+ if ( ! PassThrough ) {
123+ PassThrough = require ( '_stream_passthrough' ) ;
124+ }
125+
126+ const pt = new PassThrough ( ) ;
127+ val
128+ . on ( 'error' , ( err ) => pt . destroy ( err ) )
129+ . pipe ( pt ) ;
130+ yield * createReadableStreamAsyncIterator ( pt ) ;
131+ } else {
132+ yield * createReadableStreamAsyncIterator ( val ) ;
133+ }
134+ } finally {
135+ destroyStream ( val ) ;
136+ }
137+ }
138+
139+ async function pump ( iterable , writable , finish ) {
140+ if ( ! EE ) {
141+ EE = require ( 'events' ) ;
142+ }
143+ try {
144+ for await ( const chunk of iterable ) {
145+ if ( ! writable . write ( chunk ) ) {
146+ if ( writable . destroyed ) return ;
147+ await EE . once ( writable , 'drain' ) ;
148+ }
149+ }
150+ writable . end ( ) ;
151+ } catch ( err ) {
152+ finish ( err ) ;
153+ }
154+ }
155+
66156function pipeline ( ...streams ) {
67- const callback = popCallback ( streams ) ;
157+ const callback = once ( popCallback ( streams ) ) ;
68158
69159 if ( ArrayIsArray ( streams [ 0 ] ) ) streams = streams [ 0 ] ;
70160
@@ -73,25 +163,104 @@ function pipeline(...streams) {
73163 }
74164
75165 let error ;
76- const destroys = streams . map ( function ( stream , i ) {
166+ const destroys = [ ] ;
167+
168+ function finish ( err , val , final ) {
169+ if ( ! error && err ) {
170+ error = err ;
171+ }
172+
173+ if ( error || final ) {
174+ for ( const destroy of destroys ) {
175+ destroy ( error ) ;
176+ }
177+ }
178+
179+ if ( final ) {
180+ callback ( error , val ) ;
181+ }
182+ }
183+
184+ function wrap ( stream , reading , writing , final ) {
185+ destroys . push ( destroyer ( stream , reading , writing , ( err ) => {
186+ finish ( err , null , final ) ;
187+ } ) ) ;
188+ }
189+
190+ let ret ;
191+ for ( let i = 0 ; i < streams . length ; i ++ ) {
192+ const stream = streams [ i ] ;
77193 const reading = i < streams . length - 1 ;
78194 const writing = i > 0 ;
79- return destroyer ( stream , reading , writing , function ( err ) {
80- if ( ! error ) error = err ;
81- if ( err ) {
82- for ( const destroy of destroys ) {
83- destroy ( err ) ;
195+
196+ if ( isStream ( stream ) ) {
197+ wrap ( stream , reading , writing , ! reading ) ;
198+ }
199+
200+ if ( i === 0 ) {
201+ if ( typeof stream === 'function' ) {
202+ ret = stream ( ) ;
203+ if ( ! isIterable ( ret ) ) {
204+ throw new ERR_INVALID_RETURN_VALUE (
205+ 'Iterable, AsyncIterable or Stream' , 'source' , ret ) ;
84206 }
207+ } else if ( isIterable ( stream ) || isReadable ( stream ) ) {
208+ ret = stream ;
209+ } else {
210+ throw new ERR_INVALID_ARG_TYPE (
211+ 'source' , [ 'Stream' , 'Iterable' , 'AsyncIterable' , 'Function' ] ,
212+ stream ) ;
85213 }
86- if ( reading ) return ;
87- for ( const destroy of destroys ) {
88- destroy ( ) ;
214+ } else if ( typeof stream === 'function' ) {
215+ ret = makeAsyncIterable ( ret ) ;
216+ ret = stream ( ret ) ;
217+
218+ if ( reading ) {
219+ if ( ! isIterable ( ret , true ) ) {
220+ throw new ERR_INVALID_RETURN_VALUE (
221+ 'AsyncIterable' , `transform[${ i - 1 } ]` , ret ) ;
222+ }
223+ } else {
224+ if ( ! PassThrough ) {
225+ PassThrough = require ( '_stream_passthrough' ) ;
226+ }
227+
228+ const pt = new PassThrough ( ) ;
229+ if ( isPromise ( ret ) ) {
230+ ret
231+ . then ( ( val ) => {
232+ pt . end ( val ) ;
233+ finish ( null , val , true ) ;
234+ } )
235+ . catch ( ( err ) => {
236+ finish ( err , null , true ) ;
237+ } ) ;
238+ } else if ( isIterable ( ret , true ) ) {
239+ pump ( ret , pt , finish ) ;
240+ } else {
241+ throw new ERR_INVALID_RETURN_VALUE (
242+ 'AsyncIterable or Promise' , 'destination' , ret ) ;
243+ }
244+
245+ ret = pt ;
246+ wrap ( ret , true , false , true ) ;
89247 }
90- callback ( error ) ;
91- } ) ;
92- } ) ;
248+ } else if ( isStream ( stream ) ) {
249+ if ( isReadable ( ret ) ) {
250+ ret . pipe ( stream ) ;
251+ } else {
252+ ret = makeAsyncIterable ( ret ) ;
253+ pump ( ret , stream , finish ) ;
254+ }
255+ ret = stream ;
256+ } else {
257+ const name = reading ? `transform[${ i - 1 } ]` : 'destination' ;
258+ throw new ERR_INVALID_ARG_TYPE (
259+ name , [ 'Stream' , 'Function' ] , ret ) ;
260+ }
261+ }
93262
94- return streams . reduce ( pipe ) ;
263+ return ret ;
95264}
96265
97266module . exports = pipeline ;
0 commit comments