Skip to content

Commit 49e5771

Browse files
committed
test(driver-tests): add reusable tests for all drivers (#864)
1 parent 7c8f0ab commit 49e5771

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+2372
-866
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"dev": "npx turbo watch dev",
1818
"build": "npx turbo build",
1919
"test": "npx turbo test",
20+
"test:watch": "npx turbo watch test",
2021
"check-types": "npx turbo check-types",
2122
"fmt": "yarn biome check --write .",
2223
"dev-docs": "cd docs && yarn dlx mintlify@latest dev",

packages/actor-core/src/actor/connection.ts

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,10 @@
1-
import type {
2-
ActorInstance,
3-
AnyActorInstance,
4-
ExtractActorConnParams,
5-
ExtractActorConnState,
6-
} from "./instance";
1+
import type { ActorInstance } from "./instance";
72
import * as errors from "./errors";
83
import { generateSecureToken } from "./utils";
9-
import { CachedSerializer, Encoding } from "./protocol/serde";
10-
import { logger } from "./log";
11-
import { ConnDriver } from "./driver";
4+
import { CachedSerializer } from "./protocol/serde";
5+
import type { ConnDriver } from "./driver";
126
import * as messageToClient from "@/actor/protocol/message/to-client";
13-
import { Actions } from "./config";
7+
import type { PersistedConn } from "./persisted";
148

159
export function generateConnId(): string {
1610
return crypto.randomUUID();
@@ -22,29 +16,6 @@ export function generateConnToken(): string {
2216

2317
export type ConnId = string;
2418

25-
/** Object representing connection that gets persisted to storage. */
26-
export interface PersistedConn<CP, CS> {
27-
// ID
28-
i: string;
29-
// Token
30-
t: string;
31-
// Connection driver
32-
d: string;
33-
// Connection driver state
34-
ds: unknown;
35-
// Parameters
36-
p: CP;
37-
// State
38-
s: CS;
39-
// Subscriptions
40-
su: PersistedSub[];
41-
}
42-
43-
export interface PersistedSub {
44-
// Event name
45-
n: string;
46-
}
47-
4819
export type AnyConn = Conn<any, any, any, any>;
4920

5021
/**

packages/actor-core/src/actor/driver.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,12 @@ import { AnyConn } from "./connection";
55

66
export type ConnDrivers = Record<string, ConnDriver>;
77

8-
export type KvKey = unknown[];
9-
export type KvValue = unknown;
10-
11-
128
export interface ActorDriver {
139
//load(): Promise<LoadOutput>;
1410
getContext(actorId: string): unknown;
1511

16-
// HACK: Clean these up
17-
kvGet(actorId: string, key: KvKey): Promise<KvValue | undefined>;
18-
kvGetBatch(actorId: string, key: KvKey[]): Promise<(KvValue | undefined)[]>;
19-
kvPut(actorId: string, key: KvKey, value: KvValue): Promise<void>;
20-
kvPutBatch(actorId: string, key: [KvKey, KvValue][]): Promise<void>;
21-
kvDelete(actorId: string, key: KvKey): Promise<void>;
22-
kvDeleteBatch(actorId: string, key: KvKey[]): Promise<void>;
12+
readPersistedData(actorId: string): Promise<unknown | undefined>;
13+
writePersistedData(actorId: string, unknown: unknown): Promise<void>;
2314

2415
// Schedule
2516
setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void>;

packages/actor-core/src/actor/instance.ts

Lines changed: 128 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
import type { PersistedConn } from "./connection";
21
import type { Logger } from "@/common//log";
3-
import { type ActorTags, isJsonSerializable, stringifyError } from "@/common//utils";
2+
import {
3+
type ActorTags,
4+
isJsonSerializable,
5+
stringifyError,
6+
} from "@/common//utils";
47
import onChange from "on-change";
58
import type { ActorConfig } from "./config";
69
import { Conn, type ConnId } from "./connection";
@@ -9,15 +12,15 @@ import type { ConnDriver } from "./driver";
912
import * as errors from "./errors";
1013
import { processMessage } from "./protocol/message/mod";
1114
import { instanceLogger, logger } from "./log";
12-
import { ActionContext } from "./action";
15+
import type { ActionContext } from "./action";
1316
import { Lock, deadline } from "./utils";
1417
import { Schedule } from "./schedule";
15-
import { KEYS } from "./keys";
1618
import type * as wsToServer from "@/actor/protocol/message/to-server";
1719
import { CachedSerializer } from "./protocol/serde";
1820
import { ActorInspector } from "@/inspector/actor";
1921
import { ActorContext } from "./context";
2022
import invariant from "invariant";
23+
import type { PersistedActor, PersistedConn, PersistedScheduleEvents } from "./persisted";
2124

2225
/**
2326
* Options for the `_saveState` method.
@@ -72,14 +75,6 @@ export type ExtractActorConnState<A extends AnyActorInstance> =
7275
? ConnState
7376
: never;
7477

75-
/** State object that gets automatically persisted to storage. */
76-
interface PersistedActor<S, CP, CS> {
77-
// State
78-
s: S;
79-
// Connections
80-
c: PersistedConn<CP, CS>[];
81-
}
82-
8378
export class ActorInstance<S, CP, CS, V> {
8479
// Shared actor context for this instance
8580
actorContext: ActorContext<S, CP, CS, V>;
@@ -155,7 +150,7 @@ export class ActorInstance<S, CP, CS, V> {
155150
this.#name = name;
156151
this.#tags = tags;
157152
this.#region = region;
158-
this.#schedule = new Schedule(this, actorDriver);
153+
this.#schedule = new Schedule(this);
159154
this.inspector = new ActorInspector(this);
160155

161156
// Initialize server
@@ -171,7 +166,12 @@ export class ActorInstance<S, CP, CS, V> {
171166
let vars: V | undefined = undefined;
172167
if ("createVars" in this.#config) {
173168
const dataOrPromise = this.#config.createVars(
174-
this.actorContext as unknown as ActorContext<undefined, undefined, undefined, undefined>,
169+
this.actorContext as unknown as ActorContext<
170+
undefined,
171+
undefined,
172+
undefined,
173+
undefined
174+
>,
175175
this.#actorDriver.getContext(this.#actorId),
176176
);
177177
if (dataOrPromise instanceof Promise) {
@@ -200,8 +200,101 @@ export class ActorInstance<S, CP, CS, V> {
200200
this.#ready = true;
201201
}
202202

203+
async scheduleEvent(
204+
timestamp: number,
205+
fn: string,
206+
args: unknown[],
207+
): Promise<void> {
208+
// Build event
209+
const eventId = crypto.randomUUID();
210+
const newEvent: PersistedScheduleEvents = {
211+
e: eventId,
212+
t: timestamp,
213+
a: fn,
214+
ar: args,
215+
};
216+
217+
this.actorContext.log.info("scheduling event", {
218+
event: eventId,
219+
timestamp,
220+
action: fn
221+
});
222+
223+
// Insert event in to index
224+
const insertIndex = this.#persist.e.findIndex((x) => x.t > newEvent.t);
225+
if (insertIndex === -1) {
226+
this.#persist.e.push(newEvent);
227+
} else {
228+
this.#persist.e.splice(insertIndex, 0, newEvent);
229+
}
230+
231+
// Update alarm if:
232+
// - this is the newest event (i.e. at beginning of array) or
233+
// - this is the only event (i.e. the only event in the array)
234+
if (insertIndex === 0 || this.#persist.e.length === 1) {
235+
this.actorContext.log.info("setting alarm", { timestamp });
236+
await this.#actorDriver.setAlarm(this, newEvent.t);
237+
}
238+
}
239+
203240
async onAlarm() {
204-
await this.#schedule.__onAlarm();
241+
const now = Date.now();
242+
this.actorContext.log.debug("alarm triggered", { now, events: this.#persist.e.length });
243+
244+
// Remove events from schedule that we're about to run
245+
const runIndex = this.#persist.e.findIndex((x) => x.t <= now);
246+
if (runIndex === -1) {
247+
this.actorContext.log.debug("no events to run", { now });
248+
return;
249+
}
250+
const scheduleEvents = this.#persist.e.splice(0, runIndex + 1);
251+
this.actorContext.log.debug("running events", { count: scheduleEvents.length });
252+
253+
// Set alarm for next event
254+
if (this.#persist.e.length > 0) {
255+
await this.#actorDriver.setAlarm(this, this.#persist.e[0].t);
256+
}
257+
258+
// Iterate by event key in order to ensure we call the events in order
259+
for (const event of scheduleEvents) {
260+
try {
261+
this.actorContext.log.info("running action for event", {
262+
event: event.e,
263+
timestamp: event.t,
264+
action: event.a,
265+
args: event.ar
266+
});
267+
268+
// Look up function
269+
const fn: unknown = this.#config.actions[event.a];
270+
if (!fn) throw new Error(`Missing action for alarm ${event.a}`);
271+
if (typeof fn !== "function")
272+
throw new Error(
273+
`Alarm function lookup for ${event.a} returned ${typeof fn}`,
274+
);
275+
276+
// Call function
277+
try {
278+
await fn.call(undefined, this.actorContext, ...event.ar);
279+
} catch (error) {
280+
this.actorContext.log.error("error while running event", {
281+
error: stringifyError(error),
282+
event: event.e,
283+
timestamp: event.t,
284+
action: event.a,
285+
args: event.ar,
286+
});
287+
}
288+
} catch (error) {
289+
this.actorContext.log.error("internal error while running event", {
290+
error: stringifyError(error),
291+
event: event.e,
292+
timestamp: event.t,
293+
action: event.a,
294+
args: event.ar,
295+
});
296+
}
297+
}
205298
}
206299

207300
get stateEnabled() {
@@ -268,9 +361,8 @@ export class ActorInstance<S, CP, CS, V> {
268361
this.#persistChanged = false;
269362

270363
// Write to KV
271-
await this.#actorDriver.kvPut(
364+
await this.#actorDriver.writePersistedData(
272365
this.#actorId,
273-
KEYS.STATE.DATA,
274366
this.#persistRaw,
275367
);
276368

@@ -359,12 +451,11 @@ export class ActorInstance<S, CP, CS, V> {
359451

360452
async #initialize() {
361453
// Read initial state
362-
const [initialized, persistData] = (await this.#actorDriver.kvGetBatch(
454+
const persistData = (await this.#actorDriver.readPersistedData(
363455
this.#actorId,
364-
[KEYS.STATE.INITIALIZED, KEYS.STATE.DATA],
365-
)) as [boolean, PersistedActor<S, CP, CS>];
456+
)) as PersistedActor<S, CP, CS>;
366457

367-
if (initialized) {
458+
if (persistData !== undefined) {
368459
logger().info("actor restoring", {
369460
connections: persistData.c.length,
370461
});
@@ -406,7 +497,12 @@ export class ActorInstance<S, CP, CS, V> {
406497

407498
// Convert state to undefined since state is not defined yet here
408499
stateData = await this.#config.createState(
409-
this.actorContext as unknown as ActorContext<undefined, undefined, undefined, undefined>,
500+
this.actorContext as unknown as ActorContext<
501+
undefined,
502+
undefined,
503+
undefined,
504+
undefined
505+
>,
410506
);
411507
} else if ("state" in this.#config) {
412508
stateData = structuredClone(this.#config.state);
@@ -420,14 +516,12 @@ export class ActorInstance<S, CP, CS, V> {
420516
const persist: PersistedActor<S, CP, CS> = {
421517
s: stateData as S,
422518
c: [],
519+
e: [],
423520
};
424521

425522
// Update state
426523
logger().debug("writing state");
427-
await this.#actorDriver.kvPutBatch(this.#actorId, [
428-
[KEYS.STATE.INITIALIZED, true],
429-
[KEYS.STATE.DATA, persist],
430-
]);
524+
await this.#actorDriver.writePersistedData(this.#actorId, persist);
431525

432526
this.#setPersist(persist);
433527
}
@@ -509,7 +603,12 @@ export class ActorInstance<S, CP, CS, V> {
509603
if (this.#connStateEnabled) {
510604
if ("createConnState" in this.#config) {
511605
const dataOrPromise = this.#config.createConnState(
512-
this.actorContext as unknown as ActorContext<undefined, undefined, undefined, undefined>,
606+
this.actorContext as unknown as ActorContext<
607+
undefined,
608+
undefined,
609+
undefined,
610+
undefined
611+
>,
513612
onBeforeConnectOpts,
514613
);
515614
if (dataOrPromise instanceof Promise) {
@@ -723,6 +822,8 @@ export class ActorInstance<S, CP, CS, V> {
723822
rpcName: string,
724823
args: unknown[],
725824
): Promise<unknown> {
825+
invariant(this.#ready, "exucuting rpc before ready");
826+
726827
// Prevent calling private or reserved methods
727828
if (!(rpcName in this.#config.actions)) {
728829
logger().warn("rpc does not exist", { rpcName });

packages/actor-core/src/actor/keys.ts

Lines changed: 0 additions & 16 deletions
This file was deleted.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/** State object that gets automatically persisted to storage. */
2+
export interface PersistedActor<S, CP, CS> {
3+
// State
4+
s: S;
5+
// Connections
6+
c: PersistedConn<CP, CS>[];
7+
// Scheduled events
8+
e: PersistedScheduleEvents[];
9+
}
10+
11+
/** Object representing connection that gets persisted to storage. */
12+
export interface PersistedConn<CP, CS> {
13+
// ID
14+
i: string;
15+
// Token
16+
t: string;
17+
// Connection driver
18+
d: string;
19+
// Connection driver state
20+
ds: unknown;
21+
// Parameters
22+
p: CP;
23+
// State
24+
s: CS;
25+
// Subscriptions
26+
su: PersistedSubscription[];
27+
}
28+
29+
export interface PersistedSubscription {
30+
// Event name
31+
n: string;
32+
}
33+
34+
export interface PersistedScheduleEvents {
35+
// Event ID
36+
e: string;
37+
// Timestamp
38+
t: number;
39+
// Action name
40+
a: string;
41+
// Arguments
42+
ar: unknown[];
43+
}

0 commit comments

Comments
 (0)