@@ -33,52 +33,26 @@ const {
3333 isIterable,
3434 isReadableNodeStream,
3535 isNodeStream,
36- isReadableFinished,
3736} = require ( 'internal/streams/utils' ) ;
3837const { AbortController } = require ( 'internal/abort_controller' ) ;
3938
4039let PassThrough ;
4140let Readable ;
4241
43- function destroyer ( stream , reading , writing , callback ) {
44- callback = once ( callback ) ;
45-
42+ function destroyer ( stream , reading , writing ) {
4643 let finished = false ;
4744 stream . on ( 'close' , ( ) => {
4845 finished = true ;
4946 } ) ;
5047
5148 eos ( stream , { readable : reading , writable : writing } , ( err ) => {
5249 finished = ! err ;
53-
54- const rState = stream . _readableState ;
55- if (
56- err &&
57- err . code === 'ERR_STREAM_PREMATURE_CLOSE' &&
58- reading &&
59- ( rState && rState . ended && ! rState . errored && ! rState . errorEmitted )
60- ) {
61- // Some readable streams will emit 'close' before 'end'. However, since
62- // this is on the readable side 'end' should still be emitted if the
63- // stream has been ended and no error emitted. This should be allowed in
64- // favor of backwards compatibility. Since the stream is piped to a
65- // destination this should not result in any observable difference.
66- // We don't need to check if this is a writable premature close since
67- // eos will only fail with premature close on the reading side for
68- // duplex streams.
69- stream
70- . once ( 'end' , callback )
71- . once ( 'error' , callback ) ;
72- } else {
73- callback ( err ) ;
74- }
7550 } ) ;
7651
7752 return ( err ) => {
7853 if ( finished ) return ;
7954 finished = true ;
80- destroyImpl . destroyer ( stream , err ) ;
81- callback ( err || new ERR_STREAM_DESTROYED ( 'pipe' ) ) ;
55+ destroyImpl . destroyer ( stream , err || new ERR_STREAM_DESTROYED ( 'pipe' ) ) ;
8256 } ;
8357}
8458
@@ -109,7 +83,7 @@ async function* fromReadable(val) {
10983 yield * Readable . prototype [ SymbolAsyncIterator ] . call ( val ) ;
11084}
11185
112- async function pump ( iterable , writable , finish , opts ) {
86+ async function pump ( iterable , writable , finish , { end } ) {
11387 let error ;
11488 let onresolve = null ;
11589
@@ -153,7 +127,7 @@ async function pump(iterable, writable, finish, opts) {
153127 }
154128 }
155129
156- if ( opts ?. end !== false ) {
130+ if ( end ) {
157131 writable . end ( ) ;
158132 }
159133
@@ -220,7 +194,7 @@ function pipelineImpl(streams, callback, opts) {
220194 ac . abort ( ) ;
221195
222196 if ( final ) {
223- callback ( error , value ) ;
197+ process . nextTick ( callback , error , value ) ;
224198 }
225199 }
226200
@@ -233,18 +207,19 @@ function pipelineImpl(streams, callback, opts) {
233207
234208 if ( isNodeStream ( stream ) ) {
235209 if ( end ) {
236- finishCount ++ ;
237- destroys . push ( destroyer ( stream , reading , writing , ( err ) => {
238- if ( ! err && ! reading && isReadableFinished ( stream , false ) ) {
239- stream . read ( 0 ) ;
240- destroyer ( stream , true , writing , finish ) ;
241- } else {
242- finish ( err ) ;
243- }
244- } ) ) ;
245- } else {
246- stream . on ( 'error' , finish ) ;
210+ destroys . push ( destroyer ( stream , reading , writing ) ) ;
247211 }
212+
213+ // Catch stream errors that occur after pipe/pump has completed.
214+ stream . on ( 'error' , ( err ) => {
215+ if (
216+ err &&
217+ err . name !== 'AbortError' &&
218+ err . code !== 'ERR_STREAM_PREMATURE_CLOSE'
219+ ) {
220+ finish ( err ) ;
221+ }
222+ } ) ;
248223 }
249224
250225 if ( i === 0 ) {
@@ -286,15 +261,18 @@ function pipelineImpl(streams, callback, opts) {
286261 // second use.
287262 const then = ret ?. then ;
288263 if ( typeof then === 'function' ) {
264+ finishCount ++ ;
289265 then . call ( ret ,
290266 ( val ) => {
291267 value = val ;
292268 pt . write ( val ) ;
293269 if ( end ) {
294270 pt . end ( ) ;
295271 }
272+ process . nextTick ( finish ) ;
296273 } , ( err ) => {
297274 pt . destroy ( err ) ;
275+ process . nextTick ( finish , err ) ;
298276 } ,
299277 ) ;
300278 } else if ( isIterable ( ret , true ) ) {
@@ -307,24 +285,18 @@ function pipelineImpl(streams, callback, opts) {
307285
308286 ret = pt ;
309287
310- finishCount ++ ;
311- destroys . push ( destroyer ( ret , false , true , finish ) ) ;
288+ destroys . push ( destroyer ( ret , false , true ) ) ;
312289 }
313290 } else if ( isNodeStream ( stream ) ) {
314291 if ( isReadableNodeStream ( ret ) ) {
315- ret . pipe ( stream , { end } ) ;
316-
317- // Compat. Before node v10.12.0 stdio used to throw an error so
318- // pipe() did/does not end() stdio destinations.
319- // Now they allow it but "secretly" don't close the underlying fd.
320- if ( stream === process . stdout || stream === process . stderr ) {
321- ret . on ( 'end' , ( ) => stream . end ( ) ) ;
322- }
323- } else {
324- ret = makeAsyncIterable ( ret ) ;
325-
292+ finishCount += 2 ;
293+ pipe ( ret , stream , finish , { end } ) ;
294+ } else if ( isIterable ( ret ) ) {
326295 finishCount ++ ;
327296 pump ( ret , stream , finish , { end } ) ;
297+ } else {
298+ throw new ERR_INVALID_ARG_TYPE (
299+ 'val' , [ 'Readable' , 'Iterable' , 'AsyncIterable' ] , ret ) ;
328300 }
329301 ret = stream ;
330302 } else {
@@ -339,4 +311,41 @@ function pipelineImpl(streams, callback, opts) {
339311 return ret ;
340312}
341313
314+ function pipe ( src , dst , finish , { end } ) {
315+ src . pipe ( dst , { end } ) ;
316+
317+ if ( end ) {
318+ // Compat. Before node v10.12.0 stdio used to throw an error so
319+ // pipe() did/does not end() stdio destinations.
320+ // Now they allow it but "secretly" don't close the underlying fd.
321+ src . once ( 'end' , ( ) => dst . end ( ) ) ;
322+ } else {
323+ finish ( ) ;
324+ }
325+
326+ eos ( src , { readable : true , writable : false } , ( err ) => {
327+ const rState = src . _readableState ;
328+ if (
329+ err &&
330+ err . code === 'ERR_STREAM_PREMATURE_CLOSE' &&
331+ ( rState && rState . ended && ! rState . errored && ! rState . errorEmitted )
332+ ) {
333+ // Some readable streams will emit 'close' before 'end'. However, since
334+ // this is on the readable side 'end' should still be emitted if the
335+ // stream has been ended and no error emitted. This should be allowed in
336+ // favor of backwards compatibility. Since the stream is piped to a
337+ // destination this should not result in any observable difference.
338+ // We don't need to check if this is a writable premature close since
339+ // eos will only fail with premature close on the reading side for
340+ // duplex streams.
341+ src
342+ . once ( 'end' , finish )
343+ . once ( 'error' , finish ) ;
344+ } else {
345+ finish ( err ) ;
346+ }
347+ } ) ;
348+ eos ( dst , { readable : false , writable : true } , finish ) ;
349+ }
350+
342351module . exports = { pipelineImpl, pipeline } ;
0 commit comments