From b0a18300e1048aa6e263c0deaf8fd05cd946b563 Mon Sep 17 00:00:00 2001 From: Momtchil Momtchev Date: Sun, 22 Nov 2020 19:01:23 +0100 Subject: [PATCH] stream: fix regression on duplex end Backport Decide the return status of writeOrBuffer before calling stream.write which can reset state.length Refs: https://github.com/nodejs/node/pull/35941/ Fixes: https://github.com/nodejs/node/issues/35926 --- lib/internal/streams/writable.js | 12 +++---- .../test-stream-duplex-readable-end.js | 32 +++++++++++++++++++ 2 files changed, 38 insertions(+), 6 deletions(-) create mode 100644 test/parallel/test-stream-duplex-readable-end.js diff --git a/lib/internal/streams/writable.js b/lib/internal/streams/writable.js index 209fa4a25413af..fbc0532681b83d 100644 --- a/lib/internal/streams/writable.js +++ b/lib/internal/streams/writable.js @@ -336,6 +336,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { state.length += len; + // stream._write resets state.length + const ret = state.length < state.highWaterMark; + // We must ensure that previous needDrain will not be reset to false. + if (!ret) + state.needDrain = true; + if (state.writing || state.corked || state.errored) { state.buffered.push({ chunk, encoding, callback }); if (state.allBuffers && encoding !== 'buffer') { @@ -353,12 +359,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) { state.sync = false; } - const ret = state.length < state.highWaterMark; - - // We must ensure that previous needDrain will not be reset to false. - if (!ret) - state.needDrain = true; - // Return false if errored or destroyed in order to break // any synchronous while(stream.write(data)) loops. return ret && !state.errored && !state.destroyed; diff --git a/test/parallel/test-stream-duplex-readable-end.js b/test/parallel/test-stream-duplex-readable-end.js new file mode 100644 index 00000000000000..ca3ccf63c49474 --- /dev/null +++ b/test/parallel/test-stream-duplex-readable-end.js @@ -0,0 +1,32 @@ +'use strict'; +// https://github.com/nodejs/node/issues/35926 +require('../common'); +const assert = require('assert'); +const stream = require('stream'); + +let loops = 5; + +const src = new stream.Readable({ + read() { + if (loops--) + this.push(Buffer.alloc(20000)); + } +}); + +const dst = new stream.Transform({ + transform(chunk, output, fn) { + this.push(null); + fn(); + } +}); + +src.pipe(dst); + +function parser_end() { + assert.ok(loops > 0); + dst.removeAllListeners(); +} + +dst.on('data', () => { }); +dst.on('end', parser_end); +dst.on('error', parser_end);