Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions benchmark/streams/transform-manytransforms.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';

const common = require('../common');
const Transform = require('stream').Transform;

const bench = common.createBenchmark(main, {
n: [2e6],
sync: ['yes', 'no'],
callback: ['yes', 'no']
});

function main({ n, sync, callback }) {
const b = Buffer.allocUnsafe(1024);
const s = new Transform();
sync = sync === 'yes';

const writecb = (cb) => {
if (sync)
cb();
else
process.nextTick(cb);
};

s._transform = (chunk, encoding, cb) => writecb(cb);

const cb = callback === 'yes' ? () => {} : null;

bench.start();

let k = 0;
function run() {
while (k++ < n && s.write(b, cb));
if (k >= n)
bench.end(n);
}
s.on('drain', run);
s.on('data', () => {});
run();
}
155 changes: 39 additions & 116 deletions lib/_stream_transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,66 +65,25 @@

const {
ObjectSetPrototypeOf,
Symbol
} = primordials;

module.exports = Transform;
const {
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_TRANSFORM_ALREADY_TRANSFORMING,
ERR_TRANSFORM_WITH_LENGTH_0
ERR_METHOD_NOT_IMPLEMENTED
} = require('internal/errors').codes;
const Duplex = require('_stream_duplex');
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);


function afterTransform(er, data) {
const ts = this._transformState;
ts.transforming = false;

const cb = ts.writecb;

if (cb === null) {
return this.emit('error', new ERR_MULTIPLE_CALLBACK());
}

ts.writechunk = null;
ts.writecb = null;

if (data != null) // Single equals check for both `null` and `undefined`
this.push(data);

cb(er);

const rs = this._readableState;
rs.reading = false;
if (rs.needReadable || rs.length < rs.highWaterMark) {
this._read(rs.highWaterMark);
}
}

const kResume = Symbol('kResume');

