Closed
Labels: stream (Issues and PRs related to the stream subsystem.)
Description
Some thoughts on how to make `Transform` streams faster.
Part of the overhead of `Transform` (and `PassThrough`) is that it is actually two streams, one `Writable` and one `Readable`, both with buffering and state management, which are connected together.
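To make the "two streams" point concrete, here is a small sketch that inspects a `PassThrough` instance. It relies on the internal `_readableState` and `_writableState` properties (the same ones the proposal in this issue touches), which are not public API and may change. The 1024-byte high-water mark and the 100-byte chunk are arbitrary values chosen for illustration.

```javascript
const { PassThrough } = require('node:stream')

const pt = new PassThrough({ highWaterMark: 1024 })

// Two separate state objects back a single PassThrough instance.
// Note: these underscore-prefixed properties are internals, not public API.
const hasBothStates =
  typeof pt._readableState === 'object' &&
  typeof pt._writableState === 'object' &&
  pt._readableState !== pt._writableState

pt.write(Buffer.alloc(100))

// Each side tracks its own high-water mark and its own buffered length.
const info = {
  hasBothStates,
  readableHighWaterMark: pt.readableHighWaterMark,
  writableHighWaterMark: pt.writableHighWaterMark,
  buffered: pt.readableLength + pt.writableLength
}

console.log(info)
```

Every chunk passes through the bookkeeping of both sides, which is the overhead a single-stream design would try to avoid.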
We could try to skip this and implement `Transform` as a `Readable` which implements the `Writable` interface and proxies the `Writable` naming (e.g. `writableEnded`, `writableFinished`).
e.g.

    const { Readable } = require('node:stream')

    class FastTransform extends Readable {
      constructor (options) {
        super(options)
        // Minimal stand-in for the state a real Writable would keep.
        this._writableState = {
          length: 0,
          needDrain: false,
          ended: false,
          finished: false
        }
      }

      get writableEnded () {
        return this._writableState.ended
      }

      get writableFinished () {
        return this._writableState.finished
      }

      _read () {
        const rState = this._readableState
        const wState = this._writableState

        if (!wState.needDrain) {
          return
        }

        // Still over the limit: keep the writer waiting.
        if (wState.length + rState.length > rState.highWaterMark) {
          return
        }

        wState.needDrain = false
        this.emit('drain')
      }

      write (chunk) {
        const rState = this._readableState
        const wState = this._writableState

        const len = chunk.length

        // Count the chunk as pending until _transform calls back.
        wState.length += len
        this._transform(chunk, null, (err, data) => {
          wState.length -= len
          if (err) {
            this.destroy(err)
          } else if (data != null) {
            this.push(data)
          }
          this._read()
        })

        wState.needDrain = wState.length + rState.length > rState.highWaterMark

        // Like Writable#write(): false means the caller should wait for 'drain'.
        return !wState.needDrain
      }

      end () {
        const wState = this._writableState

        wState.ended = true
        if (this._flush) {
          // _flush() only receives a callback; there is no chunk at this point.
          this._flush((err, data) => {
            if (err) {
              this.destroy(err)
            } else {
              if (data != null) {
                this.push(data)
              }
              this.push(null)
              wState.finished = true
              this.emit('finish')
            }
          })
        } else {
          this.push(null)
          wState.finished = true
          this.emit('finish')
        }
      }
    }

    // TODO: Make Writable[Symbol.hasInstance] recognize `FastTransform`.

Making this fully backwards compatible with `Transform` might be difficult.
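One concrete compatibility gap is the `instanceof Writable` check mentioned in the TODO. The sketch below uses a hypothetical `WritableLookalike` class (not part of the proposal) that, like the idea above, extends `Readable` and carries a plain-object `_writableState`. As far as I understand the current `Writable[Symbol.hasInstance]` behavior, it accepts duplex streams but would not accept such an object, so treat the result as an observation to verify on your Node.js version rather than a guarantee.

```javascript
const { Readable, Writable, PassThrough } = require('node:stream')

// Hypothetical stand-in: a Readable carrying a plain-object _writableState,
// mirroring the shape used by the proposal in this issue.
class WritableLookalike extends Readable {
  constructor (options) {
    super(options)
    this._writableState = {
      length: 0,
      needDrain: false,
      ended: false,
      finished: false
    }
  }

  _read () {}
}

const real = new PassThrough()
const lookalike = new WritableLookalike()

const results = {
  // Existing Transform/PassThrough instances pass the Writable check.
  passThroughIsWritable: real instanceof Writable,
  // The lookalike is a genuine Readable...
  lookalikeIsReadable: lookalike instanceof Readable,
  // ...but code that tests `instanceof Writable` would stop recognizing it.
  lookalikeIsWritable: lookalike instanceof Writable
}

console.log(results)
```

Any userland code that branches on `stream instanceof Writable` would behave differently unless the check is taught about the new class.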