@@ -73,27 +73,35 @@ ObjectSetPrototypeOf(Writable, Stream);
7373function nop ( ) { }
7474
7575const kOnFinished = Symbol ( 'kOnFinished' ) ;
76+ const kErrored = Symbol ( 'kErrored' ) ;
77+ const kCorkedValue = Symbol ( 'kCorked' ) ;
78+
79+ const kCorked = 0b111111 ; // 6 bits
80+ const kObjectMode = 1 << 7 ;
81+ const kEnded = 1 << 8 ;
82+ const kConstructed = 1 << 9 ;
83+ const kSync = 1 << 10 ;
84+ const kErrorEmitted = 1 << 11 ;
85+ const kEmitClose = 1 << 12 ;
86+ const kAutoDestroy = 1 << 13 ;
87+ const kDestroyed = 1 << 14 ;
88+ const kClosed = 1 << 15 ;
89+ const kCloseEmitted = 1 << 16 ;
90+ const kFinalCalled = 1 << 17 ;
91+ const kNeedDrain = 1 << 18 ;
92+ const kEnding = 1 << 19 ;
93+ const kFinished = 1 << 20 ;
94+ const kDecodeStrings = 1 << 21 ;
95+ const kWriting = 1 << 22 ;
96+ const kBufferProcessing = 1 << 23 ;
97+ const kPrefinished = 1 << 24 ;
98+ const kAllBuffers = 1 << 25 ;
99+ const kAllNoop = 1 << 26 ;
100+ const kHasOnFinished = 1 << 27 ;
101+ const kHasErrored = 1 << 28 ;
102+ const kHasWritable = 1 << 29 ;
103+ const kWritable = 1 << 30 ;
76104
77- const kObjectMode = 1 << 0 ;
78- const kEnded = 1 << 1 ;
79- const kConstructed = 1 << 2 ;
80- const kSync = 1 << 3 ;
81- const kErrorEmitted = 1 << 4 ;
82- const kEmitClose = 1 << 5 ;
83- const kAutoDestroy = 1 << 6 ;
84- const kDestroyed = 1 << 7 ;
85- const kClosed = 1 << 8 ;
86- const kCloseEmitted = 1 << 9 ;
87- const kFinalCalled = 1 << 10 ;
88- const kNeedDrain = 1 << 11 ;
89- const kEnding = 1 << 12 ;
90- const kFinished = 1 << 13 ;
91- const kDecodeStrings = 1 << 14 ;
92- const kWriting = 1 << 15 ;
93- const kBufferProcessing = 1 << 16 ;
94- const kPrefinished = 1 << 17 ;
95- const kAllBuffers = 1 << 18 ;
96- const kAllNoop = 1 << 19 ;
97105
98106// TODO(benjamingr) it is likely slower to do it this way than with free functions
99107function makeBitMapDescriptor ( bit ) {
@@ -176,6 +184,58 @@ ObjectDefineProperties(WritableState.prototype, {
176184
177185 allBuffers : makeBitMapDescriptor ( kAllBuffers ) ,
178186 allNoop : makeBitMapDescriptor ( kAllNoop ) ,
187+
188+ // Indicates whether the stream has errored. When true all write() calls
189+ // should return false. This is needed since when autoDestroy
190+ // is disabled we need a way to tell whether the stream has failed.
191+ // This is/should be a cold path.
192+ errored : {
193+ enumerable : false ,
194+ get ( ) { return ( this . state & kHasErrored ) !== 0 ? this [ kErrored ] : null ; } ,
195+ set ( value ) {
196+ if ( value ) {
197+ this [ kErrored ] = value ;
198+ this . state |= kHasErrored ;
199+ } else {
200+ this . state &= ~ kHasErrored ;
201+ }
202+ } ,
203+ } ,
204+
205+
206+ writable : {
207+ enumerable : false ,
208+ get ( ) { return ( this . state & kHasWritable ) !== 0 ? ( this . state & kWritable ) !== 0 : null ; } ,
209+ set ( value ) {
210+ if ( value == null ) {
211+ this . state &= ( kHasWritable | kWritable ) ;
212+ } else if ( value ) {
213+ this . state |= ( kHasWritable | kWritable ) ;
214+ } else {
215+ this . state |= kHasWritable ;
216+ this . state &= ~ kWritable ;
217+ }
218+ } ,
219+ } ,
220+
221+ // When true all writes will be buffered until .uncork() call.
222+ // This is/should be a cold path.
223+ corked : {
224+ enumerable : false ,
225+ get ( ) {
226+ const corked = this . state & kCorked ;
227+ return corked !== kCorked ? val : this [ kCorkedValue ] ;
228+ } ,
229+ set ( value ) {
230+ if ( value < kCorked ) {
231+ this . state &= ~ kCorked ;
232+ this . state |= value ;
233+ } else {
234+ this . state |= kCorked
235+ this [ kCorkedValue ] = value ;
236+ }
237+ } ,
238+ } ,
179239} ) ;
180240
181241function WritableState ( options , stream , isDuplex ) {
@@ -226,9 +286,6 @@ function WritableState(options, stream, isDuplex) {
226286 // socket or file.
227287 this . length = 0 ;
228288
229- // When true all writes will be buffered until .uncork() call.
230- this . corked = 0 ;
231-
232289 // The callback that's passed to _write(chunk, cb).
233290 this . onwrite = onwrite . bind ( undefined , stream ) ;
234291
@@ -247,13 +304,6 @@ function WritableState(options, stream, isDuplex) {
247304 // Number of pending user-supplied write callbacks
248305 // this must be 0 before 'finish' can be emitted.
249306 this . pendingcb = 0 ;
250-
251- // Indicates whether the stream has errored. When true all write() calls
252- // should return false. This is needed since when autoDestroy
253- // is disabled we need a way to tell whether the stream has failed.
254- this . errored = null ;
255-
256- this [ kOnFinished ] = [ ] ;
257307}
258308
259309function resetBuffer ( state ) {
@@ -394,17 +444,32 @@ Writable.prototype.write = function(chunk, encoding, cb) {
394444} ;
395445
396446Writable . prototype . cork = function ( ) {
397- this . _writableState . corked ++ ;
447+ const state = this . _writableState ;
448+
449+ const corked = ( state & kCorked ) + 1 ;
450+ if ( corked < kCorked ) {
451+ this . _writableState . state += 1 ;
452+ } else {
453+ state . corked ++ ;
454+ }
398455} ;
399456
400457Writable . prototype . uncork = function ( ) {
401458 const state = this . _writableState ;
402459
403- if ( state . corked ) {
460+ if ( ( state . state & kCorked ) === 0 ) {
461+ return
462+ }
463+
464+ const corked = state & kCorked ;
465+ if ( corked < kCorked ) {
466+ this . _writableState . state -= 1 ;
467+ } else {
404468 state . corked -- ;
469+ }
405470
406- if ( ( state . state & kWriting ) === 0 )
407- clearBuffer ( this , state ) ;
471+ if ( ( state . state & kWriting ) === 0 ) {
472+ clearBuffer ( this , state ) ;
408473 }
409474} ;
410475
@@ -432,7 +497,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
432497 if ( ! ret )
433498 state . state |= kNeedDrain ;
434499
435- if ( ( state . state & kWriting ) !== 0 || state . corked || state . errored || ( state . state & kConstructed ) === 0 ) {
500+ if ( ( state . state & kWriting ) !== 0 || ( state . state & ( kHasErrored | kCorked ) ) || ( state . state & kConstructed ) === 0 ) {
436501 state . buffered . push ( { chunk, encoding, callback } ) ;
437502 if ( ( state . state & kAllBuffers ) !== 0 && encoding !== 'buffer' ) {
438503 state . state &= ~ kAllBuffers ;
@@ -450,7 +515,7 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
450515
451516 // Return false if errored or destroyed in order to break
452517 // any synchronous while(stream.write(data)) loops.
453- return ret && ! state . errored && ( state . state & kDestroyed ) === 0 ;
518+ return ret && ( state . state & kHasErrored ) === 0 && ( state . state & kDestroyed ) === 0 ;
454519}
455520
456521function doWrite ( stream , state , writev , len , chunk , encoding , cb ) {
@@ -498,7 +563,7 @@ function onwrite(stream, er) {
498563 // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
499564 er . stack ; // eslint-disable-line no-unused-expressions
500565
501- if ( ! state . errored ) {
566+ if ( ( state . state & kHasErrored ) === 0 ) {
502567 state . errored = er ;
503568 }
504569
@@ -573,18 +638,19 @@ function errorBuffer(state) {
573638 callback ( state . errored ?? new ERR_STREAM_DESTROYED ( 'write' ) ) ;
574639 }
575640
576- const onfinishCallbacks = state [ kOnFinished ] . splice ( 0 ) ;
577- for ( let i = 0 ; i < onfinishCallbacks . length ; i ++ ) {
578- onfinishCallbacks [ i ] ( state . errored ?? new ERR_STREAM_DESTROYED ( 'end' ) ) ;
641+ if ( ( state . state & kHasOnFinished ) !== 0 ) {
642+ const onfinishCallbacks = state [ kOnFinished ] . splice ( 0 ) ;
643+ for ( let i = 0 ; i < onfinishCallbacks . length ; i ++ ) {
644+ onfinishCallbacks [ i ] ( state . errored ?? new ERR_STREAM_DESTROYED ( 'end' ) ) ;
645+ }
579646 }
580647
581648 resetBuffer ( state ) ;
582649}
583650
584651// If there's something in the buffer waiting, then process it.
585652function clearBuffer ( stream , state ) {
586- if ( state . corked ||
587- ( state . state & ( kDestroyed | kBufferProcessing ) ) !== 0 ||
653+ if ( ( state . state & ( kDestroyed | kBufferProcessing | kCorked ) ) !== 0 ||
588654 ( state . state & kConstructed ) === 0 ) {
589655 return ;
590656 }
@@ -669,14 +735,16 @@ Writable.prototype.end = function(chunk, encoding, cb) {
669735 }
670736
671737 // .end() fully uncorks.
672- if ( state . corked ) {
673- state . corked = 1 ;
674- this . uncork ( ) ;
738+ if ( ( state . state & kCorked ) !== 0 ) {
739+ state . state &= ~ kCorked ;
740+ if ( ( state . state & kWriting ) === 0 ) {
741+ clearBuffer ( this , state ) ;
742+ }
675743 }
676744
677745 if ( err ) {
678746 // Do nothing...
679- } else if ( ! state . errored && ( state . state & kEnding ) === 0 ) {
747+ } else if ( ( state . state & kErrored ) === 0 && ( state . state & kEnding ) === 0 ) {
680748 // This is forgiving in terms of unnecessary calls to end() and can hide
681749 // logic errors. However, usually such errors are harmless and causing a
682750 // hard error can be disproportionately destructive. It is not always
@@ -698,6 +766,8 @@ Writable.prototype.end = function(chunk, encoding, cb) {
698766 } else if ( ( state . state & kFinished ) !== 0 ) {
699767 process . nextTick ( cb , null ) ;
700768 } else {
769+ state . state |= kHasOnFinished ;
770+ state [ kOnFinished ] ??= [ ] ;
701771 state [ kOnFinished ] . push ( cb ) ;
702772 }
703773 }
@@ -715,10 +785,10 @@ function needFinish(state) {
715785 kFinished |
716786 kWriting |
717787 kErrorEmitted |
718- kCloseEmitted
788+ kCloseEmitted |
789+ kHasErrored
719790 ) ) === ( kEnding | kConstructed ) &&
720791 state . length === 0 &&
721- ! state . errored &&
722792 state . buffered . length === 0 ) ;
723793}
724794
@@ -734,9 +804,11 @@ function callFinal(stream, state) {
734804
735805 state . pendingcb -- ;
736806 if ( err ) {
737- const onfinishCallbacks = state [ kOnFinished ] . splice ( 0 ) ;
738- for ( let i = 0 ; i < onfinishCallbacks . length ; i ++ ) {
739- onfinishCallbacks [ i ] ( err ) ;
807+ if ( ( state . state & kHasOnFinished ) !== 0 ) {
808+ const onfinishCallbacks = state [ kOnFinished ] . splice ( 0 ) ;
809+ for ( let i = 0 ; i < onfinishCallbacks . length ; i ++ ) {
810+ onfinishCallbacks [ i ] ( err ) ;
811+ }
740812 }
741813 errorOrDestroy ( stream , err , ( state . state & kSync ) !== 0 ) ;
742814 } else if ( needFinish ( state ) ) {
@@ -799,9 +871,11 @@ function finish(stream, state) {
799871 state . pendingcb -- ;
800872 state . state |= kFinished ;
801873
802- const onfinishCallbacks = state [ kOnFinished ] . splice ( 0 ) ;
803- for ( let i = 0 ; i < onfinishCallbacks . length ; i ++ ) {
804- onfinishCallbacks [ i ] ( null ) ;
874+ if ( ( state . state & kHasOnFinished ) !== 0 ) {
875+ const onfinishCallbacks = state [ kOnFinished ] . splice ( 0 ) ;
876+ for ( let i = 0 ; i < onfinishCallbacks . length ; i ++ ) {
877+ onfinishCallbacks [ i ] ( null ) ;
878+ }
805879 }
806880
807881 stream . emit ( 'finish' ) ;
@@ -853,8 +927,8 @@ ObjectDefineProperties(Writable.prototype, {
853927 // where the writable side was disabled upon construction.
854928 // Compat. The user might manually disable writable side through
855929 // deprecated setter.
856- return ! ! w && w . writable !== false && ! w . errored &&
857- ( w . state & ( kEnding | kEnded | kDestroyed ) ) === 0 ;
930+ return ! ! w && w . writable !== false &&
931+ ( w . state & ( kEnding | kEnded | kDestroyed | kHasErrored ) ) === 0 ;
858932 } ,
859933 set ( val ) {
860934 // Backwards compatible.
@@ -928,7 +1002,7 @@ ObjectDefineProperties(Writable.prototype, {
9281002 __proto__ : null ,
9291003 enumerable : false ,
9301004 get ( ) {
931- return this . _writableState ? this . _writableState . errored : null ;
1005+ return this . _writableState && ( this . _writableState . state & kHasErrored ) !== 0 ? this . _writableState . errored : null ;
9321006 } ,
9331007 } ,
9341008
@@ -938,7 +1012,7 @@ ObjectDefineProperties(Writable.prototype, {
9381012 get : function ( ) {
9391013 return ! ! (
9401014 this . _writableState . writable !== false &&
941- ( ( this . _writableState . state & kDestroyed ) !== 0 || this . _writableState . errored ) &&
1015+ ( this . _writableState . state & ( kDestroyed | kHasErrored ) ) !== 0 &&
9421016 ( this . _writableState . state & kFinished ) === 0
9431017 ) ;
9441018 } ,
@@ -952,7 +1026,7 @@ Writable.prototype.destroy = function(err, cb) {
9521026 // Invoke pending callbacks.
9531027 if ( ( state . state & kDestroyed ) === 0 &&
9541028 ( state . bufferedIndex < state . buffered . length ||
955- state [ kOnFinished ] . length ) ) {
1029+ ( ( ( state . state & kHasOnFinished ) !== 0 ) && state [ kOnFinished ] . length ) ) ) {
9561030 process . nextTick ( errorBuffer , state ) ;
9571031 }
9581032
0 commit comments