1
- import type { PersistedConn } from "./connection" ;
2
1
import type { Logger } from "@/common//log" ;
3
- import { type ActorTags , isJsonSerializable , stringifyError } from "@/common//utils" ;
2
+ import {
3
+ type ActorTags ,
4
+ isJsonSerializable ,
5
+ stringifyError ,
6
+ } from "@/common//utils" ;
4
7
import onChange from "on-change" ;
5
8
import type { ActorConfig } from "./config" ;
6
9
import { Conn , type ConnId } from "./connection" ;
@@ -9,15 +12,15 @@ import type { ConnDriver } from "./driver";
9
12
import * as errors from "./errors" ;
10
13
import { processMessage } from "./protocol/message/mod" ;
11
14
import { instanceLogger , logger } from "./log" ;
12
- import { ActionContext } from "./action" ;
15
+ import type { ActionContext } from "./action" ;
13
16
import { Lock , deadline } from "./utils" ;
14
17
import { Schedule } from "./schedule" ;
15
- import { KEYS } from "./keys" ;
16
18
import type * as wsToServer from "@/actor/protocol/message/to-server" ;
17
19
import { CachedSerializer } from "./protocol/serde" ;
18
20
import { ActorInspector } from "@/inspector/actor" ;
19
21
import { ActorContext } from "./context" ;
20
22
import invariant from "invariant" ;
23
+ import type { PersistedActor , PersistedConn , PersistedScheduleEvents } from "./persisted" ;
21
24
22
25
/**
23
26
* Options for the `_saveState` method.
@@ -72,14 +75,6 @@ export type ExtractActorConnState<A extends AnyActorInstance> =
72
75
? ConnState
73
76
: never ;
74
77
75
- /** State object that gets automatically persisted to storage. */
76
- interface PersistedActor < S , CP , CS > {
77
- // State
78
- s : S ;
79
- // Connections
80
- c : PersistedConn < CP , CS > [ ] ;
81
- }
82
-
83
78
export class ActorInstance < S , CP , CS , V > {
84
79
// Shared actor context for this instance
85
80
actorContext : ActorContext < S , CP , CS , V > ;
@@ -155,7 +150,7 @@ export class ActorInstance<S, CP, CS, V> {
155
150
this . #name = name ;
156
151
this . #tags = tags ;
157
152
this . #region = region ;
158
- this . #schedule = new Schedule ( this , actorDriver ) ;
153
+ this . #schedule = new Schedule ( this ) ;
159
154
this . inspector = new ActorInspector ( this ) ;
160
155
161
156
// Initialize server
@@ -171,7 +166,12 @@ export class ActorInstance<S, CP, CS, V> {
171
166
let vars : V | undefined = undefined ;
172
167
if ( "createVars" in this . #config) {
173
168
const dataOrPromise = this . #config. createVars (
174
- this . actorContext as unknown as ActorContext < undefined , undefined , undefined , undefined > ,
169
+ this . actorContext as unknown as ActorContext <
170
+ undefined ,
171
+ undefined ,
172
+ undefined ,
173
+ undefined
174
+ > ,
175
175
this . #actorDriver. getContext ( this . #actorId) ,
176
176
) ;
177
177
if ( dataOrPromise instanceof Promise ) {
@@ -200,8 +200,101 @@ export class ActorInstance<S, CP, CS, V> {
200
200
this . #ready = true ;
201
201
}
202
202
203
+ async scheduleEvent (
204
+ timestamp : number ,
205
+ fn : string ,
206
+ args : unknown [ ] ,
207
+ ) : Promise < void > {
208
+ // Build event
209
+ const eventId = crypto . randomUUID ( ) ;
210
+ const newEvent : PersistedScheduleEvents = {
211
+ e : eventId ,
212
+ t : timestamp ,
213
+ a : fn ,
214
+ ar : args ,
215
+ } ;
216
+
217
+ this . actorContext . log . info ( "scheduling event" , {
218
+ event : eventId ,
219
+ timestamp,
220
+ action : fn
221
+ } ) ;
222
+
223
+ // Insert event in to index
224
+ const insertIndex = this . #persist. e . findIndex ( ( x ) => x . t > newEvent . t ) ;
225
+ if ( insertIndex === - 1 ) {
226
+ this . #persist. e . push ( newEvent ) ;
227
+ } else {
228
+ this . #persist. e . splice ( insertIndex , 0 , newEvent ) ;
229
+ }
230
+
231
+ // Update alarm if:
232
+ // - this is the newest event (i.e. at beginning of array) or
233
+ // - this is the only event (i.e. the only event in the array)
234
+ if ( insertIndex === 0 || this . #persist. e . length === 1 ) {
235
+ this . actorContext . log . info ( "setting alarm" , { timestamp } ) ;
236
+ await this . #actorDriver. setAlarm ( this , newEvent . t ) ;
237
+ }
238
+ }
239
+
203
240
async onAlarm ( ) {
204
- await this . #schedule. __onAlarm ( ) ;
241
+ const now = Date . now ( ) ;
242
+ this . actorContext . log . debug ( "alarm triggered" , { now, events : this . #persist. e . length } ) ;
243
+
244
+ // Remove events from schedule that we're about to run
245
+ const runIndex = this . #persist. e . findIndex ( ( x ) => x . t <= now ) ;
246
+ if ( runIndex === - 1 ) {
247
+ this . actorContext . log . debug ( "no events to run" , { now } ) ;
248
+ return ;
249
+ }
250
+ const scheduleEvents = this . #persist. e . splice ( 0 , runIndex + 1 ) ;
251
+ this . actorContext . log . debug ( "running events" , { count : scheduleEvents . length } ) ;
252
+
253
+ // Set alarm for next event
254
+ if ( this . #persist. e . length > 0 ) {
255
+ await this . #actorDriver. setAlarm ( this , this . #persist. e [ 0 ] . t ) ;
256
+ }
257
+
258
+ // Iterate by event key in order to ensure we call the events in order
259
+ for ( const event of scheduleEvents ) {
260
+ try {
261
+ this . actorContext . log . info ( "running action for event" , {
262
+ event : event . e ,
263
+ timestamp : event . t ,
264
+ action : event . a ,
265
+ args : event . ar
266
+ } ) ;
267
+
268
+ // Look up function
269
+ const fn : unknown = this . #config. actions [ event . a ] ;
270
+ if ( ! fn ) throw new Error ( `Missing action for alarm ${ event . a } ` ) ;
271
+ if ( typeof fn !== "function" )
272
+ throw new Error (
273
+ `Alarm function lookup for ${ event . a } returned ${ typeof fn } ` ,
274
+ ) ;
275
+
276
+ // Call function
277
+ try {
278
+ await fn . call ( undefined , this . actorContext , ...event . ar ) ;
279
+ } catch ( error ) {
280
+ this . actorContext . log . error ( "error while running event" , {
281
+ error : stringifyError ( error ) ,
282
+ event : event . e ,
283
+ timestamp : event . t ,
284
+ action : event . a ,
285
+ args : event . ar ,
286
+ } ) ;
287
+ }
288
+ } catch ( error ) {
289
+ this . actorContext . log . error ( "internal error while running event" , {
290
+ error : stringifyError ( error ) ,
291
+ event : event . e ,
292
+ timestamp : event . t ,
293
+ action : event . a ,
294
+ args : event . ar ,
295
+ } ) ;
296
+ }
297
+ }
205
298
}
206
299
207
300
get stateEnabled ( ) {
@@ -268,9 +361,8 @@ export class ActorInstance<S, CP, CS, V> {
268
361
this . #persistChanged = false ;
269
362
270
363
// Write to KV
271
- await this . #actorDriver. kvPut (
364
+ await this . #actorDriver. writePersistedData (
272
365
this . #actorId,
273
- KEYS . STATE . DATA ,
274
366
this . #persistRaw,
275
367
) ;
276
368
@@ -359,12 +451,11 @@ export class ActorInstance<S, CP, CS, V> {
359
451
360
452
async #initialize( ) {
361
453
// Read initial state
362
- const [ initialized , persistData ] = ( await this . #actorDriver. kvGetBatch (
454
+ const persistData = ( await this . #actorDriver. readPersistedData (
363
455
this . #actorId,
364
- [ KEYS . STATE . INITIALIZED , KEYS . STATE . DATA ] ,
365
- ) ) as [ boolean , PersistedActor < S , CP , CS > ] ;
456
+ ) ) as PersistedActor < S , CP , CS > ;
366
457
367
- if ( initialized ) {
458
+ if ( persistData !== undefined ) {
368
459
logger ( ) . info ( "actor restoring" , {
369
460
connections : persistData . c . length ,
370
461
} ) ;
@@ -406,7 +497,12 @@ export class ActorInstance<S, CP, CS, V> {
406
497
407
498
// Convert state to undefined since state is not defined yet here
408
499
stateData = await this . #config. createState (
409
- this . actorContext as unknown as ActorContext < undefined , undefined , undefined , undefined > ,
500
+ this . actorContext as unknown as ActorContext <
501
+ undefined ,
502
+ undefined ,
503
+ undefined ,
504
+ undefined
505
+ > ,
410
506
) ;
411
507
} else if ( "state" in this . #config) {
412
508
stateData = structuredClone ( this . #config. state ) ;
@@ -420,14 +516,12 @@ export class ActorInstance<S, CP, CS, V> {
420
516
const persist : PersistedActor < S , CP , CS > = {
421
517
s : stateData as S ,
422
518
c : [ ] ,
519
+ e : [ ] ,
423
520
} ;
424
521
425
522
// Update state
426
523
logger ( ) . debug ( "writing state" ) ;
427
- await this . #actorDriver. kvPutBatch ( this . #actorId, [
428
- [ KEYS . STATE . INITIALIZED , true ] ,
429
- [ KEYS . STATE . DATA , persist ] ,
430
- ] ) ;
524
+ await this . #actorDriver. writePersistedData ( this . #actorId, persist ) ;
431
525
432
526
this . #setPersist( persist ) ;
433
527
}
@@ -509,7 +603,12 @@ export class ActorInstance<S, CP, CS, V> {
509
603
if ( this . #connStateEnabled) {
510
604
if ( "createConnState" in this . #config) {
511
605
const dataOrPromise = this . #config. createConnState (
512
- this . actorContext as unknown as ActorContext < undefined , undefined , undefined , undefined > ,
606
+ this . actorContext as unknown as ActorContext <
607
+ undefined ,
608
+ undefined ,
609
+ undefined ,
610
+ undefined
611
+ > ,
513
612
onBeforeConnectOpts ,
514
613
) ;
515
614
if ( dataOrPromise instanceof Promise ) {
@@ -723,6 +822,8 @@ export class ActorInstance<S, CP, CS, V> {
723
822
rpcName : string ,
724
823
args : unknown [ ] ,
725
824
) : Promise < unknown > {
825
+ invariant ( this . #ready, "exucuting rpc before ready" ) ;
826
+
726
827
// Prevent calling private or reserved methods
727
828
if ( ! ( rpcName in this . #config. actions ) ) {
728
829
logger ( ) . warn ( "rpc does not exist" , { rpcName } ) ;
0 commit comments