11import * as cbor from "cbor-x" ;
22import invariant from "invariant" ;
33import onChange from "on-change" ;
4+ import { success } from "zod/v4" ;
45import type { ActorKey } from "@/actor/mod" ;
56import type * as wsToClient from "@/actor/protocol/message/to-client" ;
67import type * as wsToServer from "@/actor/protocol/message/to-server" ;
@@ -9,10 +10,10 @@ import type { Logger } from "@/common/log";
910import { isCborSerializable , stringifyError } from "@/common/utils" ;
1011import type { UniversalWebSocket } from "@/common/websocket-interface" ;
1112import { ActorInspector } from "@/inspector/actor" ;
12- import type { Registry , RegistryConfig } from "@/mod" ;
13+ import type { Registry } from "@/mod" ;
1314import type { ActionContext } from "./action" ;
1415import type { ActorConfig } from "./config" ;
15- import { Conn , type ConnId } from "./connection" ;
16+ import { Conn , type ConnId , generatePing } from "./connection" ;
1617import { ActorContext } from "./context" ;
1718import type { AnyDatabaseProvider , InferDatabaseClient } from "./database" ;
1819import type { ActorDriver , ConnDriver , ConnDrivers } from "./driver" ;
@@ -157,6 +158,11 @@ export class ActorInstance<
157158 #ready = false ;
158159
159160 #connections = new Map < ConnId , Conn < S , CP , CS , V , I , AD , DB > > ( ) ;
161+ // This is used to track the last ping sent to the client, when restoring a connection
162+ #connectionsPingRequests = new Map <
163+ ConnId ,
164+ { ping : string ; resolve : ( ) => void }
165+ > ( ) ;
160166 #subscriptionIndex = new Map < string , Set < Conn < S , CP , CS , V , I , AD , DB > > > ( ) ;
161167
162168 #schedule! : Schedule ;
@@ -591,6 +597,8 @@ export class ActorInstance<
591597 // Set initial state
592598 this . #setPersist( persistData ) ;
593599
600+ const restorePromises = [ ] ;
601+
594602 // Load connections
595603 for ( const connPersist of this . #persist. c ) {
596604 // Create connections
@@ -601,13 +609,60 @@ export class ActorInstance<
601609 driver ,
602610 this . #connStateEnabled,
603611 ) ;
604- this . #connections. set ( conn . id , conn ) ;
605612
606- // Register event subscriptions
607- for ( const sub of connPersist . su ) {
608- this . #addSubscription( sub . n , conn , true ) ;
609- }
613+ // send ping, to ensure the connection is alive
614+
615+ const { promise, resolve } = Promise . withResolvers < void > ( ) ;
616+ restorePromises . push (
617+ Promise . race ( [
618+ promise ,
619+ new Promise < void > ( ( _ , reject ) => {
620+ setTimeout ( ( ) => {
621+ reject ( ) ;
622+ } , 2500 ) ;
623+ } ) ,
624+ ] )
625+ . catch ( ( ) => {
626+ process . nextTick ( ( ) => {
627+ logger ( ) . debug ( "connection restore failed" , {
628+ connId : conn . id ,
629+ } ) ;
630+ this . #connections. delete ( conn . id ) ;
631+ conn . disconnect (
632+ "Connection restore failed, connection is not alive" ,
633+ ) ;
634+ this . __removeConn ( conn ) ;
635+ } ) ;
636+ } )
637+ . then ( ( ) => {
638+ logger ( ) . debug ( "connection restored" , {
639+ connId : conn . id ,
640+ } ) ;
641+ this . #connections. set ( conn . id , conn ) ;
642+
643+ // Register event subscriptions
644+ for ( const sub of connPersist . su ) {
645+ this . #addSubscription( sub . n , conn , true ) ;
646+ }
647+ } ) ,
648+ ) ;
649+
650+ const ping = generatePing ( ) ;
651+ this . #connectionsPingRequests. set ( conn . id , { ping, resolve } ) ;
652+ conn . _sendMessage (
653+ new CachedSerializer < wsToClient . ToClient > ( {
654+ b : {
655+ p : ping ,
656+ } ,
657+ } ) ,
658+ ) ;
610659 }
660+
661+ const result = await Promise . allSettled ( restorePromises ) ;
662+ logger ( ) . info ( "connections restored" , {
663+ success : result . filter ( ( r ) => r . status === "fulfilled" ) . length ,
664+ failed : result . filter ( ( r ) => r . status === "rejected" ) . length ,
665+ } ) ;
611666 } else {
612667 logger ( ) . info ( "actor creating" ) ;
613668
@@ -818,6 +873,8 @@ export class ActorInstance<
818873 this . #persist. c . push ( persist ) ;
819874 this . saveState ( { immediate : true } ) ;
820875
876+ this . inspector . emitter . emit ( "connectionUpdated" ) ;
877+
821878 // Handle connection
822879 if ( this . #config. onConnect ) {
823880 try {
@@ -841,8 +898,6 @@ export class ActorInstance<
841898 }
842899 }
843900
844- this . inspector . emitter . emit ( "connectionUpdated" ) ;
845-
846901 // Send init message
847902 conn . _sendMessage (
848903 new CachedSerializer < wsToClient . ToClient > ( {
@@ -890,6 +945,14 @@ export class ActorInstance<
890945 } ) ;
891946 this . #removeSubscription( eventName , conn , false ) ;
892947 } ,
948+ onPong : async ( pong , conn ) => {
949+ const pingRequest = this . #connectionsPingRequests. get ( conn . id ) ;
950+ if ( pingRequest ?. ping === pong ) {
951+ // Resolve the ping request if it matches the last sent ping
952+ pingRequest . resolve ( ) ;
953+ this . #connectionsPingRequests. delete ( conn . id ) ;
954+ }
955+ } ,
893956 } ) ;
894957 }
895958
0 commit comments