Throwing a synchronous error in a custom writable stream implementation doesn't immediately stop execution #41846

@ceeser

Description

Version

v14.18.2

Platform

macOS Monterey - Darwin 21.3.0 Darwin Kernel Version 21.3.0 root:xnu-8019.80.24~20/RELEASE_X86_64 x86_64

Subsystem

Stream Writable

What steps will reproduce the bug?

When data is piped into a custom writable stream, throwing synchronously inside write() does not stop the stream immediately. Buffered chunks continue to be processed until the highWaterMark is hit, and only then is the error handled. I believe this is because onWrite uses process.nextTick to handle errors. It was introduced in this PR.
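To isolate the deferral described above from the piping logic, here is a minimal sketch. It assumes Node.js 14 or later; `probeSyncFailure` is a hypothetical helper name, not part of the stream API. It writes one chunk into a sink whose write() calls its callback synchronously with an error, then records which events were observable before write() returned.

```javascript
const { Writable } = require('stream');

// Hypothetical helper: writes one chunk into a sink that fails synchronously
// and snapshots what had happened by the time write() returned.
function probeSyncFailure() {
  const events = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, cb) {
      events.push('write');
      // The callback is invoked synchronously with an error,
      // like the catch branch in the reproduction script.
      cb(new Error('TEST ERROR'));
    },
  });
  // A listener is attached so the error does not crash the process.
  sink.on('error', () => events.push('error'));

  sink.write(1);
  // Copy taken in the same tick as the failing write.
  return { events, sameTick: events.slice() };
}

const probe = probeSyncFailure();
console.log('same tick:', probe.sameTick);
// Logged again once pending nextTick callbacks have had a chance to run.
setImmediate(() => console.log('later:', probe.events));
```

If the error handling really is deferred, the first log should not contain 'error' while the second one should. That gap is the window in which other buffered chunks can still reach write().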

The following script triggers the problem. The 'error' handler should fire when the counter is 50, but it actually fires when the counter is 64 (given that highWaterMark is at its default of 16 in objectMode):

const stream = require('stream');

const STREAM_ERR_VAL = 50;

const myWritable = function(fn) {
  return new stream.Writable({
    write(chunk, encoding, cb) {
      if (!fn) {
        return cb();
      }

      let res;
      try {
        const args = [chunk];
        // perform some async task here
        res = fn.apply(null, args);

      } catch (ex) {
        // Failed when executing synchronously:
        // const sync = this._writableState.sync;
        // if (sync) {
        //   this._writableState.sync = false;
        // }
        return cb(ex);
        // this._writableState.sync = sync;

      }

      // Guard against fn returning null/undefined before probing for a thenable.
      const is_promise = res != null && typeof res.then === 'function' && typeof res.catch === 'function';

      if (is_promise) {
        res
          .then(() => { cb(); })
          .catch(e => { cb(e); });
      } else {
        cb();
      }

    },
    objectMode: true,
  });
};

function noop() {
  return new stream.PassThrough({ objectMode: true, autoDestroy: true });
}


// start a stream that sends numbers consecutively.
function _numStream() {
  const s = noop();
  for (let i=0; i<100; i++) {
    console.log(`Pipe: ${i}`);
    s.write(i);
  }
  s.end();
  return s;
}

function sink(fn) {
  // myWritable is a plain factory function, so `new` is not needed.
  return myWritable(fn);
}

function runStreamTest () {
  let counter = 0;
  _numStream()
    .pipe(sink(function(val) {
      // do something with val;
      counter++
      // throw in the middle of the stream.
      if (counter === STREAM_ERR_VAL) { throw new Error("TEST ERROR"); }
      // return some promise that needs to be resolved before the stream can continue
      // processing chunks
      return Promise.resolve();
    }))
    .on('error', function() {
      if (counter === STREAM_ERR_VAL) {
        console.log("Handled correctly! Error thrown at " + counter);
      } else {
        console.log("Failed to handle correctly. Error thrown at:  " + counter);
      }
    })
    .on('finish', function() {
      throw new Error("Error not handled correctly");
    });
}

runStreamTest();

How often does it reproduce? Is there a required condition?

It's reproduced 100% of the time.

What is the expected behavior?

The script should output:

Handled correctly! Error thrown at 50 

What do you see instead?

The script outputs:

Failed to handle correctly. Error thrown at:  64 

Additional information

I would have expected this code to run without errors, since the callback is invoked in both the then arm and the catch arm of the promise. However, it seems that invoking onWriteError on the next tick allows subsequent buffered chunks to be processed before onWriteError is called when the error is thrown.

If, however, onWriteError is called immediately (not on the next tick), the script executes successfully, stopping execution when counter == 50, as expected.
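In the meantime, a userland guard may sidestep the symptom. The following is only a sketch of a possible workaround, not a change to Node.js itself: `guardedSink` is a hypothetical wrapper that remembers the first failure and refuses to call fn for any chunk that reaches write() afterwards, regardless of when the stream gets around to handling the error.

```javascript
const { Writable } = require('stream');

// Hypothetical wrapper: once fn has failed, later chunks are rejected
// with the same error instead of being handed to fn.
function guardedSink(fn) {
  let failure = null;
  return new Writable({
    objectMode: true,
    write(chunk, encoding, cb) {
      if (failure) {
        return cb(failure); // already failed: skip fn entirely
      }
      let res;
      try {
        res = fn(chunk);
      } catch (err) {
        failure = err;
        return cb(err);
      }
      if (res != null && typeof res.then === 'function') {
        // Asynchronous result: wait for it, and record a rejection as the failure.
        res.then(() => cb(), (err) => { failure = err; cb(err); });
      } else {
        cb();
      }
    },
  });
}

// Usage: fn throws on the second chunk; later chunks should never reach it.
const seen = [];
const guarded = guardedSink((val) => {
  seen.push(val);
  if (val === 2) throw new Error('TEST ERROR');
});
guarded.on('error', (err) => console.log('error:', err.message, 'seen:', seen));
[1, 2, 3, 4].forEach((val) => guarded.write(val));
```

The guard does not change when the 'error' event is emitted. It only ensures that fn stops seeing chunks once it has thrown or rejected, which is the behavior the reproduction script expects.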