function Transform(options) {
if (!(this instanceof Transform))
return new Transform(options);

Duplex.call(this, options);

this._transformState = {
afterTransform: afterTransform.bind(this),
needTransform: false,
transforming: false,
writecb: null,
writechunk: null,
writeencoding: null
};

// We have implemented the _read method, and done the other things
// that Readable wants before the first _read call, so unset the
// sync guard flag.
this._readableState.sync = false;

if (options) {
if (typeof options.transform === 'function')
this._transform = options.transform;
Expand All @@ -133,89 +92,53 @@ function Transform(options) {
this._flush = options.flush;
}

// When the writable side finishes, then flush out anything remaining.
this.on('prefinish', prefinish);
this._readableState.sync = false;
this[kResume] = null;
}

function prefinish() {
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
this._flush((er, data) => {
done(this, er, data);
Transform.prototype._final = function(cb) {
if (typeof this._flush === 'function') {
this._flush((err) => {
if (err) {
cb(err);
} else {
this.push(null);
cb();
}
});
} else {
done(this, null, null);
}
}

Transform.prototype.push = function(chunk, encoding) {
this._transformState.needTransform = false;
return Duplex.prototype.push.call(this, chunk, encoding);
};

// This is the part where you do stuff!
// override this function in implementation classes.
// 'chunk' is an input chunk.
//
// Call `push(newChunk)` to pass along transformed output
// to the readable side. You may call 'push' zero or more times.
//
// Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you
// never call cb(), then you'll never get another chunk.
Transform.prototype._transform = function(chunk, encoding, cb) {
cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()'));
};

Transform.prototype._write = function(chunk, encoding, cb) {
const ts = this._transformState;
ts.writecb = cb;
ts.writechunk = chunk;
ts.writeencoding = encoding;
if (!ts.transforming) {
var rs = this._readableState;
if (ts.needTransform ||
rs.needReadable ||
rs.length < rs.highWaterMark)
this._read(rs.highWaterMark);
this.push(null);
cb();
}
};

// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
Transform.prototype._read = function(n) {
const ts = this._transformState;

if (ts.writechunk !== null && !ts.transforming) {
ts.transforming = true;
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
} else {
// Mark that we need a transform, so that any data that comes in
// will get processed, now that we've asked for it.
ts.needTransform = true;
if (this[kResume]) {
const resume = this[kResume];
this[kResume] = null;
resume();
}
};


Transform.prototype._destroy = function(err, cb) {
Duplex.prototype._destroy.call(this, err, (err2) => {
cb(err2);
Transform.prototype._write = function(chunk, encoding, callback) {
this._transform(chunk, encoding, (err, data) => {
if (err) {
return callback(err);
}

if (data !== undefined) {
this.push(data);
}

const r = this._readableState;
if (r.length < r.highWaterMark || r.length === 0) {
callback();
} else {
this[kResume] = callback;
}
});
};


function done(stream, er, data) {
if (er)
return stream.emit('error', er);

if (data != null) // Single equals check for both `null` and `undefined`
stream.push(data);

// These two error cases are coherence checks that can likely not be tested.
if (stream._writableState.length)
throw new ERR_TRANSFORM_WITH_LENGTH_0();

if (stream._transformState.transforming)
throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
return stream.push(null);
}
Transform.prototype._transform = function(chunk, encoding, cb) {
cb(new ERR_METHOD_NOT_IMPLEMENTED('_transform()'));
};
13 changes: 8 additions & 5 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -665,24 +665,26 @@ function needFinish(state) {
}

function callFinal(stream, state) {
state.sync = true;
stream._final((err) => {
state.pendingcb--;
if (err) {
errorOrDestroy(stream, err);
errorOrDestroy(stream, err, state.sync);
} else {
state.prefinished = true;
stream.emit('prefinish');
finishMaybe(stream, state);
finishMaybe(stream, state, state.sync);
}
});
state.sync = false;
}

function prefinish(stream, state) {
if (!state.prefinished && !state.finalCalled) {
if (typeof stream._final === 'function' && !state.destroyed) {
state.pendingcb++;
state.finalCalled = true;
process.nextTick(callFinal, stream, state);
callFinal(stream, state);
} else {
state.prefinished = true;
stream.emit('prefinish');
Expand All @@ -691,10 +693,11 @@ function prefinish(stream, state) {
}

function finishMaybe(stream, state, sync) {
const need = needFinish(state);
let need = needFinish(state);
if (need) {
prefinish(stream, state);
if (state.pendingcb === 0) {
need = needFinish(state);
if (state.pendingcb === 0 && need) {
state.pendingcb++;
if (sync) {
process.nextTick(finish, stream, state);
Expand Down
5 changes: 0 additions & 5 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1297,12 +1297,7 @@ E('ERR_TLS_SNI_FROM_SERVER',
E('ERR_TRACE_EVENTS_CATEGORY_REQUIRED',
'At least one category is required', TypeError);
E('ERR_TRACE_EVENTS_UNAVAILABLE', 'Trace events are unavailable', Error);
E('ERR_TRANSFORM_ALREADY_TRANSFORMING',
'Calling transform done when still transforming', Error);

// This should probably be a `RangeError`.
E('ERR_TRANSFORM_WITH_LENGTH_0',
'Calling transform done when writableState.length != 0', Error);
E('ERR_TTY_INIT_FAILED', 'TTY initialization failed', SystemError);
E('ERR_UNCAUGHT_EXCEPTION_CAPTURE_ALREADY_SET',
'`process.setupUncaughtExceptionCapture()` was called while a capture ' +
Expand Down
2 changes: 1 addition & 1 deletion lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ ObjectSetPrototypeOf(WriteStream, Writable);
WriteStream.prototype._final = function(callback) {
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._final(callback);
process.nextTick(() => this._final(callback));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this maybe be a good reason to keep _final behind a nextTick, at least in a semver-patch PR? Or maybe, why does this need to be changed here but not in the other places where we use a similar pattern (e.g. net, http2)?

Copy link
Member Author

@ronag ronag Nov 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's one reason to keep _final behind a nextTick but there are other reasons which make it not a good idea. I don't think this becomes a relevant problem once #29656 is merged.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but not in the other places where we use a similar pattern (e.g. net, http2)?

It should probably be changed in those places as well.

});
}

Expand Down
21 changes: 13 additions & 8 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -2004,16 +2004,21 @@ class Http2Stream extends Duplex {
_final(cb) {
const handle = this[kHandle];
if (this.pending) {
this.once('ready', () => this._final(cb));
this.once('ready', function() {
process.nextTick(() => this._final(cb));
});
} else if (handle !== undefined) {
debugStreamObj(this, '_final shutting down');
const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = cb;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
// TODO: Why does this need to be in nextTick?
Copy link
Member Author

@ronag ronag Dec 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@addaleax Maybe you have some insight here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when it isn’t? 😅

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! I forgot about that small detail:

Path: parallel/test-http2-client-socket-destroy
Mismatched noop function calls. Expected exactly 1, actual 0.
    at Object.mustCall (/Users/ronagy/GitHub/nxtedition/node/test/common/index.js:324:10)
    at Http2Server.<anonymous> (/Users/ronagy/GitHub/nxtedition/node/test/parallel/test-http2-client-socket-destroy.js:19:29)
    at Http2Server.<anonymous> (/Users/ronagy/GitHub/nxtedition/node/test/common/index.js:358:15)
    at Http2Server.emit (events.js:304:20)
    at ServerHttp2Session.sessionOnStream (internal/http2/core.js:2739:19)
    at ServerHttp2Session.emit (events.js:304:20)
    at emit (internal/http2/core.js:285:8)
    at processTicksAndRejections (internal/process/task_queues.js:87:22)
Command: out/Release/node --expose-internals /Users/ronagy/GitHub/nxtedition/node/test/parallel/test-http2-client-socket-destroy.js

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like some form of timing issue.

process.nextTick(() => {
const req = new ShutdownWrap();
req.oncomplete = afterShutdown;
req.callback = cb;
req.handle = handle;
const err = handle.shutdown(req);
if (err === 1) // synchronous finish
return afterShutdown.call(req, 0);
});
} else {
cb();
}
Expand Down
9 changes: 7 additions & 2 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ function undestroy() {
}
}

function errorOrDestroy(stream, err) {
function errorOrDestroy(stream, err, sync) {
// We have tests that rely on errors being emitted
// in the same tick, so changing this is semver major.
// For now when you opt-in to autoDestroy we allow
Expand All @@ -138,7 +138,12 @@ function errorOrDestroy(stream, err) {
if (r) {
r.errored = true;
}
emitErrorNT(stream, err);

if (sync) {
process.nextTick(emitErrorNT, stream, err);
} else {
emitErrorNT(stream, err);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,9 @@ Socket.prototype._final = function(cb) {
// If still connecting - defer handling `_final` until 'connect' will happen
if (this.pending) {
debug('_final: not yet connected');
return this.once('connect', () => this._final(cb));
return this.once('connect', function() {
process.nextTick(() => this._final(cb));
});
}

if (!this._handle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,17 @@ const _transform = common.mustCall((chunk, _, next) => {
next();
});

const _final = common.mustCall((next) => {
next();
});

const _flush = common.mustCall((next) => {
next();
});

const t2 = new Transform({
transform: _transform,
flush: _flush,
final: _final
flush: _flush
});

strictEqual(t2._transform, _transform);
strictEqual(t2._flush, _flush);
strictEqual(t2._final, _final);

t2.end(Buffer.from('blerg'));
t2.resume();
Loading