Skip to content
This repository was archived by the owner on Oct 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- **Build:** `yarn build` - Production build using Turbopack
- **Build specific package:** `yarn build -F actor-core` - Build only specified package
- **Format:** `yarn fmt` - Format code with Biome
- Do not run the format command automatically.

## Core Concepts

Expand Down
8 changes: 8 additions & 0 deletions packages/actor-core/src/actor/protocol/http/resolve.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { z } from "zod";

export const ResolveResponseSchema = z.object({
// Actor ID
i: z.string(),
});

export type ResolveResponse = z.infer<typeof ResolveResponseSchema>;
65 changes: 58 additions & 7 deletions packages/actor-core/src/client/actor_common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import type { AnyActorDefinition, ActorDefinition } from "@/actor/definition";
import type * as protoHttpResolve from "@/actor/protocol/http/resolve";
import type { Encoding } from "@/actor/protocol/serde";
import type { ActorQuery } from "@/manager/protocol/query";
import { logger } from "./log";
import * as errors from "./errors";
import { sendHttpRequest } from "./utils";

/**
* RPC function returned by Actor connections and handles.
Expand All @@ -20,10 +26,55 @@ export type ActorRPCFunction<
* Maps RPC methods from actor definition to typed function signatures.
*/
export type ActorDefinitionRpcs<AD extends AnyActorDefinition> =
AD extends ActorDefinition<any, any, any, any, infer R> ? {
[K in keyof R]: R[K] extends (
...args: infer Args
) => infer Return
? ActorRPCFunction<Args, Return>
: never;
} : never;
AD extends ActorDefinition<any, any, any, any, infer R>
? {
[K in keyof R]: R[K] extends (...args: infer Args) => infer Return
? ActorRPCFunction<Args, Return>
: never;
}
: never;

/**
* Resolves an actor ID from a query by making a request to the /actors/resolve endpoint
*
* @param {string} endpoint - The manager endpoint URL
* @param {ActorQuery} actorQuery - The query to resolve
* @param {Encoding} encodingKind - The encoding to use (json or cbor)
* @returns {Promise<string>} - A promise that resolves to the actor's ID
*/
export async function resolveActorId(
endpoint: string,
actorQuery: ActorQuery,
encodingKind: Encoding,
): Promise<string> {
logger().debug("resolving actor ID", { query: actorQuery });

// Construct the URL using the current actor query
const queryParam = encodeURIComponent(JSON.stringify(actorQuery));
const url = `${endpoint}/actors/resolve?encoding=${encodingKind}&query=${queryParam}`;

// Use the shared HTTP request utility with integrated serialization
try {
const result = await sendHttpRequest<
Record<never, never>,
protoHttpResolve.ResolveResponse
>({
url,
method: "POST",
body: {},
encoding: encodingKind,
});

logger().debug("resolved actor ID", { actorId: result.i });
return result.i;
} catch (error) {
logger().error("failed to resolve actor ID", { error });
if (error instanceof errors.ActorError) {
throw error;
} else {
throw new errors.InternalError(
`Failed to resolve actor ID: ${String(error)}`,
);
}
}
}
20 changes: 10 additions & 10 deletions packages/actor-core/src/client/actor_conn.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import type { AnyActorDefinition } from "@/actor/definition";
import type { Transport } from "@/actor/protocol/message/mod";
import type { Encoding } from "@/actor/protocol/serde";
import type * as wsToClient from "@/actor/protocol/message/to-client";
import type * as wsToServer from "@/actor/protocol/message/to-server";
import type { Encoding } from "@/actor/protocol/serde";
import { importEventSource } from "@/common/eventsource";
import { MAX_CONN_PARAMS_SIZE } from "@/common/network";
import { assertUnreachable, stringifyError } from "@/common/utils";
import { importWebSocket } from "@/common/websocket";
import type { ActorQuery } from "@/manager/protocol/query";
import * as cbor from "cbor-x";
import pRetry from "p-retry";
import type { ActorDefinitionRpcs as ActorDefinitionRpcsImport } from "./actor_common";
import { ACTOR_CONNS_SYMBOL, type ClientRaw, TRANSPORT_SYMBOL } from "./client";
import * as errors from "./errors";
import { logger } from "./log";
import { type WebSocketMessage as ConnMessage, messageLength } from "./utils";
import { ACTOR_CONNS_SYMBOL, TRANSPORT_SYMBOL, type ClientRaw } from "./client";
import type { AnyActorDefinition } from "@/actor/definition";
import pRetry from "p-retry";
import { importWebSocket } from "@/common/websocket";
import { importEventSource } from "@/common/eventsource";
import type { ActorQuery } from "@/manager/protocol/query";
import { ActorDefinitionRpcs as ActorDefinitionRpcsImport } from "./actor_common";

