Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/rivet/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"typescript": "^5.5.2"
},
"dependencies": {
"@rivetkit/actor": "https://pkg.pr.new/rivet-gg/rivetkit/@rivetkit/actor@cb1e6d4"
"@rivetkit/actor": "https://pkg.pr.new/rivet-gg/rivetkit/@rivetkit/actor@7e018f2"
},
"stableVersion": "0.8.0"
}
2 changes: 1 addition & 1 deletion examples/rivet/src/registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ export const counter = actor({
});

export const registry = setup({
actors: { counter },
use: { counter },
});

6 changes: 2 additions & 4 deletions packages/core/src/actor/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ export interface ActorDriver {
//load(): Promise<LoadOutput>;
getContext(actorId: string): unknown;

readInput(actorId: string): Promise<unknown | undefined>;

readPersistedData(actorId: string): Promise<unknown | undefined>;
writePersistedData(actorId: string, unknown: unknown): Promise<void>;
readPersistedData(actorId: string): Promise<Uint8Array | undefined>;
writePersistedData(actorId: string, data: Uint8Array): Promise<void>;

// Schedule
setAlarm(actor: AnyActorInstance, timestamp: number): Promise<void>;
Expand Down
50 changes: 31 additions & 19 deletions packages/core/src/actor/instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { processMessage } from "./protocol/message/mod";
import { CachedSerializer } from "./protocol/serde";
import { Schedule } from "./schedule";
import { DeadlineError, Lock, deadline } from "./utils";
import * as cbor from "cbor-x";

/**
* Options for the `_saveState` method.
Expand Down Expand Up @@ -122,10 +123,10 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
*
* Any data that should be stored indefinitely should be held within this object.
*/
#persist!: PersistedActor<S, CP, CS>;
#persist!: PersistedActor<S, CP, CS, I>;

/** Raw state without the proxy wrapper */
#persistRaw!: PersistedActor<S, CP, CS>;
#persistRaw!: PersistedActor<S, CP, CS, I>;

#writePersistLock = new Lock<void>(void 0);

Expand Down Expand Up @@ -426,7 +427,7 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
// Write to KV
await this.#actorDriver.writePersistedData(
this.#actorId,
this.#persistRaw,
cbor.encode(this.#persistRaw),
);

logger().debug("persist saved");
Expand All @@ -443,7 +444,7 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
/**
* Creates proxy for `#persist` that handles automatically flagging when state needs to be updated.
*/
#setPersist(target: PersistedActor<S, CP, CS>) {
#setPersist(target: PersistedActor<S, CP, CS, I>) {
// Set raw persist object
this.#persistRaw = target;

Expand Down Expand Up @@ -514,11 +515,21 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {

async #initialize() {
// Read initial state
const persistData = (await this.#actorDriver.readPersistedData(
const persistDataBuffer = await this.#actorDriver.readPersistedData(
this.#actorId,
)) as PersistedActor<S, CP, CS>;

if (persistData !== undefined) {
);
invariant(
persistDataBuffer !== undefined,
"persist data has not been set, it should be set when initialized",
);
const persistData = cbor.decode(persistDataBuffer) as PersistedActor<
S,
CP,
CS,
I
>;

if (persistData.hi) {
logger().info("actor restoring", {
connections: persistData.c.length,
});
Expand Down Expand Up @@ -546,8 +557,6 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
} else {
logger().info("actor creating");

const input = (await this.#actorDriver.readInput(this.#actorId)) as I;

// Initialize actor state
let stateData: unknown = undefined;
if (this.stateEnabled) {
Expand All @@ -567,7 +576,7 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
undefined,
undefined
>,
{ input },
{ input: persistData.i },
);
} else if ("state" in this.#config) {
stateData = structuredClone(this.#config.state);
Expand All @@ -578,21 +587,24 @@ export class ActorInstance<S, CP, CS, V, I, AD, DB> {
logger().debug("state not enabled");
}

const persist: PersistedActor<S, CP, CS> = {
s: stateData as S,
c: [],
e: [],
};
// Save state and mark as initialized
persistData.s = stateData as S;
persistData.hi = true;

// Update state
logger().debug("writing state");
await this.#actorDriver.writePersistedData(this.#actorId, persist);
await this.#actorDriver.writePersistedData(
this.#actorId,
cbor.encode(persistData),
);

this.#setPersist(persist);
this.#setPersist(persistData);

// Notify creation
if (this.#config.onCreate) {
await this.#config.onCreate(this.actorContext, { input });
await this.#config.onCreate(this.actorContext, {
input: persistData.i,
});
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion packages/core/src/actor/persisted.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
/** State object that gets automatically persisted to storage. */
export interface PersistedActor<S, CP, CS> {
export interface PersistedActor<S, CP, CS, I> {
// Input
i?: I,
// Has initialized
hi: boolean,
// State
s: S;
// Connections
Expand Down
16 changes: 16 additions & 0 deletions packages/core/src/driver-helpers/mod.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { PersistedActor } from "@/actor/persisted";

export type { ActorInstance, AnyActorInstance } from "@/actor/instance";
export type {
AttemptAcquireLease,
Expand Down Expand Up @@ -27,3 +29,17 @@ export {
HEADER_CONN_TOKEN,
} from "@/actor/router-endpoints";
export { RunConfigSchema, DriverConfigSchema } from "@/registry/run-config";
import * as cbor from "cbor-x";

export function serializeEmptyPersistData(
input: unknown | undefined,
): Uint8Array {
const persistData: PersistedActor<any, any, any, any> = {
i: input,
hi: false,
s: undefined,
c: [],
e: [],
};
return cbor.encode(persistData);
}
8 changes: 2 additions & 6 deletions packages/core/src/drivers/memory/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,11 @@ export class MemoryActorDriver implements ActorDriver {
return {};
}

async readInput(actorId: string): Promise<unknown | undefined> {
return this.#state.readInput(actorId);
}

async readPersistedData(actorId: string): Promise<unknown | undefined> {
async readPersistedData(actorId: string): Promise<Uint8Array | undefined> {
return this.#state.readPersistedData(actorId);
}

async writePersistedData(actorId: string, data: unknown): Promise<void> {
async writePersistedData(actorId: string, data: Uint8Array): Promise<void> {
this.#state.writePersistedData(actorId, data);
}

Expand Down
17 changes: 6 additions & 11 deletions packages/core/src/drivers/memory/global-state.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import type { ActorKey } from "@/actor/mod";
import { serializeEmptyPersistData } from "@/driver-helpers/mod";

export interface ActorState {
id: string;
name: string;
key: ActorKey;
persistedData: unknown;
input?: unknown;
persistedData: Uint8Array;
}

export class MemoryGlobalState {
Expand All @@ -19,32 +19,27 @@ export class MemoryGlobalState {
return actor;
}

readInput(actorId: string): unknown | undefined {
return this.#getActor(actorId).input;
}

readPersistedData(actorId: string): unknown | undefined {
readPersistedData(actorId: string): Uint8Array | undefined {
return this.#getActor(actorId).persistedData;
}

writePersistedData(actorId: string, data: unknown) {
writePersistedData(actorId: string, data: Uint8Array) {
this.#getActor(actorId).persistedData = data;
}

createActor(
actorId: string,
name: string,
key: ActorKey,
input?: unknown,
input: unknown | undefined,
): void {
// Create actor state if it doesn't exist
if (!this.#actors.has(actorId)) {
this.#actors.set(actorId, {
id: actorId,
name,
key,
persistedData: undefined,
input,
persistedData: serializeEmptyPersistData(input),
});
} else {
throw new Error(`Actor already exists for ID: ${actorId}`);
Expand Down
24 changes: 5 additions & 19 deletions packages/core/src/drivers/rivet/actor-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,18 @@ export class RivetActorDriver implements ActorDriver {
return { ctx: this.#ctx };
}

async readInput(_actorId: string): Promise<unknown | undefined> {
// Read input
//
// We need to have a separate exists flag in order to represent `undefined`
const entries = await this.#ctx.kv.getBatch([
["rivetkit", "input", "exists"],
["rivetkit", "input", "data"],
]);

if (entries.get(["rivetkit", "input", "exists"]) === true) {
return await entries.get(["rivetkit", "input", "data"]);
} else {
return undefined;
}
}

async readPersistedData(_actorId: string): Promise<unknown | undefined> {
let data = await this.#ctx.kv.get(["rivetkit", "data"]);
async readPersistedData(_actorId: string): Promise<Uint8Array | undefined> {
let data = (await this.#ctx.kv.get(["rivetkit", "data"])) as
| Uint8Array
| undefined;

// HACK: Modify to be undefined if null. This will be fixed in Actors v2.
if (data === null) data = undefined;

return data;
}

async writePersistedData(_actorId: string, data: unknown): Promise<void> {
async writePersistedData(_actorId: string, data: Uint8Array): Promise<void> {
// Use "state" as the key for persisted data
await this.#ctx.kv.put(["rivetkit", "data"], data);
}
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/drivers/rivet/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { logger } from "./log";
import { RivetManagerDriver } from "./manager-driver";
import { type RivetClientConfig, getRivetClientConfig } from "./rivet-client";
import { type RivetHandler, deserializeKeyFromTag } from "./util";
import * as cbor from "cbor-x";

export function createActorHandler(
registry: Registry<any>,
Expand Down Expand Up @@ -131,7 +132,9 @@ async function startActor(
// TODO: This needs to assert this has only been called once
// Initialize with data
router.post("/initialize", async (c) => {
const body = await c.req.json();
const bodyBlob = await c.req.blob();
const bodyBytes = await bodyBlob.bytes();
const body = cbor.decode(bodyBytes);

logger().debug("received initialize request", {
hasInput: !!body.input,
Expand All @@ -150,7 +153,7 @@ async function startActor(
// Finish initialization
initializedPromise.resolve(undefined);

return c.json({}, 200);
return c.body(cbor.encode({}), 200);
});

// Start server
Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/drivers/rivet/manager-driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
rivetRequest,
} from "./rivet-client";
import { convertKeyToRivetTags } from "./util";
import * as cbor from "cbor-x";

export interface ActorState {
key: string[];
Expand Down Expand Up @@ -157,9 +158,9 @@ export class RivetManagerDriver implements ManagerDriver {
const res = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Content-Type": "application/cbor",
},
body: JSON.stringify({ input }),
body: cbor.encode({ input }),
});
if (!res.ok) {
throw new InternalError(
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/inline-client-driver/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import type { WebSocket } from "ws";
import { FakeEventSource } from "./fake-event-source";
import { FakeWebSocket } from "./fake-websocket";
import { logger } from "./log";
import * as cbor from "cbor-x";

/**
* Client driver that calls the manager driver inline.
Expand Down
8 changes: 2 additions & 6 deletions packages/core/src/test/driver/actor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@ export class TestActorDriver implements ActorDriver {
};
}

async readInput(actorId: string): Promise<unknown | undefined> {
return this.#state.readInput(actorId);
}

async readPersistedData(actorId: string): Promise<unknown | undefined> {
async readPersistedData(actorId: string): Promise<Uint8Array | undefined> {
return this.#state.readPersistedData(actorId);
}

async writePersistedData(actorId: string, data: unknown): Promise<void> {
async writePersistedData(actorId: string, data: Uint8Array): Promise<void> {
this.#state.writePersistedData(actorId, data);
}

Expand Down
Loading
Loading