diff --git a/packages/actor-core/src/client/client.ts b/packages/actor-core/src/client/client.ts index 4dbf59b21..9c1c692a7 100644 --- a/packages/actor-core/src/client/client.ts +++ b/packages/actor-core/src/client/client.ts @@ -38,18 +38,18 @@ export interface ActorAccessor { * * @template A The actor class that this handle is connected to. * @param {Omit} [opts] - Options for getting the actor. - * @returns {Promise>} - A promise resolving to the actor handle. + * @returns {ActorHandle} - The actor handle, which will connect in the background. */ - get(opts?: Omit): Promise>; + get(opts?: Omit): ActorHandle; /** * Creates a new actor with the name automatically injected from the property accessor. * * @template A The actor class that this handle is connected to. * @param {Omit} opts - Options for creating the actor. - * @returns {Promise>} - A promise resolving to the actor handle. + * @returns {ActorHandle} - The actor handle, which will connect in the background. */ - create(opts: Omit): Promise>; + create(opts: Omit): ActorHandle; /** * Gets an actor by its ID. @@ -57,9 +57,9 @@ export interface ActorAccessor { * @template A The actor class that this handle is connected to. * @param {string} actorId - The ID of the actor. * @param {GetWithIdOptions} [opts] - Options for getting the actor. - * @returns {Promise>} - A promise resolving to the actor handle. + * @returns {ActorHandle} - The actor handle, which will connect in the background. */ - getWithId(actorId: string, opts?: GetWithIdOptions): Promise>; + getWithId(actorId: string, opts?: GetWithIdOptions): ActorHandle; } /** @@ -147,19 +147,19 @@ export interface ActorAccessor { * The actor name is automatically injected from the property accessor. * * @template A The actor class that this handle is connected to. - * @param {Omit} [opts] - Options for getting the actor. - * @returns {Promise>} - A promise resolving to the actor handle. + * @param {GetOptions} [opts] - Options for getting the actor. + * @returns {ActorHandle} - The actor handle, which will connect in the background. */ - get(opts?: GetOptions): Promise>; + get(opts?: GetOptions): ActorHandle; /** * Creates a new actor with the name automatically injected from the property accessor. * * @template A The actor class that this handle is connected to. - * @param {Omit} opts - Options for creating the actor. - * @returns {Promise>} - A promise resolving to the actor handle. + * @param {CreateOptions} opts - Options for creating the actor. + * @returns {ActorHandle} - The actor handle, which will connect in the background. */ - create(opts: CreateOptions): Promise>; + create(opts: CreateOptions): ActorHandle; /** * Gets an actor by its ID. @@ -167,9 +167,9 @@ export interface ActorAccessor { * @template A The actor class that this handle is connected to. * @param {string} actorId - The ID of the actor. * @param {GetWithIdOptions} [opts] - Options for getting the actor. - * @returns {Promise>} - A promise resolving to the actor handle. + * @returns {ActorHandle} - The actor handle, which will connect in the background. */ - getWithId(actorId: string, opts?: GetWithIdOptions): Promise>; + getWithId(actorId: string, opts?: GetWithIdOptions): ActorHandle; } /** @@ -233,34 +233,39 @@ export class ClientRaw { * @template AD The actor class that this handle is connected to. * @param {string} actorId - The ID of the actor. * @param {GetWithIdOptions} [opts] - Options for getting the actor. - * @returns {Promise>} - A promise resolving to the actor handle. + * @returns {ActorHandle} - The actor handle, which will connect in the background. */ - async getWithId( + getWithId( actorId: string, opts?: GetWithIdOptions, - ): Promise> { + ): ActorHandle { logger().debug("get actor with id ", { actorId, params: opts?.params, }); - const resJson = await this.#sendManagerRequest< - ActorsRequest, - ActorsResponse - >("POST", "/manager/actors", { - query: { - getForId: { - actorId, - }, - }, - }); - - const handle = await this.#createHandle( - resJson.endpoint, - opts?.params, - resJson.supportedTransports, + // Create a proxy immediately, without waiting for connection + const handle = this.#createLazyHandle( + async () => { + const resJson = await this.#sendManagerRequest< + ActorsRequest, + ActorsResponse + >("POST", "/manager/actors", { + query: { + getForId: { + actorId, + }, + }, + }); + return { + endpoint: resJson.endpoint, + params: opts?.params, + supportedTransports: resJson.supportedTransports, + }; + } ); - return this.#createProxy(handle) as ActorHandle; + + return handle; } /** @@ -268,12 +273,15 @@ export class ClientRaw { * * @example * ``` - * const room = await client.get({ + * const room = client.get({ * name: 'chat_room', * // Get or create the actor for the channel `random` * channel: 'random' * }); * + * // Explicitly connect if needed (or call methods directly and they'll throw if connection fails) + * await room.connect(); + * * // This actor will have the tags: { name: 'chat_room', channel: 'random' } * await room.sendMessage('Hello, world!'); * ``` @@ -281,13 +289,13 @@ export class ClientRaw { * @template AD The actor class that this handle is connected to. * @param {ActorTags} tags - The tags to identify the actor. * @param {GetOptions} [opts] - Options for getting the actor. - * @returns {Promise>} - A promise resolving to the actor handle. + * @returns {ActorHandle} - The actor handle, which will connect in the background. * @see {@link https://rivet.gg/docs/manage#client.get} */ - async get( + get( name: string, opts?: GetOptions, - ): Promise> { + ): ActorHandle { let tags = opts?.tags ?? {}; // Build create config @@ -308,25 +316,30 @@ export class ClientRaw { create, }); - const resJson = await this.#sendManagerRequest< - ActorsRequest, - ActorsResponse - >("POST", "/manager/actors", { - query: { - getOrCreateForTags: { - name, - tags, - create, - }, - }, - }); - - const handle = await this.#createHandle( - resJson.endpoint, - opts?.params, - resJson.supportedTransports, + // Create a proxy immediately, without waiting for connection + const handle = this.#createLazyHandle( + async () => { + const resJson = await this.#sendManagerRequest< + ActorsRequest, + ActorsResponse + >("POST", "/manager/actors", { + query: { + getOrCreateForTags: { + name, + tags, + create, + }, + }, + }); + return { + endpoint: resJson.endpoint, + params: opts?.params, + supportedTransports: resJson.supportedTransports, + }; + } ); - return this.#createProxy(handle) as ActorHandle; + + return handle; } /** @@ -335,7 +348,7 @@ export class ClientRaw { * @example * ``` * // Create a new document actor - * const doc = await client.create({ + * const doc = client.create({ * create: { * tags: { * name: 'my_document', @@ -343,49 +356,54 @@ export class ClientRaw { * } * } * }); + * + * // Explicitly connect if needed + * await doc.connect(); * * await doc.doSomething(); * ``` * * @template AD The actor class that this handle is connected to. * @param {CreateOptions} opts - Options for creating the actor. - * @returns {Promise>} - A promise resolving to the actor handle. + * @returns {ActorHandle} - The actor handle, which will connect in the background. * @see {@link https://rivet.gg/docs/manage#client.create} */ - async create( + create( name: string, opts: CreateOptions, - ): Promise> { + ): ActorHandle { // Build create config const create = { name, ...opts.create, }; - // Default to the chosen region - //if (!create.region) create.region = (await this.#regionPromise)?.id; - logger().debug("create actor", { name, parameters: opts?.params, create, }); - const resJson = await this.#sendManagerRequest< - ActorsRequest, - ActorsResponse - >("POST", "/manager/actors", { - query: { - create, - }, - }); - - const handle = await this.#createHandle( - resJson.endpoint, - opts?.params, - resJson.supportedTransports, + // Create a proxy immediately, without waiting for connection + const handle = this.#createLazyHandle( + async () => { + const resJson = await this.#sendManagerRequest< + ActorsRequest, + ActorsResponse + >("POST", "/manager/actors", { + query: { + create, + }, + }); + return { + endpoint: resJson.endpoint, + params: opts?.params, + supportedTransports: resJson.supportedTransports, + }; + } ); - return this.#createProxy(handle) as ActorHandle; + + return handle; } async #createHandle( @@ -409,6 +427,103 @@ export class ClientRaw { return handle; } + /** + * Creates a lazy handle that will create a real handle when needed. + * This is used to implement non-async get/create/getWithId methods. + * + * @template AD The actor definition type + * @param {() => Promise<{endpoint: string, params: unknown, supportedTransports: Transport[]}>} fetchActorInfo Function to fetch actor info when needed + * @returns {ActorHandle} A handle that will lazily connect when used + */ + #createLazyHandle( + fetchActorInfo: () => Promise<{ + endpoint: string; + params: unknown; + supportedTransports: Transport[]; + }>, + ): ActorHandle { + // Set up the lazy connection with default values + let realHandle: ActorHandleRaw | undefined; + let fetchPromise: Promise | undefined; + + // Function to get or create the real handle + const getOrCreateRealHandle = async (): Promise => { + // Return existing handle if we already have one + if (realHandle) return realHandle; + + // Return existing promise if we're already fetching + if (fetchPromise) return fetchPromise; + + // Create a new promise to fetch the actor info + fetchPromise = (async () => { + try { + const imports = await this.#dynamicImportsPromise; + const actorInfo = await fetchActorInfo(); + + // Create the real handle + realHandle = new ActorHandleRaw( + this, + actorInfo.endpoint, + actorInfo.params, + this.#encodingKind, + this.#supportedTransports, + actorInfo.supportedTransports, + imports, + ); + + // Register and connect the handle + this[ACTOR_HANDLES_SYMBOL].add(realHandle); + realHandle[CONNECT_SYMBOL](); + + return realHandle; + } catch (error) { + // Clear the promise so we can try again + fetchPromise = undefined; + throw error; + } + })(); + + return fetchPromise; + }; + + // Create a proxy that appears to be an ActorHandle but lazily instantiates the real one + const lazyHandle = new Proxy({} as ActorHandleRaw, { + get(target, prop, receiver) { + // Special case for connect() method + if (prop === 'connect') { + return async () => { + const handle = await getOrCreateRealHandle(); + return handle.connect(); + }; + } + + // For Symbol.toStringTag and other built-in symbols + if (typeof prop === 'symbol') { + // Handle the case where the real handle exists + if (realHandle) { + return Reflect.get(realHandle, prop, receiver); + } + // Otherwise return something sensible for the symbol + return undefined; + } + + // For normal properties and methods + return async (...args: unknown[]) => { + const handle = await getOrCreateRealHandle(); + const method = Reflect.get(handle, prop, receiver); + + if (typeof method === 'function') { + return method.apply(handle, args); + } else { + return method; + } + }; + }, + }); + + return this.#createProxy(lazyHandle) as ActorHandle; + } + #createProxy( handle: ActorHandleRaw, ): ActorHandle { @@ -643,12 +758,12 @@ export function createClient>( return { get: ( opts?: GetOptions, - ): Promise[typeof prop]>> => { + ): ActorHandle[typeof prop]> => { return target.get[typeof prop]>(prop, opts); }, create: ( opts: CreateOptions, - ): Promise[typeof prop]>> => { + ): ActorHandle[typeof prop]> => { return target.create[typeof prop]>( prop, opts, @@ -657,7 +772,7 @@ export function createClient>( getWithId: ( actorId: string, opts?: GetWithIdOptions, - ): Promise[typeof prop]>> => { + ): ActorHandle[typeof prop]> => { return target.getWithId[typeof prop]>( actorId, opts, @@ -669,4 +784,4 @@ export function createClient>( return undefined; }, }) as Client; -} +} \ No newline at end of file diff --git a/packages/actor-core/src/client/handle.ts b/packages/actor-core/src/client/handle.ts index 122e6b150..82b807022 100644 --- a/packages/actor-core/src/client/handle.ts +++ b/packages/actor-core/src/client/handle.ts @@ -76,6 +76,12 @@ export class ActorHandleRaw { /** Promise used to indicate the socket has connected successfully. This will be rejected if the connection fails. */ #onOpenPromise?: PromiseWithResolvers; + /** Promise that resolves when the actor is connected */ + #connectPromise?: Promise; + + /** Error that occurred during connection, if any */ + #connectError?: Error; + // TODO: ws message queue /** @@ -108,6 +114,7 @@ export class ActorHandleRaw { * @param {string} name - The name of the RPC function to call. * @param {...Args} args - The arguments to pass to the RPC function. * @returns {Promise} - A promise that resolves to the response of the RPC function. + * @throws {Error} - If the actor is not connected and connect() has not been called or failed. */ async action = unknown[], Response = unknown>( name: string, @@ -115,7 +122,32 @@ export class ActorHandleRaw { ): Promise { logger().debug("action", { name, args }); - // TODO: Add to queue if socket is not open + // Check if we have a connection error + if (this.#connectError) { + throw new errors.ActionError("ACTOR_NOT_AVAILABLE", "Actor is not available", { + cause: this.#connectError, + }); + } + + // Wait for connection to complete if connect() was called + if (this.#connectPromise) { + try { + await this.#connectPromise; + } catch (error) { + this.#connectError = error instanceof Error ? error : new Error(String(error)); + throw new errors.ActionError("ACTOR_NOT_AVAILABLE", "Actor is not available", { + cause: this.#connectError, + }); + } + } + + // Ensure we have a transport + if (!this.#transport) { + throw new errors.ActionError( + "ACTOR_NOT_CONNECTED", + "Actor is not connected, call connect() first", + ); + } const rpcId = this.#rpcIdCounter; this.#rpcIdCounter += 1; @@ -134,8 +166,6 @@ export class ActorHandleRaw { }, } satisfies wsToServer.ToServer); - // TODO: Throw error if disconnect is called - const { i: responseId, o: output } = await promise; if (responseId !== rpcId) throw new Error( @@ -165,18 +195,93 @@ export class ActorHandleRaw { /** * Do not call this directly. -enc - * Establishes a connection to the server using the specified endpoint & encoding & driver. + * + * Lazily establishes a connection to the server in the background. * * @protected */ public [CONNECT_SYMBOL]() { + // Start connecting in the background this.#connectWithRetry(); } + /** + * Explicitly connect to the actor. + * + * @returns {Promise} - A promise that resolves when the connection is established. + * @throws {Error} - If the connection fails. + */ + public async connect(): Promise { + // Return existing promise if we're already connecting + if (this.#connectPromise) { + return this.#connectPromise; + } + + // Start connecting if not already + if (!this.#connecting) { + this.#connectWithRetry(); + } + + // Create a promise that will resolve when the connection is established + const { promise, resolve, reject } = Promise.withResolvers(); + this.#connectPromise = promise; + + // Check if we have a transport already (rare but possible) + if (this.#transport) { + resolve(); + return promise; + } + + // Setup an event listener to wait for the connection + const cleanup = () => { + this.#abortController.signal.removeEventListener("abort", onAbort); + }; + + const onAbort = () => { + cleanup(); + reject(new Error("Connection aborted")); + }; + + // Listen for abort events + this.#abortController.signal.addEventListener("abort", onAbort); + + // Start a timer to check for connection + const checkConnection = () => { + // If disposed, reject + if (this.#disposed) { + cleanup(); + reject(new Error("Handle disposed")); + return; + } + + // If connected, resolve + if (this.#transport) { + cleanup(); + resolve(); + return; + } + + // If there was an error connecting, reject + if (this.#connectError) { + cleanup(); + reject(this.#connectError); + return; + } + + // Otherwise, continue checking + setTimeout(checkConnection, 100); + }; + + checkConnection(); + return promise; + } + async #connectWithRetry() { this.#connecting = true; + // Reset connect error + this.#connectError = undefined; + // Attempt to reconnect indefinitely try { await pRetry(this.#connectAndWait.bind(this), { @@ -185,6 +290,9 @@ enc maxTimeout: 30_000, onFailedAttempt: (error) => { + // Store the latest error + this.#connectError = error; + logger().warn("failed to reconnect", { attempt: error.attemptNumber, error: stringifyError(error), @@ -194,14 +302,19 @@ enc // Cancel retry if aborted signal: this.#abortController.signal, }); + + // We successfully connected, clear any previous error + this.#connectError = undefined; } catch (err) { if ((err as Error).name === "AbortError") { // Ignore abortions logger().info("connection retry aborted"); - return; } else { - // Unknown error - throw err; + // Store the error + this.#connectError = err instanceof Error ? err : new Error(String(err)); + logger().error("connection failed permanently", { + error: stringifyError(this.#connectError), + }); } }