// Re-export the type with the original name to maintain compatibility
type ActorDefinitionRpcs<AD extends AnyActorDefinition> =
Expand Down Expand Up @@ -679,7 +679,7 @@ enc
// Get the manager endpoint from the endpoint provided
const actorQueryStr = encodeURIComponent(JSON.stringify(this.actorQuery));

let url = `${this.endpoint}/actors/connections/${this.#connectionId}/message?encoding=${this.encodingKind}&connectionToken=${encodeURIComponent(this.#connectionToken)}&query=${actorQueryStr}`;
const url = `${this.endpoint}/actors/connections/${this.#connectionId}/message?encoding=${this.encodingKind}&connectionToken=${encodeURIComponent(this.#connectionToken)}&query=${actorQueryStr}`;

// TODO: Implement ordered messages, this is not guaranteed order. Needs to use an index in order to ensure we can pipeline requests efficiently.
// TODO: Validate that we're using HTTP/3 whenever possible for pipelining requests
Expand Down Expand Up @@ -845,4 +845,4 @@ enc
*/

export type ActorConn<AD extends AnyActorDefinition> = ActorConnRaw &
ActorDefinitionRpcs<AD>;
ActorDefinitionRpcs<AD>;
42 changes: 37 additions & 5 deletions packages/actor-core/src/client/actor_handle.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import type { Encoding } from "@/actor/protocol/serde";
import { logger } from "./log";
import { sendHttpRequest } from "./utils";
import type { AnyActorDefinition } from "@/actor/definition";
import type { ActorQuery } from "@/manager/protocol/query";
import type { ActorDefinitionRpcs } from "./actor_common";
import type { RpcRequest, RpcResponse } from "@/actor/protocol/http/rpc";
import type { Encoding } from "@/actor/protocol/serde";
import type { ActorQuery } from "@/manager/protocol/query";
import { type ActorDefinitionRpcs, resolveActorId } from "./actor_common";
import { type ActorConn, ActorConnRaw } from "./actor_conn";
import { CREATE_ACTOR_CONN_PROXY, type ClientRaw } from "./client";
import { logger } from "./log";
import { sendHttpRequest } from "./utils";
import invariant from "invariant";
import { assertUnreachable } from "@/actor/utils";

/**
* Provides underlying functions for stateless {@link ActorHandle} for RPC calls.
Expand Down Expand Up @@ -111,6 +113,34 @@ export class ActorHandleRaw {
conn,
) as ActorConn<AnyActorDefinition>;
}

/**
* Resolves the actor to get its unique actor ID
*
* @returns {Promise<string>} - A promise that resolves to the actor's ID
*/
async resolve(): Promise<string> {
if (
"getForKey" in this.#actorQuery ||
"getOrCreateForKey" in this.#actorQuery
) {
const actorId = await resolveActorId(
this.#endpoint,
this.#actorQuery,
this.#encodingKind,
);
this.#actorQuery = { getForId: { actorId } };
return actorId;
} else if ("getForId" in this.#actorQuery) {
// SKip since it's already resolved
return this.#actorQuery.getForId.actorId;
} else if ("create" in this.#actorQuery) {
// Cannot create a handle with this query
invariant(false, "actorQuery cannot be create");
} else {
assertUnreachable(this.#actorQuery);
}
}
}

