@@ -746,7 +746,8 @@ class Http2Session extends EventEmitter {
746746 shutdown : false ,
747747 shuttingDown : false ,
748748 pendingAck : 0 ,
749- maxPendingAck : Math . max ( 1 , ( options . maxPendingAck | 0 ) || 10 )
749+ maxPendingAck : Math . max ( 1 , ( options . maxPendingAck | 0 ) || 10 ) ,
750+ writeQueueSize : 0
750751 } ;
751752
752753 this [ kType ] = type ;
@@ -1080,6 +1081,22 @@ class Http2Session extends EventEmitter {
10801081 }
10811082
10821083 _onTimeout ( ) {
1084+ // This checks whether a write is currently in progress and also whether
1085+ // that write is actually sending data across the write. The kHandle
1086+ // stored `chunksSentSinceLastWrite` is only updated when a timeout event
1087+ // happens, meaning that if a write is ongoing it should never equal the
1088+ // newly fetched, updated value.
1089+ if ( this [ kState ] . writeQueueSize > 0 ) {
1090+ const handle = this [ kHandle ] ;
1091+ const chunksSentSinceLastWrite = handle !== undefined ?
1092+ handle . chunksSentSinceLastWrite : null ;
1093+ if ( chunksSentSinceLastWrite !== null &&
1094+ chunksSentSinceLastWrite !== handle . updateChunksSent ( ) ) {
1095+ _unrefActive ( this ) ;
1096+ return ;
1097+ }
1098+ }
1099+
10831100 process . nextTick ( emit , this , 'timeout' ) ;
10841101 }
10851102}
@@ -1199,8 +1216,27 @@ function createWriteReq(req, handle, data, encoding) {
11991216 }
12001217}
12011218
1219+ function trackWriteState ( stream , bytes ) {
1220+ const session = stream [ kSession ] ;
1221+ stream [ kState ] . writeQueueSize += bytes ;
1222+ session [ kState ] . writeQueueSize += bytes ;
1223+ session [ kHandle ] . chunksSentSinceLastWrite = 0 ;
1224+ }
1225+
12021226function afterDoStreamWrite ( status , handle , req ) {
1203- _unrefActive ( handle [ kOwner ] ) ;
1227+ const session = handle [ kOwner ] ;
1228+ _unrefActive ( session ) ;
1229+
1230+ const state = session [ kState ] ;
1231+ const { bytes } = req ;
1232+ state . writeQueueSize -= bytes ;
1233+
1234+ const stream = state . streams . get ( req . stream ) ;
1235+ if ( stream !== undefined ) {
1236+ _unrefActive ( stream ) ;
1237+ stream [ kState ] . writeQueueSize -= bytes ;
1238+ }
1239+
12041240 if ( typeof req . callback === 'function' )
12051241 req . callback ( ) ;
12061242 this . handle = undefined ;
@@ -1312,7 +1348,8 @@ class Http2Stream extends Duplex {
13121348 headersSent : false ,
13131349 headRequest : false ,
13141350 aborted : false ,
1315- closeHandler : onSessionClose . bind ( this )
1351+ closeHandler : onSessionClose . bind ( this ) ,
1352+ writeQueueSize : 0
13161353 } ;
13171354
13181355 this . once ( 'ready' , streamOnceReady ) ;
@@ -1359,6 +1396,23 @@ class Http2Stream extends Duplex {
13591396 }
13601397
13611398 _onTimeout ( ) {
1399+ // This checks whether a write is currently in progress and also whether
1400+ // that write is actually sending data across the write. The kHandle
1401+ // stored `chunksSentSinceLastWrite` is only updated when a timeout event
1402+ // happens, meaning that if a write is ongoing it should never equal the
1403+ // newly fetched, updated value.
1404+ if ( this [ kState ] . writeQueueSize > 0 ) {
1405+ const handle = this [ kSession ] [ kHandle ] ;
1406+ const chunksSentSinceLastWrite = handle !== undefined ?
1407+ handle . chunksSentSinceLastWrite : null ;
1408+ if ( chunksSentSinceLastWrite !== null &&
1409+ chunksSentSinceLastWrite !== handle . updateChunksSent ( ) ) {
1410+ _unrefActive ( this ) ;
1411+ _unrefActive ( this [ kSession ] ) ;
1412+ return ;
1413+ }
1414+ }
1415+
13621416 process . nextTick ( emit , this , 'timeout' ) ;
13631417 }
13641418
@@ -1396,10 +1450,11 @@ class Http2Stream extends Duplex {
13961450 this . once ( 'ready' , this . _write . bind ( this , data , encoding , cb ) ) ;
13971451 return ;
13981452 }
1399- _unrefActive ( this ) ;
14001453 if ( ! this [ kState ] . headersSent )
14011454 this [ kProceed ] ( ) ;
14021455 const session = this [ kSession ] ;
1456+ _unrefActive ( this ) ;
1457+ _unrefActive ( session ) ;
14031458 const handle = session [ kHandle ] ;
14041459 const req = new WriteWrap ( ) ;
14051460 req . stream = this [ kID ] ;
@@ -1410,18 +1465,19 @@ class Http2Stream extends Duplex {
14101465 const err = createWriteReq ( req , handle , data , encoding ) ;
14111466 if ( err )
14121467 throw util . _errnoException ( err , 'write' , req . error ) ;
1413- this . _bytesDispatched += req . bytes ;
1468+ trackWriteState ( this , req . bytes ) ;
14141469 }
14151470
14161471 _writev ( data , cb ) {
14171472 if ( this [ kID ] === undefined ) {
14181473 this . once ( 'ready' , this . _writev . bind ( this , data , cb ) ) ;
14191474 return ;
14201475 }
1421- _unrefActive ( this ) ;
14221476 if ( ! this [ kState ] . headersSent )
14231477 this [ kProceed ] ( ) ;
14241478 const session = this [ kSession ] ;
1479+ _unrefActive ( this ) ;
1480+ _unrefActive ( session ) ;
14251481 const handle = session [ kHandle ] ;
14261482 const req = new WriteWrap ( ) ;
14271483 req . stream = this [ kID ] ;
@@ -1438,6 +1494,7 @@ class Http2Stream extends Duplex {
14381494 const err = handle . writev ( req , chunks ) ;
14391495 if ( err )
14401496 throw util . _errnoException ( err , 'write' , req . error ) ;
1497+ trackWriteState ( this , req . bytes ) ;
14411498 }
14421499
14431500 _read ( nread ) {
@@ -1531,6 +1588,10 @@ class Http2Stream extends Duplex {
15311588 return ;
15321589 }
15331590
1591+ const state = this [ kState ] ;
1592+ session [ kState ] . writeQueueSize -= state . writeQueueSize ;
1593+ state . writeQueueSize = 0 ;
1594+
15341595 const server = session [ kServer ] ;
15351596 if ( server !== undefined && err ) {
15361597 server . emit ( 'streamError' , err , this ) ;
@@ -1625,7 +1686,12 @@ function processRespondWithFD(fd, headers, offset = 0, length = -1,
16251686 if ( ret < 0 ) {
16261687 err = new NghttpError ( ret ) ;
16271688 process . nextTick ( emit , this , 'error' , err ) ;
1689+ break ;
16281690 }
1691+ // exact length of the file doesn't matter here, since the
1692+ // stream is closing anyway — just use 1 to signify that
1693+ // a write does exist
1694+ trackWriteState ( this , 1 ) ;
16291695 }
16301696}
16311697
0 commit comments