@@ -77,6 +77,19 @@ function eachWorker(cb) {
7777  } 
7878} 
7979
80+ // Extremely simple progress tracker 
81+ function  ProgressTracker ( missing ,  callback )  { 
82+   this . missing  =  missing ; 
83+   this . callback  =  callback ; 
84+ } 
85+ ProgressTracker . prototype . done  =  function ( )  { 
86+   this . missing  -=  1 ; 
87+   this . check ( ) ; 
88+ } ; 
89+ ProgressTracker . prototype . check  =  function ( )  { 
90+   if  ( this . missing  ===  0 )  this . callback ( ) ; 
91+ } ; 
92+ 
8093cluster . setupMaster  =  function ( options )  { 
8194  // This can only be called from the master. 
8295  assert ( cluster . isMaster ) ; 
@@ -238,7 +251,10 @@ if (cluster.isMaster) {
238251// Messages to a worker will be handled using this methods 
239252else  if  ( cluster . isWorker )  { 
240253
241-   // TODO: the disconnect step will use this 
254+   // Handle worker.disconnect from master 
255+   messageHandingObject . disconnect  =  function ( message ,  worker )  { 
256+     worker . disconnect ( ) ; 
257+   } ; 
242258} 
243259
244260function  toDecInt ( value )  { 
@@ -291,9 +307,11 @@ function Worker(customEnv) {
291307    } ) ; 
292308  } 
293309
294-   // handle internalMessage  and exit  event 
310+   // handle internalMessage, exit  and disconnect  event 
295311  this . process . on ( 'internalMessage' ,  handleMessage . bind ( null ,  this ) ) ; 
296312  this . process . on ( 'exit' ,  prepareDeath . bind ( null ,  this ,  'dead' ,  'death' ) ) ; 
313+   this . process . on ( 'disconnect' , 
314+                   prepareDeath . bind ( null ,  this ,  'disconnected' ,  'disconnect' ) ) ; 
297315
298316  // relay message and error 
299317  this . process . on ( 'message' ,  this . emit . bind ( this ,  'message' ) ) ; 
@@ -354,14 +372,6 @@ Worker.prototype.send = function() {
354372  this . process . send . apply ( this . process ,  arguments ) ; 
355373} ; 
356374
357- 
358- function  closeWorkerChannel ( worker ,  callback )  { 
359-   //Apparently the .close method is async, but do not have a callback 
360-   worker . process . _channel . close ( ) ; 
361-   worker . process . _channel  =  null ; 
362-   process . nextTick ( callback ) ; 
363- } 
364- 
365375// Kill the worker without restarting 
366376Worker . prototype . destroy  =  function ( )  { 
367377  var  self  =  this ; 
@@ -371,9 +381,14 @@ Worker.prototype.destroy = function() {
371381  if  ( cluster . isMaster )  { 
372382    // Disconnect IPC channel 
373383    // this way the worker won't need to propagate suicide state to master 
374-     closeWorkerChannel ( this ,  function ( )  { 
384+     if  ( self . process . connected )  { 
385+       self . process . once ( 'disconnect' ,  function ( )  { 
386+         self . process . kill ( ) ; 
387+       } ) ; 
388+       self . process . disconnect ( ) ; 
389+     }  else  { 
375390      self . process . kill ( ) ; 
376-     } ) ; 
391+     } 
377392
378393  }  else  { 
379394    // Channel is open 
@@ -401,6 +416,59 @@ Worker.prototype.destroy = function() {
401416  } 
402417} ; 
403418
419+ // The .disconnect function will close all server and then disconnect 
420+ // the IPC channel. 
421+ if  ( cluster . isMaster )  { 
422+   // Used in master 
423+   Worker . prototype . disconnect  =  function ( )  { 
424+     this . suicide  =  true ; 
425+ 
426+     sendInternalMessage ( this ,  { cmd : 'disconnect' } ) ; 
427+   } ; 
428+ 
429+ }  else  { 
430+   // Used in workers 
431+   Worker . prototype . disconnect  =  function ( )  { 
432+     var  self  =  this ; 
433+ 
434+     this . suicide  =  true ; 
435+ 
436+     // keep track of open servers 
437+     var  servers  =  Object . keys ( serverLisenters ) . length ; 
438+     var  progress  =  new  ProgressTracker ( servers ,  function ( )  { 
439+       // there are no more servers open so we will close the IPC channel. 
440+       // Closeing the IPC channel will emit emit a disconnect event 
441+       // in both master and worker on the process object. 
442+       // This event will be handled by prepearDeath. 
443+       self . process . disconnect ( ) ; 
444+     } ) ; 
445+ 
446+     // depending on where this function was called from (master or worker) 
447+     // the suicide state has allready been set. 
448+     // But it dosn't really matter if we set it again. 
449+     sendInternalMessage ( this ,  { cmd : 'suicide' } ,  function ( )  { 
450+       // in case there are no servers 
451+       progress . check ( ) ; 
452+ 
453+       // closeing all servers graceful 
454+       var  server ; 
455+       for  ( var  key  in  serverLisenters )  { 
456+         server  =  serverLisenters [ key ] ; 
457+ 
458+         // in case the server is closed we wont close it again 
459+         if  ( server . _handle  ===  null )  { 
460+           progress . done ( ) ; 
461+           continue ; 
462+         } 
463+ 
464+         server . on ( 'close' ,  progress . done . bind ( progress ) ) ; 
465+         server . close ( ) ; 
466+       } 
467+     } ) ; 
468+ 
469+   } ; 
470+ } 
471+ 
404472// Fork a new worker 
405473cluster . fork  =  function ( env )  { 
406474  // This can only be called from the master. 
@@ -412,6 +480,33 @@ cluster.fork = function(env) {
412480  return  ( new  cluster . Worker ( env ) ) ; 
413481} ; 
414482
483+ // execute .disconnect on all workers and close handlers when done 
484+ cluster . disconnect  =  function ( callback )  { 
485+   // This can only be called from the master. 
486+   assert ( cluster . isMaster ) ; 
487+ 
488+   // Close all TCP handlers when all workers are disconnected 
489+   var  workers  =  Object . keys ( cluster . workers ) . length ; 
490+   var  progress  =  new  ProgressTracker ( workers ,  function ( )  { 
491+     for  ( var  key  in  serverHandlers )  { 
492+       serverHandlers [ key ] . close ( ) ; 
493+       delete  serverHandlers [ key ] ; 
494+     } 
495+ 
496+     // call callback when done 
497+     if  ( callback )  callback ( ) ; 
498+   } ) ; 
499+ 
500+   // begin disconnecting all workers 
501+   eachWorker ( function ( worker )  { 
502+     worker . once ( 'disconnect' ,  progress . done . bind ( progress ) ) ; 
503+     worker . disconnect ( ) ; 
504+   } ) ; 
505+ 
506+   // in case there wasn't any workers 
507+   progress . check ( ) ; 
508+ } ; 
509+ 
415510// Sync way to quickly kill all cluster workers 
416511// However the workers may not die instantly 
417512function  quickDestroyCluster ( )  { 
0 commit comments