Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 23 additions & 22 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ const {
const { validateObject } = require('internal/validators');

const kPaused = Symbol('kPaused');
const kState = Symbol('kState');

const { StringDecoder } = require('string_decoder');
const from = require('internal/streams/from');
Expand Down Expand Up @@ -107,10 +108,10 @@ const kDataEmitted = 1 << 18;
function makeBitMapDescriptor(bit) {
return {
enumerable: false,
get() { return (this.state & bit) !== 0; },
get() { return (this[kState] & bit) !== 0; },
set(value) {
if (value) this.state |= bit;
else this.state &= ~bit;
if (value) this[kState] |= bit;
else this[kState] &= ~bit;
},
};
}
Expand Down Expand Up @@ -163,13 +164,13 @@ function ReadableState(options, stream, isDuplex) {

// Bit map field to store ReadableState more effciently with 1 bit per field
// instead of a V8 slot per field.
this.state = kEmitClose | kAutoDestroy | kConstructed | kSync;
this[kState] = kEmitClose | kAutoDestroy | kConstructed | kSync;
// Object stream flag. Used to make read(n) ignore n and to
// make all the buffer merging and length checks go away.
if (options && options.objectMode) this.state |= kObjectMode;
if (options && options.objectMode) this[kState] |= kObjectMode;

if (isDuplex && options && options.readableObjectMode)
this.state |= kObjectMode;
this[kState] |= kObjectMode;

// The point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
Expand All @@ -188,10 +189,10 @@ function ReadableState(options, stream, isDuplex) {
this[kPaused] = null;

// Should close be emitted on destroy. Defaults to true.
if (options && options.emitClose === false) this.state &= ~kEmitClose;
if (options && options.emitClose === false) this[kState] &= ~kEmitClose;

// Should .destroy() be called after 'end' (and potentially 'finish').
if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy;
if (options && options.autoDestroy === false) this[kState] &= ~kAutoDestroy;


// Indicates whether the stream has errored. When true no further
Expand Down Expand Up @@ -296,7 +297,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
const state = stream._readableState;

let err;
if ((state.state & kObjectMode) === 0) {
if ((state[kState] & kObjectMode) === 0) {
if (typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (state.encoding !== encoding) {
Expand All @@ -323,11 +324,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
if (err) {
errorOrDestroy(stream, err);
} else if (chunk === null) {
state.state &= ~kReading;
state[kState] &= ~kReading;
onEofChunk(stream, state);
} else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
} else if (((state[kState] & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
if (addToFront) {
if ((state.state & kEndEmitted) !== 0)
if ((state[kState] & kEndEmitted) !== 0)
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
else if (state.destroyed || state.errored)
return false;
Expand All @@ -338,7 +339,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
} else if (state.destroyed || state.errored) {
return false;
} else {
state.state &= ~kReading;
state[kState] &= ~kReading;
if (state.decoder && !encoding) {
chunk = state.decoder.write(chunk);
if (state.objectMode || chunk.length !== 0)
Expand All @@ -350,7 +351,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
}
}
} else if (!addToFront) {
state.state &= ~kReading;
state[kState] &= ~kReading;
maybeReadMore(stream, state);
}

Expand All @@ -366,7 +367,7 @@ function addChunk(stream, state, chunk, addToFront) {
stream.listenerCount('data') > 0) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if ((state.state & kMultiAwaitDrain) !== 0) {
if ((state[kState] & kMultiAwaitDrain) !== 0) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
Expand All @@ -382,7 +383,7 @@ function addChunk(stream, state, chunk, addToFront) {
else
state.buffer.push(chunk);

if ((state.state & kNeedReadable) !== 0)
if ((state[kState] & kNeedReadable) !== 0)
emitReadable(stream);
}
maybeReadMore(stream, state);
Expand Down Expand Up @@ -437,7 +438,7 @@ function computeNewHighWaterMark(n) {
function howMuchToRead(n, state) {
if (n <= 0 || (state.length === 0 && state.ended))
return 0;
if ((state.state & kObjectMode) !== 0)
if ((state[kState] & kObjectMode) !== 0)
return 1;
if (NumberIsNaN(n)) {
// Only flow one buffer at a time.
Expand Down Expand Up @@ -468,7 +469,7 @@ Readable.prototype.read = function(n) {
state.highWaterMark = computeNewHighWaterMark(n);

if (n !== 0)
state.state &= ~kEmittedReadable;
state[kState] &= ~kEmittedReadable;

// If we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
Expand Down Expand Up @@ -519,7 +520,7 @@ Readable.prototype.read = function(n) {
// 3. Actually pull the requested chunks out of the buffer and return.

// if we need a readable event, then we need to do some reading.
let doRead = (state.state & kNeedReadable) !== 0;
let doRead = (state[kState] & kNeedReadable) !== 0;
debug('need readable', doRead);

// If we currently have less than the highWaterMark, then also read some.
Expand All @@ -537,18 +538,18 @@ Readable.prototype.read = function(n) {
debug('reading, ended or constructing', doRead);
} else if (doRead) {
debug('do read');
state.state |= kReading | kSync;
state[kState] |= kReading | kSync;
// If the length is currently zero, then we *need* a readable event.
if (state.length === 0)
state.state |= kNeedReadable;
state[kState] |= kNeedReadable;

// Call internal read method
try {
this._read(state.highWaterMark);
} catch (err) {
errorOrDestroy(this, err);
}
state.state &= ~kSync;
state[kState] &= ~kSync;

// If _read pushed data synchronously, then `reading` will be false,
// and we need to re-evaluate how much data we can return to the user.
Expand Down
Loading