/**
Expand All @@ -135,4 +165,6 @@ export type ActorHandle<AD extends AnyActorDefinition> = Omit<
> & {
// Add typed version of ActorConn (instead of using AnyActorDefinition)
connect(): ActorConn<AD>;
// Resolve method returns the actor ID
resolve(): Promise<string>;
} & ActorDefinitionRpcs<AD>;
58 changes: 42 additions & 16 deletions packages/actor-core/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { ActorQuery } from "@/manager/protocol/query";
import * as errors from "./errors";
import { ActorConn, ActorConnRaw, CONNECT_SYMBOL } from "./actor_conn";
import { ActorHandle, ActorHandleRaw } from "./actor_handle";
import { ActorRPCFunction } from "./actor_common";
import { ActorRPCFunction, resolveActorId } from "./actor_common";
import { logger } from "./log";
import type { ActorCoreApp } from "@/mod";
import type { AnyActorDefinition } from "@/actor/definition";
Expand Down Expand Up @@ -55,14 +55,17 @@ export interface ActorAccessor<AD extends AnyActorDefinition> {

/**
* Creates a new actor with the name automatically injected from the property accessor,
* and returns a stateless handle to it.
* and returns a stateless handle to it with the actor ID resolved.
*
* @template AD The actor class that this handle is for.
* @param {string | string[]} key - The key to identify the actor. Can be a single string or an array of strings.
* @param {CreateOptions} [opts] - Options for creating the actor (excluding name and key).
* @returns {ActorHandle<AD>} - A handle to the actor.
* @returns {Promise<ActorHandle<AD>>} - A promise that resolves to a handle to the actor.
*/
create(key: string | string[], opts?: CreateOptions): ActorHandle<AD>;
create(
key: string | string[],
opts?: CreateOptions,
): Promise<ActorHandle<AD>>;
}

/**
Expand Down Expand Up @@ -286,18 +289,19 @@ export class ClientRaw {

/**
* Creates a new actor with the provided key and returns a stateless handle to it.
* Resolves the actor ID and returns a handle with getForId query.
*
* @template AD The actor class that this handle is for.
* @param {string} name - The name of the actor.
* @param {string | string[]} key - The key to identify the actor. Can be a single string or an array of strings.
* @param {CreateOptions} [opts] - Options for creating the actor (excluding name and key).
* @returns {ActorHandle<AD>} - A handle to the actor.
* @returns {Promise<ActorHandle<AD>>} - A promise that resolves to a handle to the actor.
*/
create<AD extends AnyActorDefinition>(
async create<AD extends AnyActorDefinition>(
name: string,
key: string | string[],
opts: CreateOptions = {},
): ActorHandle<AD> {
): Promise<ActorHandle<AD>> {
// Convert string to array of strings
const keyArray: string[] = typeof key === "string" ? [key] : key;

Expand All @@ -316,17 +320,36 @@ export class ClientRaw {
create,
});

const actorQuery = {
// Create the actor
const createQuery = {
create,
};
} satisfies ActorQuery;
const actorId = await resolveActorId(
this.#managerEndpoint,
createQuery,
this.#encodingKind,
);
logger().debug("created actor with ID", {
name,
key: keyArray,
actorId,
});

const managerEndpoint = this.#managerEndpoint;
// Create handle with actor ID
const getForIdQuery = {
getForId: {
actorId,
},
} satisfies ActorQuery;
const handle = this.#createHandle(
managerEndpoint,
this.#managerEndpoint,
opts?.params,
actorQuery,
getForIdQuery,
);
return createActorProxy(handle) as ActorHandle<AD>;

const proxy = createActorProxy(handle) as ActorHandle<AD>;

return proxy;
}

#createHandle(
Expand Down Expand Up @@ -454,11 +477,11 @@ export function createClient<A extends ActorCoreApp<any>>(
opts,
);
},
create: (
create: async (
key: string | string[],
opts: CreateOptions = {},
): ActorHandle<ExtractActorsFromApp<A>[typeof prop]> => {
return target.create<ExtractActorsFromApp<A>[typeof prop]>(
): Promise<ActorHandle<ExtractActorsFromApp<A>[typeof prop]>> => {
return await target.create<ExtractActorsFromApp<A>[typeof prop]>(
prop,
key,
opts,
Expand Down Expand Up @@ -499,6 +522,9 @@ function createActorProxy<AD extends AnyActorDefinition>(

// Create RPC function that preserves 'this' context
if (typeof prop === "string") {
// If JS is attempting to calling this as a promise, ignore it
if (prop === "then") return undefined;

let method = methodCache.get(prop);
if (!method) {
method = (...args: unknown[]) => target.action(prop, ...args);
Expand Down
Loading
Loading