@@ -15,6 +15,7 @@ import { createModuleLogger } from 'lib0/logging'
1515import toobusy from 'toobusy-js'
1616import { promiseWithResolvers } from './utils.js'
1717import { ClientClosedError } from 'redis'
18+ import { randomUUID } from 'crypto'
1819
1920const logSocketIO = createModuleLogger ( '@y/socket-io/server' )
2021const PERSIST_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-server-persist-interval' ) || '3000' )
@@ -24,6 +25,7 @@ const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true'
2425const DEFAULT_CLEAR_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-default-clear-timeout' ) || '30000' )
2526const WORKER_HEALTH_CHECK_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-worker-health-check-interval' ) || '5000' )
2627const NEVER_REJECT_CONNECTION = env . getConf ( 'y-socket-io-never-reject-connection' ) === 'true'
28+ const PERSIST_LEADER_HEARTBEAT_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-server-persist-leader-heartbeat-interval' ) || '5000' )
2729
2830process . on ( 'SIGINT' , function ( ) {
2931 // calling .shutdown allows your process to exit normally
@@ -169,6 +171,21 @@ export class YSocketIO {
169171 * @private
170172 */
171173 persistWorkerHealthCheckTimeout = null
174+ /**
175+ * @type {NodeJS.Timeout | null }
176+ * @private
177+ */
178+ persistentHeartbeatTimeout = null
179+ /**
180+ * @type {Set<string> }
181+ * @private
182+ */
183+ persistentLeaderOf = new Set ( )
184+ /**
185+ * @type {string }
186+ * @private
187+ */
188+ serverId = randomUUID ( )
172189
173190 /**
174191 * YSocketIO constructor.
@@ -210,6 +227,8 @@ export class YSocketIO {
210227 this . registerPersistWorkerHealthCheck ( )
211228 }
212229
230+ this . registerPersistentLeaderHeartbeat ( )
231+
213232 this . nsp = this . io . of ( / ^ \/ y j s \| .* $ / )
214233
215234 this . nsp . use ( async ( socket , next ) => {
@@ -286,6 +305,7 @@ export class YSocketIO {
286305 this . subscriber ?. ensureSubId ( stream , doc . redisLastId )
287306 }
288307 this . startSynchronization ( socket , doc )
308+ await this . tryAcquirePersistentLeader ( namespace )
289309 } ) ( )
290310 } )
291311
@@ -394,6 +414,7 @@ export class YSocketIO {
394414 if ( nsp ?. sockets . size === 0 && stream ) {
395415 this . cleanupNamespace ( ns , stream , DEFAULT_CLEAR_TIMEOUT )
396416 if ( this . namespaceDocMap . has ( ns ) ) this . debouncedPersist ( ns , true )
417+ this . persistentLeaderOf . delete ( ns )
397418 }
398419 logSocketIO ( `disconnecting socket in ${ ns } , ${ nsp ?. sockets . size || 0 } remaining` )
399420 }
@@ -540,10 +561,13 @@ export class YSocketIO {
540561 // are all synchronize operations
541562 this . debouncedPersistMap . delete ( namespace )
542563
564+ const isLeader = await this . tryAcquirePersistentLeader ( namespace )
565+ if ( ! isLeader ) return
566+
543567 try {
544568 assert ( this . client )
545569 const doc = this . namespaceDocMap . get ( namespace ) ?. ydoc
546- logSocketIO ( `trying to persist ${ namespace } ` )
570+ logSocketIO ( `trying to persist ${ namespace } in [SID: ${ this . serverId } ] ` )
547571 if ( ! doc ) return
548572 if ( this . persistWorker && this . workerReady ) {
549573 /** @type {ReturnType<typeof promiseWithResolvers<void>> } */
@@ -659,6 +683,9 @@ export class YSocketIO {
659683 if ( this . persistWorkerHealthCheckTimeout ) {
660684 clearInterval ( this . persistWorkerHealthCheckTimeout )
661685 }
686+ if ( this . persistentHeartbeatTimeout ) {
687+ clearTimeout ( this . persistentHeartbeatTimeout )
688+ }
662689 this . subscriber ?. destroy ( )
663690 return this . client ?. destroy ( )
664691 } catch ( e ) {
@@ -767,4 +794,78 @@ export class YSocketIO {
767794 }
768795 return health
769796 }
797+
798+ /**
799+ * @param {string } namespace
800+ */
801+ getLeaderKeyOf ( namespace ) {
802+ assert ( this . client )
803+ return `${ this . client . prefix } :persist-leader:${ namespace } `
804+ }
805+
806+ async registerPersistentLeaderHeartbeat ( ) {
807+ this . persistentHeartbeatTimeout = setTimeout ( async ( ) => {
808+ assert ( this . client )
809+ const redis = this . client . redis
810+
811+ try {
812+ /**
813+ * @type {Array<Promise<any>> }
814+ */
815+ const promises = [ ]
816+ for ( const namespace of this . persistentLeaderOf ) {
817+ const key = this . getLeaderKeyOf ( namespace )
818+ const curLeader = await redis . get ( key )
819+
820+ // remove orphaned if exist
821+ const aliveClients = this . namespaceMap . get ( namespace ) ?. sockets . size || 0
822+ if ( aliveClients === 0 ) {
823+ logSocketIO ( `clearing leader heartbeat for [${ namespace } ] (SID: ${ this . serverId } )` )
824+ this . persistentLeaderOf . delete ( namespace )
825+ continue
826+ }
827+
828+ if ( curLeader === this . serverId ) {
829+ logSocketIO ( `set leader heartbeat for [${ namespace } ] (SID: ${ this . serverId } )` )
830+ promises . push (
831+ redis . set ( key , this . serverId , {
832+ XX : true ,
833+ PX : PERSIST_LEADER_HEARTBEAT_INTERVAL
834+ } )
835+ )
836+ } else {
837+ logSocketIO ( `lost leadership for [${ namespace } ] (SID: ${ this . serverId } )` )
838+ this . persistentLeaderOf . delete ( namespace )
839+ }
840+ }
841+
842+ await promise . all ( promises )
843+ } catch ( e ) {
844+ console . error ( e )
845+ }
846+
847+ // register next round
848+ this . persistentHeartbeatTimeout = setTimeout (
849+ ( ) => this . registerPersistentLeaderHeartbeat ( ) ,
850+ PERSIST_LEADER_HEARTBEAT_INTERVAL / 2
851+ )
852+ } )
853+ }
854+
855+ /**
856+ * @param {string } namespace
857+ */
858+ async tryAcquirePersistentLeader ( namespace ) {
859+ assert ( this . client )
860+ const redis = this . client . redis
861+ const key = this . getLeaderKeyOf ( namespace )
862+ const ok = await redis . set ( key , this . serverId , {
863+ NX : true ,
864+ PX : PERSIST_LEADER_HEARTBEAT_INTERVAL
865+ } )
866+ if ( ! ok ) return false
867+
868+ this . persistentLeaderOf . add ( namespace )
869+ return true
870+ }
770871}
0 commit comments