-
Notifications
You must be signed in to change notification settings - Fork 44
feat(core): add abort controller & implement waitUntil #1200
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,6 +42,8 @@ export interface SaveStateOptions { | |
| * Forces the state to be saved immediately. This function will return when the state has saved successfully. | ||
| */ | ||
| immediate?: boolean; | ||
| /** Bypass ready check for stopping. */ | ||
| allowStoppingState?: boolean; | ||
| } | ||
|
|
||
| /** Actor type alias with all `any` types. Used for `extends` in classes referencing this actor. */ | ||
|
|
@@ -158,6 +160,7 @@ export class ActorInstance< | |
| #vars?: V; | ||
|
|
||
| #backgroundPromises: Promise<void>[] = []; | ||
| #abortController = new AbortController(); | ||
| #config: ActorConfig<S, CP, CS, V, I, AD, DB>; | ||
| #connectionDrivers!: ConnectionDriversMap; | ||
| #actorDriver!: ActorDriver; | ||
|
|
@@ -757,7 +760,7 @@ export class ActorInstance< | |
| const connIdx = this.#persist.c.findIndex((c) => c.i === conn.id); | ||
| if (connIdx !== -1) { | ||
| this.#persist.c.splice(connIdx, 1); | ||
| this.saveState({ immediate: true }); | ||
| this.saveState({ immediate: true, allowStoppingState: true }); | ||
| } else { | ||
| logger().warn("could not find persisted connection to remove", { | ||
| connId: conn.id, | ||
|
|
@@ -1048,8 +1051,12 @@ export class ActorInstance< | |
| } | ||
| } | ||
|
|
||
| #assertReady() { | ||
| #assertReady(allowStoppingState: boolean = false) { | ||
| if (!this.#ready) throw new errors.InternalError("Actor not ready"); | ||
| if (!allowStoppingState && this.#sleepCalled) | ||
| throw new errors.InternalError("Actor is going to sleep"); | ||
| if (!allowStoppingState && this.#stopCalled) | ||
| throw new errors.InternalError("Actor is stopping"); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -1443,24 +1450,24 @@ export class ActorInstance< | |
| } | ||
|
|
||
| /** | ||
| * Runs a promise in the background. | ||
| * Prevents the actor from sleeping until promise is complete. | ||
| * | ||
| * This allows the actor runtime to ensure that a promise completes while | ||
| * returning from an action request early. | ||
| * | ||
| * @param promise - The promise to run in the background. | ||
| */ | ||
| _runInBackground(promise: Promise<void>) { | ||
| _waitUntil(promise: Promise<void>) { | ||
| this.#assertReady(); | ||
|
|
||
| // TODO: Should we force save the state? | ||
| // Add logging to promise and make it non-failable | ||
| const nonfailablePromise = promise | ||
| .then(() => { | ||
| logger().debug("background promise complete"); | ||
| logger().debug("wait until promise complete"); | ||
| }) | ||
| .catch((error) => { | ||
| logger().error("background promise failed", { | ||
| logger().error("wait until promise failed", { | ||
| error: stringifyError(error), | ||
| }); | ||
| }); | ||
|
|
@@ -1476,7 +1483,7 @@ export class ActorInstance< | |
| * @param opts - Options for saving the state. | ||
| */ | ||
| async saveState(opts: SaveStateOptions) { | ||
| this.#assertReady(); | ||
| this.#assertReady(opts.allowStoppingState); | ||
|
|
||
| if (this.#persistChanged) { | ||
| if (opts.immediate) { | ||
|
|
@@ -1552,7 +1559,7 @@ export class ActorInstance< | |
| return true; | ||
| } | ||
|
|
||
| /** Puts an actor to sleep. */ | ||
| /** Puts an actor to sleep. This should just start the sleep sequence, most shutdown logic should be in _stop (which is called by the ActorDriver when sleeping). */ | ||
| async _sleep() { | ||
| invariant(this.#sleepingSupported, "sleeping not supported"); | ||
| invariant(this.#actorDriver.sleep, "no sleep on driver"); | ||
|
|
@@ -1581,6 +1588,11 @@ export class ActorInstance< | |
|
|
||
| logger().info("actor stopping"); | ||
|
|
||
| // Abort any listeners waiting for shutdown | ||
| try { | ||
| this.#abortController.abort(); | ||
| } catch {} | ||
|
|
||
| // Call onStop lifecycle hook if defined | ||
| if (this.#config.onStop) { | ||
| try { | ||
|
|
@@ -1601,8 +1613,11 @@ export class ActorInstance< | |
| } | ||
| } | ||
|
|
||
| // Wait for any background tasks to finish, with timeout | ||
| await this.#waitBackgroundPromises(this.#config.options.waitUntilTimeout); | ||
|
|
||
| // Write state | ||
| await this.saveState({ immediate: true }); | ||
| await this.saveState({ immediate: true, allowStoppingState: true }); | ||
|
|
||
| // Disconnect existing connections | ||
| const promises: Promise<unknown>[] = []; | ||
|
|
@@ -1625,8 +1640,39 @@ export class ActorInstance< | |
| "timed out waiting for connections to close, shutting down anyway", | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| /** Abort signal that fires when the actor is stopping. */ | ||
| get abortSignal(): AbortSignal { | ||
| return this.#abortController.signal; | ||
| } | ||
|
|
||
| /** Wait for background waitUntil promises with a timeout. */ | ||
| async #waitBackgroundPromises(timeoutMs: number) { | ||
| const pending = this.#backgroundPromises; | ||
| if (pending.length === 0) { | ||
| logger().debug("no background promises"); | ||
| return; | ||
| } | ||
|
|
||
| // Race promises with timeout to determine if pending promises settled fast enough | ||
| const timedOut = await Promise.race([ | ||
| Promise.allSettled(pending).then(() => false), | ||
| new Promise<true>((resolve) => | ||
| setTimeout(() => resolve(true), timeoutMs), | ||
| ), | ||
| ]); | ||
|
|
||
| // TODO: | ||
| //Deno.exit(0); | ||
| if (timedOut) { | ||
| logger().error( | ||
| "timed out waiting for background tasks, background promises may have leaked", | ||
| { | ||
| count: pending.length, | ||
| timeoutMs, | ||
| }, | ||
| ); | ||
| } else { | ||
| logger().debug("background promises finished"); | ||
| } | ||
| } | ||
|
Comment on lines
+1651
to
1677
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential issues with
const timeoutId = setTimeout(() => resolve(true), timeoutMs);
Promise.allSettled(pending).then(() => {
clearTimeout(timeoutId);
resolve(false);
});
Consider implementing a promise tracking system that removes completed promises and properly handles the race condition during shutdown. Spotted by Diamond |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
abort()method is wrapped in an empty try-catch block that suppresses all errors without logging. WhileAbortController.abort()rarely throws, silent error handling can hide unexpected issues during the critical shutdown sequence. Consider either:abort()is not expected to throw in your implementationThis would improve debuggability if unexpected behavior occurs during shutdown.
Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.