diff --git a/docs/concepts/schedule.mdx b/docs/concepts/schedule.mdx index b5a5fa96a..d2fa47885 100644 --- a/docs/concepts/schedule.mdx +++ b/docs/concepts/schedule.mdx @@ -21,6 +21,9 @@ Parameters: - `fn` (string): The name of the action to be executed. - `...args` (unknown[]): Additional arguments to pass to the function. +Returns: +- `Promise`: A unique identifier for the scheduled event. + ### `c.schedule.at(timestamp, fn, ...args)` Schedules a function to be executed at a specific timestamp. This function persists across actor restarts, upgrades, or crashes. @@ -31,6 +34,41 @@ Parameters: - `fn` (string): The name of the action to be executed. - `...args` (unknown[]): Additional arguments to pass to the function. +Returns: +- `Promise`: A unique identifier for the scheduled event. + +### `c.schedule.list()` + +Lists all scheduled events for the actor. + +Returns: +- `Promise`: An array of scheduled alarms, where each alarm has the following properties: + - `id` (string): The unique identifier of the alarm + - `createdAt` (number): The timestamp when the alarm was created + - `triggersAt` (number): The timestamp when the alarm will trigger + - `fn` (string): The name of the action to be executed + - `args` (unknown[]): The arguments to pass to the function + +### `c.schedule.get(alarmId)` + +Gets details about a specific scheduled event. + +Parameters: +- `alarmId` (string): The unique identifier of the alarm to retrieve. + +Returns: +- `Promise`: The alarm details if found, undefined otherwise. + +### `c.schedule.cancel(alarmId)` + +Cancels a scheduled event. + +Parameters: +- `alarmId` (string): The unique identifier of the alarm to cancel. + +Returns: +- `Promise` + ## Scheduling Private Actions Currently, scheduling can only trigger public actions. If the scheduled action is private, it needs to be secured with something like a token. @@ -46,7 +84,7 @@ const reminderService = actor({ }, actions: { - setReminder: (c, userId, message, delayMs) => { + setReminder: async (c, userId, message, delayMs) => { const reminderId = crypto.randomUUID(); // Store the reminder in state @@ -57,7 +95,89 @@ const reminderService = actor({ }; // Schedule the sendReminder action to run after the delay - c.after(delayMs, "sendReminder", reminderId); + // Store the alarmId for potential cancellation + const alarmId = await c.schedule.after(delayMs, "sendReminder", reminderId); + + return { reminderId, alarmId }; + }, + + cancelReminder: async (c, reminderId) => { + const reminder = c.state.reminders[reminderId]; + if (!reminder) return { success: false }; + + // Cancel the scheduled reminder + await c.schedule.cancel(reminder.alarmId); + + // Clean up the reminder + delete c.state.reminders[reminderId]; + + return { success: true }; + }, + + sendReminder: (c, reminderId) => { + const reminder = c.state.reminders[reminderId]; + if (!reminder) return; + + // Find the user's connection if they're online + const userConn = c.conns.find( + conn => conn.state.userId === reminder.userId + ); + + if (userConn) { + // Send the reminder to the user + userConn.send("reminder", { + message: reminder.message, + scheduledAt: reminder.scheduledFor + }); + } else { + // If user is offline, store reminder for later delivery + // ... + } + + // Clean up the processed reminder + delete c.state.reminders[reminderId]; + } + } +}); +``` + +## Testing Schedules + +```typescript +import { actor } from "actor-core"; + +const reminderService = actor({ + state: { + reminders: {} + }, + + actions: { + setReminder: async (c, userId, message, delayMs) => { + const reminderId = crypto.randomUUID(); + + // Store the reminder in state + c.state.reminders[reminderId] = { + userId, + message, + scheduledFor: Date.now() + delayMs + }; + + // Schedule the sendReminder action to run after the delay + // Store the alarmId for potential cancellation + const alarmId = await c.schedule.after(delayMs, "sendReminder", reminderId); + + return { reminderId, alarmId }; + }, + + cancelReminder: async (c, reminderId) => { + const reminder = c.state.reminders[reminderId]; + if (!reminder) return { success: false }; + + // Cancel the scheduled reminder + await c.schedule.cancel(reminder.alarmId); + + // Clean up the reminder + delete c.state.reminders[reminderId]; return { reminderId }; }, diff --git a/packages/actor-core/src/actor/driver.ts b/packages/actor-core/src/actor/driver.ts index 238d5112f..1635e0f57 100644 --- a/packages/actor-core/src/actor/driver.ts +++ b/packages/actor-core/src/actor/driver.ts @@ -14,6 +14,8 @@ export interface ActorDriver { // Schedule setAlarm(actor: AnyActorInstance, timestamp: number): Promise; + getAlarm(actor: AnyActorInstance): Promise; + deleteAlarm(actor: AnyActorInstance): Promise; // TODO: //destroy(): Promise; diff --git a/packages/actor-core/src/actor/instance.ts b/packages/actor-core/src/actor/instance.ts index 803e541f3..536920cd0 100644 --- a/packages/actor-core/src/actor/instance.ts +++ b/packages/actor-core/src/actor/instance.ts @@ -213,7 +213,7 @@ export class ActorInstance { timestamp: number, fn: string, args: unknown[], - ): Promise { + ): Promise { // Build event const eventId = crypto.randomUUID(); const newEvent: PersistedScheduleEvents = { @@ -244,6 +244,30 @@ export class ActorInstance { this.actorContext.log.info("setting alarm", { timestamp }); await this.#actorDriver.setAlarm(this, newEvent.t); } + return eventId; + } + + async getEvent(eventId: string) { + return this.#persist.e.find((x) => x.e === eventId); + } + + async cancelEvent(eventId: string) { + const index = this.#persist.e.findIndex((x) => x.e === eventId); + if (index !== -1) { + if (index === 0 && this.#persist.e.length === 1) { + this.actorContext.log.info("clearing alarm"); + await this.#actorDriver.deleteAlarm(this); + } else if (index === 0) { + this.actorContext.log.info("setting next alarm", { timestamp: this.#persist.e[1].t }); + await this.#actorDriver.setAlarm(this, this.#persist.e[1].t); + } + this.#persist.e.splice(index, 1); + this.actorContext.log.info("cancelled event", { eventId }); + } + } + + async listEvents() { + return this.#persist.e; } async onAlarm() { diff --git a/packages/actor-core/src/actor/schedule.ts b/packages/actor-core/src/actor/schedule.ts index 8948e8ccf..b621ef773 100644 --- a/packages/actor-core/src/actor/schedule.ts +++ b/packages/actor-core/src/actor/schedule.ts @@ -1,5 +1,13 @@ import type { AnyActorInstance } from "./instance"; +export interface ScheduledEvent { + id: string; + createdAt: number; + triggersAt: number; + fn: string; + args: unknown[]; +} + export class Schedule { #actor: AnyActorInstance; @@ -7,11 +15,23 @@ export class Schedule { this.#actor = actor; } - async after(duration: number, fn: string, ...args: unknown[]) { - await this.#actor.scheduleEvent(Date.now() + duration, fn, args); + async after(duration: number, fn: string, ...args: unknown[]): Promise { + return await this.#actor.scheduleEvent(Date.now() + duration, fn, args); } async at(timestamp: number, fn: string, ...args: unknown[]) { - await this.#actor.scheduleEvent(timestamp, fn, args); + return await this.#actor.scheduleEvent(timestamp, fn, args); + } + + async get(alarmId: string) { + return this.#actor.getEvent(alarmId); + } + + async cancel(eventId: string) { + await this.#actor.cancelEvent(eventId); + } + + async list() { + return await this.#actor.listEvents(); } } diff --git a/packages/actor-core/src/test/driver/actor.ts b/packages/actor-core/src/test/driver/actor.ts index 4840e9f81..3a53a9229 100644 --- a/packages/actor-core/src/test/driver/actor.ts +++ b/packages/actor-core/src/test/driver/actor.ts @@ -8,9 +8,11 @@ export interface ActorDriverContext { export class TestActorDriver implements ActorDriver { #state: TestGlobalState; + #alarms: Map; constructor(state: TestGlobalState) { this.#state = state; + this.#alarms = new Map(); } getContext(_actorId: string): ActorDriverContext { @@ -29,8 +31,23 @@ export class TestActorDriver implements ActorDriver { async setAlarm(actor: AnyActorInstance, timestamp: number): Promise { const delay = Math.max(timestamp - Date.now(), 0); - setTimeout(() => { + const timeout = setTimeout(() => { + this.#alarms.delete(actor.id); actor.onAlarm(); }, delay); + this.#alarms.set(actor.id, { timeout, timestamp }); + } + + async getAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + return alarm ? alarm.timestamp : null; + } + + async deleteAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + if (alarm) { + clearTimeout(alarm.timeout); + this.#alarms.delete(actor.id); + } } } diff --git a/packages/drivers/file-system/src/actor.ts b/packages/drivers/file-system/src/actor.ts index 27d7871b5..404c39c74 100644 --- a/packages/drivers/file-system/src/actor.ts +++ b/packages/drivers/file-system/src/actor.ts @@ -8,9 +8,11 @@ export type ActorDriverContext = Record; */ export class FileSystemActorDriver implements ActorDriver { #state: FileSystemGlobalState; + #alarms: Map; constructor(state: FileSystemGlobalState) { this.#state = state; + this.#alarms = new Map(); } /** @@ -36,9 +38,24 @@ export class FileSystemActorDriver implements ActorDriver { } async setAlarm(actor: AnyActorInstance, timestamp: number): Promise { - const delay = Math.max(0, timestamp - Date.now()); - setTimeout(() => { + const delay = Math.max(timestamp - Date.now(), 0); + const timeout = setTimeout(() => { + this.#alarms.delete(actor.id); actor.onAlarm(); }, delay); + this.#alarms.set(actor.id, { timeout, timestamp }); } -} \ No newline at end of file + + async getAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + return alarm ? alarm.timestamp : null; + } + + async deleteAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + if (alarm) { + clearTimeout(alarm.timeout); + this.#alarms.delete(actor.id); + } + } +} diff --git a/packages/drivers/memory/src/actor.ts b/packages/drivers/memory/src/actor.ts index 2f8f2659f..48389bcb4 100644 --- a/packages/drivers/memory/src/actor.ts +++ b/packages/drivers/memory/src/actor.ts @@ -5,9 +5,11 @@ export type ActorDriverContext = Record; export class MemoryActorDriver implements ActorDriver { #state: MemoryGlobalState; + #alarms: Map; constructor(state: MemoryGlobalState) { this.#state = state; + this.#alarms = new Map(); } getContext(_actorId: string): ActorDriverContext { @@ -24,8 +26,23 @@ export class MemoryActorDriver implements ActorDriver { async setAlarm(actor: AnyActorInstance, timestamp: number): Promise { const delay = Math.max(timestamp - Date.now(), 0); - setTimeout(() => { + const timeout = setTimeout(() => { + this.#alarms.delete(actor.id); actor.onAlarm(); }, delay); + this.#alarms.set(actor.id, { timeout, timestamp }); + } + + async getAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + return alarm ? alarm.timestamp : null; + } + + async deleteAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + if (alarm) { + clearTimeout(alarm.timeout); + this.#alarms.delete(actor.id); + } } } diff --git a/packages/drivers/redis/src/actor.ts b/packages/drivers/redis/src/actor.ts index f99e0e06a..45b423c98 100644 --- a/packages/drivers/redis/src/actor.ts +++ b/packages/drivers/redis/src/actor.ts @@ -8,9 +8,11 @@ export interface ActorDriverContext { export class RedisActorDriver implements ActorDriver { #redis: Redis; + #alarms: Map; constructor(redis: Redis) { this.#redis = redis; + this.#alarms = new Map(); } getContext(_actorId: string): ActorDriverContext { @@ -32,8 +34,23 @@ export class RedisActorDriver implements ActorDriver { async setAlarm(actor: AnyActorInstance, timestamp: number): Promise { const delay = Math.max(timestamp - Date.now(), 0); - setTimeout(() => { + const timeout = setTimeout(() => { + this.#alarms.delete(actor.id); actor.onAlarm(); }, delay); + this.#alarms.set(actor.id, { timeout, timestamp }); + } + + async getAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + return alarm ? alarm.timestamp : null; + } + + async deleteAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + if (alarm) { + clearTimeout(alarm.timeout); + this.#alarms.delete(actor.id); + } } } diff --git a/packages/platforms/cloudflare-workers/src/actor_driver.ts b/packages/platforms/cloudflare-workers/src/actor_driver.ts index 215df712b..d64de5b8d 100644 --- a/packages/platforms/cloudflare-workers/src/actor_driver.ts +++ b/packages/platforms/cloudflare-workers/src/actor_driver.ts @@ -61,4 +61,12 @@ export class CloudflareWorkersActorDriver implements ActorDriver { async setAlarm(actor: AnyActorInstance, timestamp: number): Promise { await this.#getDOCtx(actor.id).storage.setAlarm(timestamp); } + + async getAlarm(actor: AnyActorInstance): Promise { + return await this.#getDOCtx(actor.id).storage.getAlarm(); + } + + async deleteAlarm(actor: AnyActorInstance): Promise { + await this.#getDOCtx(actor.id).storage.deleteAlarm(); + } } diff --git a/packages/platforms/rivet/src/actor_driver.ts b/packages/platforms/rivet/src/actor_driver.ts index 19d02ae55..e7e1a6e27 100644 --- a/packages/platforms/rivet/src/actor_driver.ts +++ b/packages/platforms/rivet/src/actor_driver.ts @@ -7,9 +7,11 @@ export interface ActorDriverContext { export class RivetActorDriver implements ActorDriver { #ctx: ActorContext; + #alarms: Map; constructor(ctx: ActorContext) { this.#ctx = ctx; + this.#alarms = new Map(); } getContext(_actorId: string): ActorDriverContext { @@ -27,9 +29,24 @@ export class RivetActorDriver implements ActorDriver { } async setAlarm(actor: AnyActorInstance, timestamp: number): Promise { - const timeout = Math.max(0, timestamp - Date.now()); - setTimeout(() => { + const delay = Math.max(timestamp - Date.now(), 0); + const timeout = setTimeout(() => { + this.#alarms.delete(actor.id); actor.onAlarm(); - }, timeout); + }, delay); + this.#alarms.set(actor.id, { timeout, timestamp }); + } + + async getAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + return alarm ? alarm.timestamp : null; + } + + async deleteAlarm(actor: AnyActorInstance): Promise { + const alarm = this.#alarms.get(actor.id); + if (alarm) { + clearTimeout(alarm.timeout); + this.#alarms.delete(actor.id); + } } }