diff --git a/CLAUDE.md b/CLAUDE.md index b06e464ab..55bf4790d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -25,6 +25,7 @@ - **Build:** `yarn build` - Production build using Turbopack - **Build specific package:** `yarn build -F actor-core` - Build only specified package - **Format:** `yarn fmt` - Format code with Biome +- Do not run the format command automatically. ## Core Concepts diff --git a/packages/actor-core/src/actor/protocol/http/resolve.ts b/packages/actor-core/src/actor/protocol/http/resolve.ts new file mode 100644 index 000000000..54c124539 --- /dev/null +++ b/packages/actor-core/src/actor/protocol/http/resolve.ts @@ -0,0 +1,8 @@ +import { z } from "zod"; + +export const ResolveResponseSchema = z.object({ + // Actor ID + i: z.string(), +}); + +export type ResolveResponse = z.infer; diff --git a/packages/actor-core/src/client/actor_common.ts b/packages/actor-core/src/client/actor_common.ts index 6fe7ad47c..613cab85a 100644 --- a/packages/actor-core/src/client/actor_common.ts +++ b/packages/actor-core/src/client/actor_common.ts @@ -1,4 +1,10 @@ import type { AnyActorDefinition, ActorDefinition } from "@/actor/definition"; +import type * as protoHttpResolve from "@/actor/protocol/http/resolve"; +import type { Encoding } from "@/actor/protocol/serde"; +import type { ActorQuery } from "@/manager/protocol/query"; +import { logger } from "./log"; +import * as errors from "./errors"; +import { sendHttpRequest } from "./utils"; /** * RPC function returned by Actor connections and handles. @@ -20,10 +26,55 @@ export type ActorRPCFunction< * Maps RPC methods from actor definition to typed function signatures. */ export type ActorDefinitionRpcs = - AD extends ActorDefinition ? { - [K in keyof R]: R[K] extends ( - ...args: infer Args - ) => infer Return - ? ActorRPCFunction - : never; - } : never; \ No newline at end of file + AD extends ActorDefinition + ? { + [K in keyof R]: R[K] extends (...args: infer Args) => infer Return + ? ActorRPCFunction + : never; + } + : never; + +/** + * Resolves an actor ID from a query by making a request to the /actors/resolve endpoint + * + * @param {string} endpoint - The manager endpoint URL + * @param {ActorQuery} actorQuery - The query to resolve + * @param {Encoding} encodingKind - The encoding to use (json or cbor) + * @returns {Promise} - A promise that resolves to the actor's ID + */ +export async function resolveActorId( + endpoint: string, + actorQuery: ActorQuery, + encodingKind: Encoding, +): Promise { + logger().debug("resolving actor ID", { query: actorQuery }); + + // Construct the URL using the current actor query + const queryParam = encodeURIComponent(JSON.stringify(actorQuery)); + const url = `${endpoint}/actors/resolve?encoding=${encodingKind}&query=${queryParam}`; + + // Use the shared HTTP request utility with integrated serialization + try { + const result = await sendHttpRequest< + Record, + protoHttpResolve.ResolveResponse + >({ + url, + method: "POST", + body: {}, + encoding: encodingKind, + }); + + logger().debug("resolved actor ID", { actorId: result.i }); + return result.i; + } catch (error) { + logger().error("failed to resolve actor ID", { error }); + if (error instanceof errors.ActorError) { + throw error; + } else { + throw new errors.InternalError( + `Failed to resolve actor ID: ${String(error)}`, + ); + } + } +} diff --git a/packages/actor-core/src/client/actor_conn.ts b/packages/actor-core/src/client/actor_conn.ts index 88e5ad6f4..1bbf3b677 100644 --- a/packages/actor-core/src/client/actor_conn.ts +++ b/packages/actor-core/src/client/actor_conn.ts @@ -1,20 +1,20 @@ +import type { AnyActorDefinition } from "@/actor/definition"; import type { Transport } from "@/actor/protocol/message/mod"; -import type { Encoding } from "@/actor/protocol/serde"; import type * as wsToClient from "@/actor/protocol/message/to-client"; import type * as wsToServer from "@/actor/protocol/message/to-server"; +import type { Encoding } from "@/actor/protocol/serde"; +import { importEventSource } from "@/common/eventsource"; import { MAX_CONN_PARAMS_SIZE } from "@/common/network"; import { assertUnreachable, stringifyError } from "@/common/utils"; +import { importWebSocket } from "@/common/websocket"; +import type { ActorQuery } from "@/manager/protocol/query"; import * as cbor from "cbor-x"; +import pRetry from "p-retry"; +import type { ActorDefinitionRpcs as ActorDefinitionRpcsImport } from "./actor_common"; +import { ACTOR_CONNS_SYMBOL, type ClientRaw, TRANSPORT_SYMBOL } from "./client"; import * as errors from "./errors"; import { logger } from "./log"; import { type WebSocketMessage as ConnMessage, messageLength } from "./utils"; -import { ACTOR_CONNS_SYMBOL, TRANSPORT_SYMBOL, type ClientRaw } from "./client"; -import type { AnyActorDefinition } from "@/actor/definition"; -import pRetry from "p-retry"; -import { importWebSocket } from "@/common/websocket"; -import { importEventSource } from "@/common/eventsource"; -import type { ActorQuery } from "@/manager/protocol/query"; -import { ActorDefinitionRpcs as ActorDefinitionRpcsImport } from "./actor_common"; // Re-export the type with the original name to maintain compatibility type ActorDefinitionRpcs = @@ -679,7 +679,7 @@ enc // Get the manager endpoint from the endpoint provided const actorQueryStr = encodeURIComponent(JSON.stringify(this.actorQuery)); - let url = `${this.endpoint}/actors/connections/${this.#connectionId}/message?encoding=${this.encodingKind}&connectionToken=${encodeURIComponent(this.#connectionToken)}&query=${actorQueryStr}`; + const url = `${this.endpoint}/actors/connections/${this.#connectionId}/message?encoding=${this.encodingKind}&connectionToken=${encodeURIComponent(this.#connectionToken)}&query=${actorQueryStr}`; // TODO: Implement ordered messages, this is not guaranteed order. Needs to use an index in order to ensure we can pipeline requests efficiently. // TODO: Validate that we're using HTTP/3 whenever possible for pipelining requests @@ -845,4 +845,4 @@ enc */ export type ActorConn = ActorConnRaw & - ActorDefinitionRpcs; + ActorDefinitionRpcs; \ No newline at end of file diff --git a/packages/actor-core/src/client/actor_handle.ts b/packages/actor-core/src/client/actor_handle.ts index 09e1c9918..f252c1992 100644 --- a/packages/actor-core/src/client/actor_handle.ts +++ b/packages/actor-core/src/client/actor_handle.ts @@ -1,12 +1,14 @@ -import type { Encoding } from "@/actor/protocol/serde"; -import { logger } from "./log"; -import { sendHttpRequest } from "./utils"; import type { AnyActorDefinition } from "@/actor/definition"; -import type { ActorQuery } from "@/manager/protocol/query"; -import type { ActorDefinitionRpcs } from "./actor_common"; import type { RpcRequest, RpcResponse } from "@/actor/protocol/http/rpc"; +import type { Encoding } from "@/actor/protocol/serde"; +import type { ActorQuery } from "@/manager/protocol/query"; +import { type ActorDefinitionRpcs, resolveActorId } from "./actor_common"; import { type ActorConn, ActorConnRaw } from "./actor_conn"; import { CREATE_ACTOR_CONN_PROXY, type ClientRaw } from "./client"; +import { logger } from "./log"; +import { sendHttpRequest } from "./utils"; +import invariant from "invariant"; +import { assertUnreachable } from "@/actor/utils"; /** * Provides underlying functions for stateless {@link ActorHandle} for RPC calls. @@ -111,6 +113,34 @@ export class ActorHandleRaw { conn, ) as ActorConn; } + + /** + * Resolves the actor to get its unique actor ID + * + * @returns {Promise} - A promise that resolves to the actor's ID + */ + async resolve(): Promise { + if ( + "getForKey" in this.#actorQuery || + "getOrCreateForKey" in this.#actorQuery + ) { + const actorId = await resolveActorId( + this.#endpoint, + this.#actorQuery, + this.#encodingKind, + ); + this.#actorQuery = { getForId: { actorId } }; + return actorId; + } else if ("getForId" in this.#actorQuery) { + // SKip since it's already resolved + return this.#actorQuery.getForId.actorId; + } else if ("create" in this.#actorQuery) { + // Cannot create a handle with this query + invariant(false, "actorQuery cannot be create"); + } else { + assertUnreachable(this.#actorQuery); + } + } } /** @@ -135,4 +165,6 @@ export type ActorHandle = Omit< > & { // Add typed version of ActorConn (instead of using AnyActorDefinition) connect(): ActorConn; + // Resolve method returns the actor ID + resolve(): Promise; } & ActorDefinitionRpcs; diff --git a/packages/actor-core/src/client/client.ts b/packages/actor-core/src/client/client.ts index cae6f908f..912cdaadf 100644 --- a/packages/actor-core/src/client/client.ts +++ b/packages/actor-core/src/client/client.ts @@ -4,7 +4,7 @@ import type { ActorQuery } from "@/manager/protocol/query"; import * as errors from "./errors"; import { ActorConn, ActorConnRaw, CONNECT_SYMBOL } from "./actor_conn"; import { ActorHandle, ActorHandleRaw } from "./actor_handle"; -import { ActorRPCFunction } from "./actor_common"; +import { ActorRPCFunction, resolveActorId } from "./actor_common"; import { logger } from "./log"; import type { ActorCoreApp } from "@/mod"; import type { AnyActorDefinition } from "@/actor/definition"; @@ -55,14 +55,17 @@ export interface ActorAccessor { /** * Creates a new actor with the name automatically injected from the property accessor, - * and returns a stateless handle to it. + * and returns a stateless handle to it with the actor ID resolved. * * @template AD The actor class that this handle is for. * @param {string | string[]} key - The key to identify the actor. Can be a single string or an array of strings. * @param {CreateOptions} [opts] - Options for creating the actor (excluding name and key). - * @returns {ActorHandle} - A handle to the actor. + * @returns {Promise>} - A promise that resolves to a handle to the actor. */ - create(key: string | string[], opts?: CreateOptions): ActorHandle; + create( + key: string | string[], + opts?: CreateOptions, + ): Promise>; } /** @@ -286,18 +289,19 @@ export class ClientRaw { /** * Creates a new actor with the provided key and returns a stateless handle to it. + * Resolves the actor ID and returns a handle with getForId query. * * @template AD The actor class that this handle is for. * @param {string} name - The name of the actor. * @param {string | string[]} key - The key to identify the actor. Can be a single string or an array of strings. * @param {CreateOptions} [opts] - Options for creating the actor (excluding name and key). - * @returns {ActorHandle} - A handle to the actor. + * @returns {Promise>} - A promise that resolves to a handle to the actor. */ - create( + async create( name: string, key: string | string[], opts: CreateOptions = {}, - ): ActorHandle { + ): Promise> { // Convert string to array of strings const keyArray: string[] = typeof key === "string" ? [key] : key; @@ -316,17 +320,36 @@ export class ClientRaw { create, }); - const actorQuery = { + // Create the actor + const createQuery = { create, - }; + } satisfies ActorQuery; + const actorId = await resolveActorId( + this.#managerEndpoint, + createQuery, + this.#encodingKind, + ); + logger().debug("created actor with ID", { + name, + key: keyArray, + actorId, + }); - const managerEndpoint = this.#managerEndpoint; + // Create handle with actor ID + const getForIdQuery = { + getForId: { + actorId, + }, + } satisfies ActorQuery; const handle = this.#createHandle( - managerEndpoint, + this.#managerEndpoint, opts?.params, - actorQuery, + getForIdQuery, ); - return createActorProxy(handle) as ActorHandle; + + const proxy = createActorProxy(handle) as ActorHandle; + + return proxy; } #createHandle( @@ -454,11 +477,11 @@ export function createClient>( opts, ); }, - create: ( + create: async ( key: string | string[], opts: CreateOptions = {}, - ): ActorHandle[typeof prop]> => { - return target.create[typeof prop]>( + ): Promise[typeof prop]>> => { + return await target.create[typeof prop]>( prop, key, opts, @@ -499,6 +522,9 @@ function createActorProxy( // Create RPC function that preserves 'this' context if (typeof prop === "string") { + // If JS is attempting to calling this as a promise, ignore it + if (prop === "then") return undefined; + let method = methodCache.get(prop); if (!method) { method = (...args: unknown[]) => target.action(prop, ...args); diff --git a/packages/actor-core/src/manager/router.ts b/packages/actor-core/src/manager/router.ts index beb91ceda..b1b5d7fc3 100644 --- a/packages/actor-core/src/manager/router.ts +++ b/packages/actor-core/src/manager/router.ts @@ -1,36 +1,37 @@ -import { Hono, Next, type Context as HonoContext } from "hono"; -import { cors } from "hono/cors"; -import { logger } from "./log"; +import * as errors from "@/actor/errors"; +import type * as protoHttpResolve from "@/actor/protocol/http/resolve"; +import type { ToClient } from "@/actor/protocol/message/to-client"; +import { type Encoding, serialize } from "@/actor/protocol/serde"; +import { + type ConnectionHandlers, + getRequestEncoding, + handleConnectionMessage, + handleRpc, + handleSseConnect, + handleWebSocketConnect, +} from "@/actor/router_endpoints"; +import { assertUnreachable } from "@/actor/utils"; +import type { AppConfig } from "@/app/config"; import { handleRouteError, handleRouteNotFound, loggerMiddleware, } from "@/common/router"; +import { deconstructError } from "@/common/utils"; import type { DriverConfig } from "@/driver-helpers/config"; -import type { AppConfig } from "@/app/config"; import { - createManagerInspectorRouter, type ManagerInspectorConnHandler, + createManagerInspectorRouter, } from "@/inspector/manager"; +import { Hono, type Context as HonoContext, type Next } from "hono"; +import { cors } from "hono/cors"; +import { streamSSE } from "hono/streaming"; +import type { WSContext } from "hono/ws"; +import invariant from "invariant"; +import type { ManagerDriver } from "./driver"; +import { logger } from "./log"; import { ConnectQuerySchema } from "./protocol/query"; -import * as errors from "@/actor/errors"; import type { ActorQuery } from "./protocol/query"; -import { assertUnreachable } from "@/actor/utils"; -import invariant from "invariant"; -import { - type ConnectionHandlers, - handleSseConnect, - handleRpc, - handleConnectionMessage, - getRequestEncoding, - handleWebSocketConnect, -} from "@/actor/router_endpoints"; -import { ManagerDriver } from "./driver"; -import { Encoding, serialize } from "@/actor/protocol/serde"; -import { deconstructError } from "@/common/utils"; -import { WSContext } from "hono/ws"; -import { ToClient } from "@/actor/protocol/message/to-client"; -import { streamSSE } from "hono/streaming"; type ProxyMode = | { @@ -105,6 +106,40 @@ export function createManagerRouter( return c.text("ok"); }); + // Resolve actor ID from query + app.post("/actors/resolve", async (c) => { + const encoding = getRequestEncoding(c.req); + logger().debug("resolve request encoding", { encoding }); + + // Get query parameters for actor lookup + const queryParam = c.req.query("query"); + if (!queryParam) { + logger().error("missing query parameter for resolve"); + throw new errors.MissingRequiredParameters(["query"]); + } + + // Parse the query JSON and validate with schema + let parsedQuery: ActorQuery; + try { + parsedQuery = JSON.parse(queryParam as string); + } catch (error) { + logger().error("invalid query json for resolve", { error }); + throw new errors.InvalidQueryJSON(error); + } + + // Get the actor ID and meta + const { actorId, meta } = await queryActor(c, parsedQuery, driver); + logger().debug("resolved actor", { actorId, meta }); + invariant(actorId, "Missing actor ID"); + + // Format response according to protocol + const response: protoHttpResolve.ResolveResponse = { + i: actorId, + }; + const serialized = serialize(response, encoding); + return c.body(serialized); + }); + app.get("/actors/connect/websocket", async (c) => { invariant(upgradeWebSocket, "WebSockets not supported"); diff --git a/packages/actor-core/tests/basic.test.ts b/packages/actor-core/tests/basic.test.ts index 95bd7fbdd..56c81de36 100644 --- a/packages/actor-core/tests/basic.test.ts +++ b/packages/actor-core/tests/basic.test.ts @@ -1,5 +1,5 @@ import { actor, setup } from "@/mod"; -import { test } from "vitest"; +import { test, expect } from "vitest"; import { setupTest } from "@/test/mod"; test("basic actor setup", async (c) => { @@ -23,3 +23,58 @@ test("basic actor setup", async (c) => { const counterInstance = client.counter.getOrCreate(); await counterInstance.increment(1); }); + +test("actorhandle.resolve resolves actor ID", async (c) => { + const testActor = actor({ + state: { value: "" }, + actions: { + getValue: (c) => c.state.value, + }, + }); + + const app = setup({ + actors: { testActor }, + }); + + const { client } = await setupTest(c, app); + + // Get a handle to the actor using a key + const handle = client.testActor.getOrCreate("test-key"); + + // Resolve should work without errors and return void + await handle.resolve(); + + // After resolving, we should be able to call an action + const value = await handle.getValue(); + expect(value).toBeDefined(); +}); + +test("client.create creates a new actor", async (c) => { + const testActor = actor({ + state: { createdVia: "" }, + actions: { + setCreationMethod: (c, method: string) => { + c.state.createdVia = method; + return c.state.createdVia; + }, + getCreationMethod: (c) => c.state.createdVia, + }, + }); + + const app = setup({ + actors: { testActor }, + }); + + const { client } = await setupTest(c, app); + + // Create a new actor using client.create + const handle = await client.testActor.create("created-actor"); + + // Set some state to confirm it works + const result = await handle.setCreationMethod("client.create"); + expect(result).toBe("client.create"); + + // Verify we can retrieve the state + const method = await handle.getCreationMethod(); + expect(method).toBe("client.create"); +});