diff --git a/.gitattributes b/.gitattributes index 40017413f..a6b094fb6 100644 --- a/.gitattributes +++ b/.gitattributes @@ -7,4 +7,5 @@ *.png binary *.jpg binary +*.tgz binary diff --git a/NEW_SPEC.md b/NEW_SPEC.md deleted file mode 100644 index 4dbee0847..000000000 --- a/NEW_SPEC.md +++ /dev/null @@ -1,177 +0,0 @@ -# Routing Refactor Spec - -## Prerequisites - -Make breaking changes as needed. Do not worry about backwards compatibility. - -## Current design - -Currently, there are 3 topologies. Requests through each topology look like (specifically for actions): - -Standalone: -- Manager router -> Auth & resolve actor ID -> Inline routing handler -> StandaloneTopology.getOrCreateActor() -> Direct method call on local actor instance -- Inline client -> Direct connection to local actor instance -> Executes action synchronously in same process - -Coordinated: -- Manager router -> Auth & resolve actor ID -> Inline routing handler -> CoordinateTopology.publishActionToLeader() -> Message sent via coordinate driver -> Leader peer executes action -> Response flows back -- Inline client -> Routes to local topology -> Forwards to leader peer via message passing -> Waits for acknowledgment with timeout - -Partition: -- Manager router -> Auth & resolve actor ID -> Custom routing handler -> Proxy request to partition URL -> Partition's Actor Router -> Executes on isolated actor instance -- Inline client -> Not used (partition uses custom routing handler that proxies to remote actor router) - -## Desired design - -We need to standardize all requests to have a standard HTTP interface to communicate with the actor. - -Manager router: -- HTTP request arrives -> Auth & resolve actor ID -> Create standard Request object -> ManagerDriver.onFetch(actorId, request) -> Driver-specific routing (local/remote/proxy) -> Actor Router handles request -> Execute action on actor instance -> Return Response - -Inline client: -- Client calls action/rpc -> Create standard Request object -> ActorDriver.sendRequest(actorId, request) -> Driver routes to actor (in-process/IPC/network) -> Actor Router handles request -> Execute on actor instance -> Return response to client - -See the existing routingHandler.custom for reference on code that's similar. - -## New Project Structure - -Completely remove topologies and usage of topologies. Completely remove uses of topology classes and interfaces. -Remove all uses of: - -- Topologies - - Move all topology-related functionality in to manager driver - - Any topology-specific settings are now part of the driver - - The ManagerDriver now handles actor lifecycle -- ConnRoutingHandler and ConnectionHandlers - - Everything behaves like ConnRoutingHandlerCustom now, by calling methods on ManagerDriver -- ConnRoutingHandlerCustom - - sendRequest, openWebSocket, proxyRequest, and proxyWebSocket are now part of ManagerDriver - -Add onFetch and onWebSocket to ManagerDriver. - -All configurations should be part of ActorDriver and ManagerDriver. - -Ensure that the following drivers are working: - -- Memory -- File system -- Cloudflare Workers - -Ignore: - -- Redis (types will not pass) - -Delete: - -- Rivet driver (currently commented out) - -Move all code for the coordinated driver to a separate package. Ignore this package for now, this will have compile errors, etc. - - -### Current Structure to Remove -``` -packages/core/src/ -├── topologies/ -│ ├── standalone/ -│ │ └── topology.ts -│ ├── partition/ -│ │ ├── topology.ts (split into manager/actor) -│ │ └── actor.ts -│ └── coordinate/ -│ ├── topology.ts -│ ├── driver.ts -│ ├── node/ -│ └── peer/ -├── actor/ -│ ├── conn-routing-handler.ts (remove) -│ └── router-endpoints.ts (ConnectionHandlers interface - remove) -└── manager/ - └── topology.ts (remove base topology) -``` - -### New Structure -``` -packages/core/src/ -├── actor/ -│ ├── driver.ts (enhanced with routing capabilities) -│ │ └── Add: sendRequest(actorId, request) method -│ ├── instance.ts (keep, minor updates) -│ ├── router.ts (new - handles actor-side request routing) -│ ├── config.ts (merge topology configs here) -│ └── errors.ts (keep) -├── manager/ -│ ├── driver.ts (enhanced with lifecycle + routing) -│ │ └── Add: onFetch(actorId, request) -│ │ └── Add: onWebSocket(actorId, request, socket) -│ │ └── Add: sendRequest(actorId, request) -│ │ └── Add: openWebSocket(actorId, request) -│ │ └── Add: proxyRequest(actorId, request) -│ │ └── Add: proxyWebSocket(actorId, request, socket) -│ │ └── Add: actor lifecycle methods from topologies -│ ├── router.ts (simplified - delegates to driver) -│ └── config.ts (merge topology configs here) -├── client/ -│ ├── http-client-driver.ts (update to use new routing) -│ └── inline-client-driver.ts (update to use ActorDriver.sendRequest) -├── common/ -│ └── request-response.ts (new - standard Request/Response interfaces) -└── driver-test-suite/ (update tests for new architecture) - -packages/drivers/memory/src/ -├── manager-driver.ts (implement new routing methods) -└── actor-driver.ts (implement sendRequest) - -packages/drivers/file-system/src/ -├── manager-driver.ts (implement new routing methods) -└── actor-driver.ts (implement sendRequest) - -packages/platforms/cloudflare-workers/src/ -├── manager-driver.ts (implement new routing methods) -└── actor-driver.ts (implement sendRequest) - -packages/coordinate/ (new package - move from core) -├── src/ -│ ├── driver.ts -│ ├── manager-driver.ts (implements ManagerDriver) -│ ├── actor-driver.ts (implements ActorDriver) -│ ├── node/ -│ ├── peer/ -│ └── mod.ts -└── package.json -``` - -### Additional Symbols to Delete - -1. **Routing Handler Types** (in `actor/conn-routing-handler.ts`): - - `BuildProxyEndpoint` type - - `SendRequestHandler` type (duplicate in partition/topology.ts) - - `OpenWebSocketHandler` type (duplicate in partition/topology.ts) - - `ProxyRequestHandler` type - - `ProxyWebSocketHandler` type - -2. **Configuration Types and Schemas**: - - `Topology` enum type (in `registry/run-config.ts`) - - `TopologySchema` (in `registry/run-config.ts`) - - `topology` field in `DriverConfigSchema` - - `connRoutingHandler` property in `ManagerDriver` interface - -3. **Internal Types**: - - `GlobalState` interface (in `coordinate/topology.ts`) - -4. **Registry Module Logic**: - - All topology setup logic in `registry/mod.ts` (lines 61-78 and 114-125) - - Topology exports from `topologies/mod.ts` - -### Symbols to Move to Coordinate Package - -1. **CoordinateDriver and Related Types**: - - `CoordinateDriver` interface (in `topologies/coordinate/driver.ts`) - - `NodeMessageCallback` type - - `GetActorLeaderOutput` interface - - `StartActorAndAcquireLeaseOutput` interface - - `ExtendLeaseOutput` interface - - `AttemptAcquireLease` interface - -2. **Coordinate-specific Configuration**: - - `ActorPeerConfig` and `ActorPeerConfigSchema` (in `registry/run-config.ts`) - - `actorPeer` field in `RunConfigSchema` - - `coordinate` field in `DriverConfigSchema` diff --git a/NEW_SPEC2.md b/NEW_SPEC2.md deleted file mode 100644 index 86180de61..000000000 --- a/NEW_SPEC2.md +++ /dev/null @@ -1,8 +0,0 @@ - -Manager router -> ManagerDriver.createActor -> save actor in memory -Manager router -> ManagerDriver.proxyRequest -> actorRouter.fetch(req, { Bindings: ... }) -> get actor ID from env -> ActorDriver.loadActor -> call actor action - -## todo - -[ ] fix genericconnectionglobalstate - diff --git a/packages/core/package.json b/packages/core/package.json index e8d11aac5..5244f29b5 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -158,20 +158,21 @@ }, "dependencies": { "@hono/standard-validator": "^0.1.3", - "cbor-x": "^1.6.0", + "@hono/zod-openapi": "^0.19.10", "@rivetkit/fast-json-patch": "^3.1.2", + "cbor-x": "^1.6.0", + "hono": "^4.7.0", "invariant": "^2.2.4", "nanoevents": "^9.1.0", "on-change": "^5.0.1", "p-retry": "^6.2.1", - "zod": "^3.25.76", - "@hono/zod-openapi": "^0.19.10", - "hono": "^4.7.0" + "zod": "^3.25.76" }, "devDependencies": { - "@hono/node-server": "^1.14.0", + "@hono/node-server": "^1.18.2", "@hono/node-ws": "^1.1.1", "@rivet-gg/actor-core": "^25.1.0", + "@rivetkit/engine-runner": "https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@472", "@types/invariant": "^2", "@types/node": "^22.13.1", "@types/ws": "^8", diff --git a/packages/core/src/actor/driver.ts b/packages/core/src/actor/driver.ts index 71a827e1f..4696da5c2 100644 --- a/packages/core/src/actor/driver.ts +++ b/packages/core/src/actor/driver.ts @@ -39,9 +39,7 @@ export interface ActorDriver { */ getDatabase(actorId: string): Promise; - // TODO: - //destroy(): Promise; - //readState(): void; + shutdown?(immediate: boolean): Promise; } export enum ConnectionReadyState { diff --git a/packages/core/src/actor/router-endpoints.ts b/packages/core/src/actor/router-endpoints.ts index 8ec60f648..166d643c0 100644 --- a/packages/core/src/actor/router-endpoints.ts +++ b/packages/core/src/actor/router-endpoints.ts @@ -101,7 +101,7 @@ export interface WebSocketOpts { * Creates a WebSocket connection handler */ export async function handleWebSocketConnect( - c: HonoContext | undefined, + req: Request | undefined, runConfig: RunConfig, actorDriver: ActorDriver, actorId: string, @@ -109,7 +109,7 @@ export async function handleWebSocketConnect( parameters: unknown, authData: unknown, ): Promise { - const exposeInternalError = c ? getRequestExposeInternalError(c.req) : false; + const exposeInternalError = req ? getRequestExposeInternalError(req) : false; // Setup promise for the init handlers since all other behavior depends on this const { @@ -157,7 +157,7 @@ export async function handleWebSocketConnect( try { const connId = generateConnId(); const connToken = generateConnToken(); - const connState = await actor.prepareConn(parameters, c?.req.raw); + const connState = await actor.prepareConn(parameters, req); // Save socket const connGlobalState = @@ -578,7 +578,7 @@ export async function handleConnectionMessage( } export async function handleRawWebSocketHandler( - c: HonoContext | undefined, + req: Request | undefined, path: string, actorDriver: ActorDriver, actorId: string, @@ -602,8 +602,8 @@ export async function handleRawWebSocketHandler( const normalizedPath = pathname + url.search; let newRequest: Request; - if (c) { - newRequest = new Request(`http://actor${normalizedPath}`, c.req.raw); + if (req) { + newRequest = new Request(`http://actor${normalizedPath}`, req); } else { newRequest = new Request(`http://actor${normalizedPath}`, { method: "GET", @@ -660,8 +660,8 @@ export function getRequestEncoding(req: HonoRequest): Encoding { return result.data; } -export function getRequestExposeInternalError(req: HonoRequest): boolean { - const param = req.header(HEADER_EXPOSE_INTERNAL_ERROR); +export function getRequestExposeInternalError(req: Request): boolean { + const param = req.headers.get(HEADER_EXPOSE_INTERNAL_ERROR); if (!param) { return false; } diff --git a/packages/core/src/actor/router.ts b/packages/core/src/actor/router.ts index 7255ce186..b356d50f3 100644 --- a/packages/core/src/actor/router.ts +++ b/packages/core/src/actor/router.ts @@ -91,7 +91,7 @@ export function createActorRouter( const authData = authDataRaw ? JSON.parse(authDataRaw) : undefined; return await handleWebSocketConnect( - c as HonoContext, + c.req.raw, runConfig, actorDriver, c.env.actorId, @@ -220,7 +220,7 @@ export function createActorRouter( }); return await handleRawWebSocketHandler( - c, + c.req.raw, pathWithQuery, actorDriver, c.env.actorId, diff --git a/packages/core/src/client/actor-handle.ts b/packages/core/src/client/actor-handle.ts index 88cd806c3..552614a87 100644 --- a/packages/core/src/client/actor-handle.ts +++ b/packages/core/src/client/actor-handle.ts @@ -149,6 +149,15 @@ export class ActorHandleRaw { "getOrCreateForKey" in this.#actorQuery ) { // TODO: + let name: string; + if ("getForKey" in this.#actorQuery) { + name = this.#actorQuery.getForKey.name; + } else if ("getOrCreateForKey" in this.#actorQuery) { + name = this.#actorQuery.getOrCreateForKey.name; + } else { + assertUnreachable(this.#actorQuery); + } + const actorId = await this.#driver.resolveActorId( undefined, this.#actorQuery, @@ -156,7 +165,9 @@ export class ActorHandleRaw { this.#params, signal ? { signal } : undefined, ); - this.#actorQuery = { getForId: { actorId } }; + + this.#actorQuery = { getForId: { actorId, name } }; + return actorId; } else if ("getForId" in this.#actorQuery) { // SKip since it's already resolved diff --git a/packages/core/src/client/client.ts b/packages/core/src/client/client.ts index c53643a21..38af7ae67 100644 --- a/packages/core/src/client/client.ts +++ b/packages/core/src/client/client.ts @@ -268,8 +268,9 @@ export class ClientRaw { params: opts?.params, }); - const actorQuery = { + const actorQuery: ActorQuery = { getForId: { + name, actorId, }, }; @@ -400,6 +401,7 @@ export class ClientRaw { // Create handle with actor ID const getForIdQuery = { getForId: { + name, actorId, }, } satisfies ActorQuery; diff --git a/packages/core/src/common/router.ts b/packages/core/src/common/router.ts index 615636b94..b53775228 100644 --- a/packages/core/src/common/router.ts +++ b/packages/core/src/common/router.ts @@ -47,7 +47,7 @@ export function handleRouteError( c: HonoContext, ) { const exposeInternalError = - opts.enableExposeInternalError && getRequestExposeInternalError(c.req); + opts.enableExposeInternalError && getRequestExposeInternalError(c.req.raw); const { statusCode, code, message, metadata } = deconstructError( error, diff --git a/packages/core/src/driver-test-suite/mod.ts b/packages/core/src/driver-test-suite/mod.ts index b7cfc501a..b0e1f0bac 100644 --- a/packages/core/src/driver-test-suite/mod.ts +++ b/packages/core/src/driver-test-suite/mod.ts @@ -157,7 +157,7 @@ export async function createTestRuntime( }); // Create router - const managerDriver = config.driver.manager(registry.config, config); + const managerDriver = driver.manager(registry.config, config); const inlineDriver = createInlineClientDriver(managerDriver); const { router } = createManagerRouter( registry.config, diff --git a/packages/core/src/driver-test-suite/tests/actor-conn-state.ts b/packages/core/src/driver-test-suite/tests/actor-conn-state.ts index 08bc1e3fc..5e0360626 100644 --- a/packages/core/src/driver-test-suite/tests/actor-conn-state.ts +++ b/packages/core/src/driver-test-suite/tests/actor-conn-state.ts @@ -94,6 +94,10 @@ export function runActorConnStateTests(driverTestConfig: DriverTestConfig) { const conn1 = handle.connect(); const conn2 = handle.connect(); + // HACK: Wait for both connections to successfully connect by waiting for a round trip RPC + await conn1.getConnectionState(); + await conn2.getConnectionState(); + // Get state1 for reference const state1 = await conn1.getConnectionState(); @@ -119,6 +123,10 @@ export function runActorConnStateTests(driverTestConfig: DriverTestConfig) { const conn1 = handle.connect(); const conn2 = handle.connect(); + // HACK: Wait for both connections to successfully connect by waiting for a round trip RPC + await conn1.getConnectionState(); + await conn2.getConnectionState(); + // Get all connection states const allStates = await conn1.getAllConnectionStates(); diff --git a/packages/core/src/driver-test-suite/tests/raw-http-request-properties.ts b/packages/core/src/driver-test-suite/tests/raw-http-request-properties.ts index 92d3f6e2c..ac4cc19e3 100644 --- a/packages/core/src/driver-test-suite/tests/raw-http-request-properties.ts +++ b/packages/core/src/driver-test-suite/tests/raw-http-request-properties.ts @@ -111,7 +111,6 @@ export function runRawHttpRequestPropertiesTests( "Accept-Language": "en-US,en;q=0.9", "Cache-Control": "no-cache", "User-Agent": "RivetKit-Test/1.0", - "X-Forwarded-For": "192.168.1.1", "X-Request-ID": "12345", }, }); @@ -125,7 +124,6 @@ export function runRawHttpRequestPropertiesTests( expect(data.headers["cache-control"]).toBe("no-cache"); // User-Agent might be overwritten by the HTTP client, so just check it exists expect(data.headers["user-agent"]).toBeTruthy(); - expect(data.headers["x-forwarded-for"]).toBe("192.168.1.1"); expect(data.headers["x-request-id"]).toBe("12345"); }); diff --git a/packages/core/src/driver-test-suite/tests/request-access.ts b/packages/core/src/driver-test-suite/tests/request-access.ts index 93787e179..63eecf5b8 100644 --- a/packages/core/src/driver-test-suite/tests/request-access.ts +++ b/packages/core/src/driver-test-suite/tests/request-access.ts @@ -11,7 +11,7 @@ export function runRequestAccessTests(driverTestConfig: DriverTestConfig) { const handle = client.requestAccessActor.getOrCreate(["test-request"], { params: { trackRequest: true }, }); - const connection = await handle.connect(); + const connection = handle.connect(); // Get request info that was captured in onBeforeConnect const requestInfo = await connection.getRequestInfo(); @@ -30,14 +30,11 @@ export function runRequestAccessTests(driverTestConfig: DriverTestConfig) { expect(requestInfo.createConnState.requestMethod).toBeDefined(); expect(requestInfo.createConnState.requestHeaders).toBeDefined(); } else { - // Inline client doesn't have request object - expect(requestInfo.onBeforeConnect.hasRequest).toBe(false); - expect(requestInfo.onBeforeConnect.requestUrl).toBeNull(); - expect(requestInfo.onBeforeConnect.requestMethod).toBeNull(); - - expect(requestInfo.createConnState.hasRequest).toBe(false); - expect(requestInfo.createConnState.requestUrl).toBeNull(); - expect(requestInfo.createConnState.requestMethod).toBeNull(); + // Inline client may or may not have request object depending on the driver + // + // e.g. + // - File system does not have a request for inline requests + // - Rivet Engine proxies the request so it has access to the request object } // Clean up @@ -54,7 +51,7 @@ export function runRequestAccessTests(driverTestConfig: DriverTestConfig) { params: { trackRequest: false }, }, ); - const connection = await handle.connect(); + const connection = handle.connect(); // Get request info const requestInfo = await connection.getRequestInfo(); @@ -85,7 +82,7 @@ export function runRequestAccessTests(driverTestConfig: DriverTestConfig) { const handle = client.requestAccessActor.getOrCreate(["test-headers"], { params: { trackRequest: true }, }); - const connection = await handle.connect(); + const connection = handle.connect(); // Get request info const requestInfo = await connection.getRequestInfo(); @@ -108,9 +105,9 @@ export function runRequestAccessTests(driverTestConfig: DriverTestConfig) { "object", ); } else { - // Inline client doesn't have request object - expect(requestInfo.onBeforeConnect.hasRequest).toBe(false); - expect(requestInfo.createConnState.hasRequest).toBe(false); + // Inline client may or may not have request object depending on the driver + // + // See "should have access to request object in onBeforeConnect and createConnState" } // Clean up diff --git a/packages/core/src/drivers/default.ts b/packages/core/src/drivers/default.ts index 143bc77a5..e75b4dacd 100644 --- a/packages/core/src/drivers/default.ts +++ b/packages/core/src/drivers/default.ts @@ -1,20 +1,31 @@ +import { UserError } from "@/actor/errors"; import { logger } from "@/actor/log"; +import { createEngineDriver } from "@/drivers/engine/mod"; import { createFileSystemOrMemoryDriver } from "@/drivers/file-system/mod"; -import { type DriverConfig, UserError } from "@/mod"; +import type { DriverConfig, RunConfig } from "@/registry/run-config"; import { getEnvUniversal } from "@/utils"; /** - * Determines which driver to use if none is provided. + * Chooses the appropriate driver based on the run configuration. */ -export function createDefaultDriver(): DriverConfig { - const driver = getEnvUniversal("RIVETKIT_DRIVER"); - if (!driver || driver === "file-system") { - logger().debug("using default file system driver"); - return createFileSystemOrMemoryDriver(true); - } else if (driver === "memory") { - logger().debug("using default memory driver"); - return createFileSystemOrMemoryDriver(false); - } else { - throw new UserError(`Unrecognized driver: ${driver}`); +export function chooseDefaultDriver(runConfig: RunConfig): DriverConfig { + const engineEndpoint = runConfig.engine || getEnvUniversal("RIVET_ENGINE"); + + if (engineEndpoint && runConfig.driver) { + throw new UserError( + "Cannot specify both 'engine' and 'driver' in configuration", + ); + } + + if (runConfig.driver) { + return runConfig.driver; } + + if (engineEndpoint) { + logger().debug("using rivet engine driver", { endpoint: engineEndpoint }); + return createEngineDriver({ endpoint: engineEndpoint }); + } + + logger().debug("using default file system driver"); + return createFileSystemOrMemoryDriver(true); } diff --git a/packages/core/src/drivers/engine/actor-driver.ts b/packages/core/src/drivers/engine/actor-driver.ts new file mode 100644 index 000000000..46a28cfeb --- /dev/null +++ b/packages/core/src/drivers/engine/actor-driver.ts @@ -0,0 +1,360 @@ +import type { + ActorConfig as RunnerActorConfig, + RunnerConfig, +} from "@rivetkit/engine-runner"; +import { Runner } from "@rivetkit/engine-runner"; +import * as cbor from "cbor-x"; +import { WSContext, WSContextInit } from "hono/ws"; +import invariant from "invariant"; +import { ActionContext } from "@/actor/action"; +import { generateConnId, generateConnToken } from "@/actor/connection"; +import { + CONN_DRIVER_GENERIC_HTTP, + type GenericHttpDriverState, +} from "@/actor/generic-conn-driver"; +import * as protoHttpAction from "@/actor/protocol/http/action"; +import { deserialize, EncodingSchema, serialize } from "@/actor/protocol/serde"; +import { ActorHandle } from "@/client/actor-handle"; +import type { Client } from "@/client/client"; +import { + type ActorDriver, + type AnyActorInstance, + HEADER_AUTH_DATA, + HEADER_CONN_PARAMS, + HEADER_ENCODING, + type ManagerDriver, + serializeEmptyPersistData, +} from "@/driver-helpers/mod"; +import type { + ActorRouter, + AnyActorInstance as CoreAnyActorInstance, + RegistryConfig, + RunConfig, + UniversalWebSocket, +} from "@/mod"; +import { + createActorRouter, + createGenericConnDrivers, + GenericConnGlobalState, + handleRawWebSocketHandler, + handleWebSocketConnect, + lookupInRegistry, + noopNext, + PATH_CONNECT_WEBSOCKET, + PATH_RAW_WEBSOCKET_PREFIX, +} from "@/mod"; +import type { Config } from "./config"; +import { deserializeActorKey } from "./keys"; +import { KEYS } from "./kv"; +import { logger } from "./log"; + +interface ActorHandler { + actor?: AnyActorInstance; + actorStartPromise?: PromiseWithResolvers; + metadata: RunnerActorConfig["metadata"]; + genericConnGlobalState: GenericConnGlobalState; + persistedData?: Uint8Array; +} + +export type DriverContext = {}; + +export class EngineActorDriver implements ActorDriver { + #registryConfig: RegistryConfig; + #runConfig: RunConfig; + #managerDriver: ManagerDriver; + #inlineClient: Client; + #config: Config; + #runner: Runner; + #actors: Map = new Map(); + #actorRouter: ActorRouter; + #version: number = 1; // Version for the runner protocol + + constructor( + registryConfig: RegistryConfig, + runConfig: RunConfig, + managerDriver: ManagerDriver, + inlineClient: Client, + config: Config, + ) { + this.#registryConfig = registryConfig; + this.#runConfig = runConfig; + this.#managerDriver = managerDriver; + this.#inlineClient = inlineClient; + this.#config = config; + this.#actorRouter = createActorRouter(runConfig, this); + + // Create runner configuration + let hasDisconnected = false; + const runnerConfig: RunnerConfig = { + version: this.#version, + endpoint: config.endpoint, + pegboardEndpoint: config.pegboardEndpoint, + namespace: config.namespace, + addresses: config.addresses, + totalSlots: config.totalSlots, + runnerName: config.runnerName, + prepopulateActorNames: Object.keys(this.#registryConfig.use), + onConnected: () => { + if (hasDisconnected) { + logger().info("runner reconnected", { + namespace: this.#config.namespace, + runnerName: this.#config.runnerName, + }); + } else { + logger().debug("runner connected", { + namespace: this.#config.namespace, + runnerName: this.#config.runnerName, + }); + } + }, + onDisconnected: () => { + logger().warn("runner disconnected", { + namespace: this.#config.namespace, + runnerName: this.#config.runnerName, + }); + hasDisconnected = true; + }, + fetch: this.#runnerFetch.bind(this), + websocket: this.#runnerWebSocket.bind(this), + onActorStart: this.#runnerOnActorStart.bind(this), + onActorStop: this.#runnerOnActorStop.bind(this), + }; + + // Create and start runner + this.#runner = new Runner(runnerConfig); + this.#runner.start(); + logger().debug("engine runner started", { + endpoint: config.endpoint, + namespace: config.namespace, + runnerName: config.runnerName, + }); + } + + async #loadActorHandler(actorId: string): Promise { + // Check if actor is already loaded + const handler = this.#actors.get(actorId); + if (!handler) throw new Error(`Actor handler does not exist ${actorId}`); + if (handler.actorStartPromise) await handler.actorStartPromise.promise; + if (!handler.actor) throw new Error("Actor should be loaded"); + return handler; + } + + async loadActor(actorId: string): Promise { + const handler = await this.#loadActorHandler(actorId); + if (!handler.actor) throw new Error(`Actor ${actorId} failed to load`); + return handler.actor; + } + + getGenericConnGlobalState(actorId: string): GenericConnGlobalState { + const handler = this.#actors.get(actorId); + if (!handler) { + throw new Error(`Actor ${actorId} not loaded`); + } + return handler.genericConnGlobalState; + } + + getContext(actorId: string): DriverContext { + return {}; + } + + async readPersistedData(actorId: string): Promise { + const handler = this.#actors.get(actorId); + if (!handler) throw new Error(`Actor ${actorId} not loaded`); + if (handler.persistedData) return handler.persistedData; + + const [value] = await this.#runner.kvGet(actorId, [KEYS.PERSIST_DATA]); + + if (value !== null) { + handler.persistedData = value; + return value; + } else { + return undefined; + } + } + + async writePersistedData(actorId: string, data: Uint8Array): Promise { + const handler = this.#actors.get(actorId); + if (!handler) throw new Error(`Actor ${actorId} not loaded`); + + handler.persistedData = data; + + await this.#runner.kvPut(actorId, [[KEYS.PERSIST_DATA, data]]); + } + + async setAlarm(actor: AnyActorInstance, timestamp: number): Promise { + // TODO: Set timeout + // TODO: Use alarm on sleep + // TODO: Send alarm to runner + + throw new Error("Alarms not implemented for engine driver"); + } + + async getDatabase(_actorId: string): Promise { + return undefined; + } + + // Runner lifecycle callbacks + async #runnerOnActorStart( + actorId: string, + generation: number, + config: RunnerActorConfig, + ): Promise { + logger().debug("runner actor starting", { + actorId, + name: config.metadata.actor.name, + keys: config.metadata.actor.keys, + generation, + }); + + // Deserialize input + let input; + if (config.input) { + input = cbor.decode(config.input); + } + + // Get or create handler + let handler = this.#actors.get(actorId); + if (!handler) { + handler = { + genericConnGlobalState: new GenericConnGlobalState(), + actorStartPromise: Promise.withResolvers(), + metadata: config.metadata, + persistedData: serializeEmptyPersistData(input), + }; + this.#actors.set(actorId, handler); + } + + const name = config.metadata.actor.name as string; + const key = deserializeActorKey(config.metadata.actor.keys[0]); + + // Create actor instance + const definition = lookupInRegistry( + this.#registryConfig, + config.metadata.actor.name as string, // TODO: Remove cast + ); + handler.actor = definition.instantiate(); + + // Start actor + const connDrivers = createGenericConnDrivers( + handler.genericConnGlobalState, + ); + await handler.actor.start( + connDrivers, + this, + this.#inlineClient, + actorId, + name, + key, + "unknown", // TODO: Add regions + ); + + // Resolve promise if waiting + handler.actorStartPromise?.resolve(); + handler.actorStartPromise = undefined; + + logger().debug("runner actor started", { actorId, name, key }); + } + + async #runnerOnActorStop(actorId: string, generation: number): Promise { + logger().debug("runner actor stopping", { actorId, generation }); + + const handler = this.#actors.get(actorId); + if (handler?.actor) { + await handler.actor.stop(); + this.#actors.delete(actorId); + } + + logger().debug("runner actor stopped", { actorId }); + } + + async #runnerFetch(actorId: string, request: Request): Promise { + logger().debug("runner fetch", { + actorId, + url: request.url, + method: request.method, + }); + return await this.#actorRouter.fetch(request, { actorId }); + } + + async #runnerWebSocket( + actorId: string, + websocketRaw: any, + request: Request, + ): Promise { + const websocket = websocketRaw as UniversalWebSocket; + + logger().debug("runner websocket", { actorId, url: request.url }); + + const url = new URL(request.url); + + // Parse headers + const encodingRaw = request.headers.get(HEADER_ENCODING); + const connParamsRaw = request.headers.get(HEADER_CONN_PARAMS); + const authDataRaw = request.headers.get(HEADER_AUTH_DATA); + + const encoding = EncodingSchema.parse(encodingRaw); + const connParams = connParamsRaw ? JSON.parse(connParamsRaw) : undefined; + const authData = authDataRaw ? JSON.parse(authDataRaw) : undefined; + + // Fetch WS handler + // + // We store the promise since we need to add WebSocket event listeners immediately that will wait for the promise to resolve + let wsHandlerPromise; + if (url.pathname === PATH_CONNECT_WEBSOCKET) { + wsHandlerPromise = handleWebSocketConnect( + request, + this.#runConfig, + this, + actorId, + encoding, + connParams, + authData, + ); + } else if (url.pathname.startsWith(PATH_RAW_WEBSOCKET_PREFIX)) { + wsHandlerPromise = handleRawWebSocketHandler( + request, + url.pathname + url.search, + this, + actorId, + authData, + ); + } else { + throw new Error(`Unreachable path: ${url.pathname}`); + } + + // TODO: Add close + + // Connect the Hono WS hook to the adapter + const wsContext = new WSContext(websocket); + + wsHandlerPromise.catch((err) => { + logger().error("building websocket handlers errored", { err }); + wsContext.close(1011, `${err}`); + }); + + if (websocket.readyState === 1) { + wsHandlerPromise.then((x) => x.onOpen?.(new Event("open"), wsContext)); + } else { + websocket.addEventListener("open", (event) => { + wsHandlerPromise.then((x) => x.onOpen?.(event, wsContext)); + }); + } + + websocket.addEventListener("message", (event) => { + wsHandlerPromise.then((x) => x.onMessage?.(event, wsContext)); + }); + + websocket.addEventListener("close", (event) => { + wsHandlerPromise.then((x) => x.onClose?.(event, wsContext)); + }); + + websocket.addEventListener("error", (event) => { + wsHandlerPromise.then((x) => x.onError?.(event, wsContext)); + }); + } + + async shutdown(immediate: boolean): Promise { + logger().info("stopping engine actor driver"); + await this.#runner.shutdown(immediate); + } +} diff --git a/packages/core/src/drivers/engine/api-endpoints.ts b/packages/core/src/drivers/engine/api-endpoints.ts new file mode 100644 index 000000000..e554a8424 --- /dev/null +++ b/packages/core/src/drivers/engine/api-endpoints.ts @@ -0,0 +1,135 @@ +import { CreateRequest } from "@/client/mod"; +import { apiCall } from "./api-utils"; +import type { Config } from "./config"; +import { serializeActorKey } from "./keys"; + +// MARK: Common types +export type RivetId = string; + +export interface ActorLifecycle { + kill_timeout_ms: number; + durable: boolean; +} + +export interface Actor { + actor_id: RivetId; + name: string; + keys: string[]; + namespace_id: RivetId; + runner_name_selector: string; + lifecycle: ActorLifecycle; + create_ts: number; + connectable_ts?: number | null; + destroy_ts?: number | null; + sleep_ts?: number | null; + start_ts?: number | null; +} + +export interface ActorsGetResponse { + actor: Actor; +} + +export interface ActorsGetByIdResponse { + actor_id?: RivetId | null; +} + +export interface ActorsGetOrCreateResponse { + actor: Actor; + created: boolean; +} + +export interface ActorsGetOrCreateByIdResponse { + actor_id: RivetId; + created: boolean; +} + +export interface ActorsCreateRequest { + name: string; + runner_name_selector: string; + keys?: string[] | null; + input?: string | null; + durable?: boolean | null; +} + +export interface ActorsCreateResponse { + actor: Actor; +} + +// MARK: Get actor +export async function getActor( + config: Config, + actorId: RivetId, +): Promise { + return apiCall( + config.endpoint, + config.namespace, + "GET", + `/actors/${encodeURIComponent(actorId)}`, + ); +} + +// MARK: Get actor by id +export async function getActorById( + config: Config, + name: string, + key: string[], +): Promise { + const serializedKey = serializeActorKey(key); + return apiCall( + config.endpoint, + config.namespace, + "GET", + `/actors/by-id?name=${encodeURIComponent(name)}&key=${encodeURIComponent(serializedKey)}`, + ); +} + +// MARK: Get or create actor by id +export interface ActorsGetOrCreateByIdRequest { + name: string; + keys: string[]; + runner_name_selector: string; + input?: string | null; + durable?: boolean | null; +} + +export async function getOrCreateActorById( + config: Config, + request: ActorsGetOrCreateByIdRequest, +): Promise { + return apiCall( + config.endpoint, + config.namespace, + "PUT", + `/actors/by-id`, + request, + ); +} + +// MARK: Create actor +export async function createActor( + config: Config, + request: ActorsCreateRequest, +): Promise { + return apiCall( + config.endpoint, + config.namespace, + "POST", + `/actors`, + request, + ); +} + +// MARK: Destroy actor +export type ActorsDeleteResponse = {}; + +export async function destroyActor( + config: Config, + actorId: RivetId, +): Promise { + return apiCall( + config.endpoint, + config.namespace, + "DELETE", + `/actors/${encodeURIComponent(actorId)}`, + ); +} diff --git a/packages/core/src/drivers/engine/api-utils.ts b/packages/core/src/drivers/engine/api-utils.ts new file mode 100644 index 000000000..7fd44c1e5 --- /dev/null +++ b/packages/core/src/drivers/engine/api-utils.ts @@ -0,0 +1,70 @@ +import { logger } from "./log"; + +// Error class for Engine API errors +export class EngineApiError extends Error { + constructor( + public readonly group: string, + public readonly code: string, + message?: string, + ) { + super(message || `Engine API error: ${group}/${code}`); + this.name = "EngineApiError"; + } +} + +// Helper function for making API calls +export async function apiCall( + endpoint: string, + namespace: string, + method: "GET" | "POST" | "PUT" | "DELETE", + path: string, + body?: TInput, +): Promise { + const url = `${endpoint}${path}${path.includes("?") ? "&" : "?"}namespace=${encodeURIComponent(namespace)}`; + + const options: RequestInit = { + method, + headers: { + "Content-Type": "application/json", + }, + }; + + if (body !== undefined && method !== "GET") { + options.body = JSON.stringify(body); + } + + logger().debug("making api call", { method, url }); + + const response = await fetch(url, options); + + if (!response.ok) { + const errorText = await response.text(); + logger().error("api call failed", { + status: response.status, + statusText: response.statusText, + error: errorText, + method, + path, + }); + + // Try to parse error response + try { + const errorData = JSON.parse(errorText); + if (errorData.kind === "error" && errorData.group && errorData.code) { + throw new EngineApiError( + errorData.group, + errorData.code, + errorData.message, + ); + } + } catch (parseError) { + // If parsing fails or it's not our expected error format, continue + } + + throw new Error( + `API call failed: ${response.status} ${response.statusText}`, + ); + } + + return response.json() as Promise; +} diff --git a/packages/core/src/drivers/engine/config.ts b/packages/core/src/drivers/engine/config.ts new file mode 100644 index 000000000..e05070a29 --- /dev/null +++ b/packages/core/src/drivers/engine/config.ts @@ -0,0 +1,31 @@ +import type { Hono } from "hono"; +import { z } from "zod"; +import { getEnvUniversal } from "@/utils"; + +export const ConfigSchema = z + .object({ + app: z.custom().optional(), + endpoint: z + .string() + .default(getEnvUniversal("RIVET_ENGINE") ?? "http://localhost:7080"), + pegboardEndpoint: z.string().optional(), + namespace: z + .string() + .default(getEnvUniversal("RIVET_NAMESPACE") ?? "default"), + runnerName: z + .string() + .default(getEnvUniversal("RIVET_RUNNER") ?? "rivetkit"), + totalSlots: z.number().default(100_000), + addresses: z + .record( + z.object({ + host: z.string(), + port: z.number(), + }), + ) + .default({ main: { host: "127.0.0.1", port: 5051 } }), + }) + .default({}); + +export type InputConfig = z.input; +export type Config = z.infer; diff --git a/packages/core/src/drivers/engine/keys.test.ts b/packages/core/src/drivers/engine/keys.test.ts new file mode 100644 index 000000000..9aae7ad6a --- /dev/null +++ b/packages/core/src/drivers/engine/keys.test.ts @@ -0,0 +1,266 @@ +import { describe, expect, test } from "vitest"; +import { + deserializeActorKey, + EMPTY_KEY, + KEY_SEPARATOR, + serializeActorKey, +} from "./keys"; + +describe("Key serialization and deserialization", () => { + // Test serialization + describe("serializeActorKey", () => { + test("serializes empty key array", () => { + expect(serializeActorKey([])).toBe(EMPTY_KEY); + }); + + test("serializes single key", () => { + expect(serializeActorKey(["test"])).toBe("test"); + }); + + test("serializes multiple keys", () => { + expect(serializeActorKey(["a", "b", "c"])).toBe( + `a${KEY_SEPARATOR}b${KEY_SEPARATOR}c`, + ); + }); + + test("escapes forward slashes in keys", () => { + expect(serializeActorKey(["a/b"])).toBe("a\\/b"); + expect(serializeActorKey(["a/b", "c"])).toBe(`a\\/b${KEY_SEPARATOR}c`); + }); + + test("escapes empty key marker in keys", () => { + expect(serializeActorKey([EMPTY_KEY])).toBe(`\\${EMPTY_KEY}`); + }); + + test("handles complex keys", () => { + expect(serializeActorKey(["a/b", EMPTY_KEY, "c/d"])).toBe( + `a\\/b${KEY_SEPARATOR}\\${EMPTY_KEY}${KEY_SEPARATOR}c\\/d`, + ); + }); + }); + + // Test deserialization + describe("deserializeActorKey", () => { + test("deserializes empty string", () => { + expect(deserializeActorKey("")).toEqual([]); + }); + + test("deserializes undefined/null", () => { + expect(deserializeActorKey(undefined as unknown as string)).toEqual([]); + expect(deserializeActorKey(null as unknown as string)).toEqual([]); + }); + + test("deserializes empty key marker", () => { + expect(deserializeActorKey(EMPTY_KEY)).toEqual([]); + }); + + test("deserializes single key", () => { + expect(deserializeActorKey("test")).toEqual(["test"]); + }); + + test("deserializes multiple keys", () => { + expect( + deserializeActorKey(`a${KEY_SEPARATOR}b${KEY_SEPARATOR}c`), + ).toEqual(["a", "b", "c"]); + }); + + test("deserializes keys with escaped forward slashes", () => { + expect(deserializeActorKey("a\\/b")).toEqual(["a/b"]); + expect(deserializeActorKey(`a\\/b${KEY_SEPARATOR}c`)).toEqual([ + "a/b", + "c", + ]); + }); + + test("deserializes keys with escaped empty key marker", () => { + expect(deserializeActorKey(`\\${EMPTY_KEY}`)).toEqual([EMPTY_KEY]); + }); + + test("deserializes complex keys", () => { + expect( + deserializeActorKey( + `a\\/b${KEY_SEPARATOR}\\${EMPTY_KEY}${KEY_SEPARATOR}c\\/d`, + ), + ).toEqual(["a/b", EMPTY_KEY, "c/d"]); + }); + }); + + // Test edge cases + describe("edge cases", () => { + test("handles empty string parts", () => { + const testCases: Array<[string[], string]> = [ + [[""], "\\0"], + [["a", "", "b"], "a/\\0/b"], + [["", "a"], "\\0/a"], + [["a", ""], "a/\\0"], + [["", "", ""], "\\0/\\0/\\0"], + [["", "a", "", "b", ""], "\\0/a/\\0/b/\\0"], + [[], "/"], + [["test"], "test"], + [["a", "b", "c"], "a/b/c"], + [["a/b", "c"], "a\\/b/c"], + [[EMPTY_KEY], "\\/"], + [["a/b", EMPTY_KEY, "c/d"], "a\\/b/\\//c\\/d"], + [ + ["special\\chars", "more:complex,keys", "final key"], + "special\\\\chars/more:complex,keys/final key", + ], + ]; + + for (const [key, expectedSerialized] of testCases) { + const serialized = serializeActorKey(key); + expect(serialized).toBe(expectedSerialized); + const deserialized = deserializeActorKey(serialized); + expect(deserialized).toEqual(key); + } + }); + + test("differentiates empty array from array with empty string", () => { + const emptyArray: string[] = []; + const arrayWithEmptyString = [""]; + + const serialized1 = serializeActorKey(emptyArray); + const serialized2 = serializeActorKey(arrayWithEmptyString); + + expect(serialized1).toBe(EMPTY_KEY); // Should be "/" + expect(serialized2).not.toBe(EMPTY_KEY); // Should NOT be "/" + expect(serialized2).toBe("\\0"); // Empty string becomes \0 marker + + expect(deserializeActorKey(serialized1)).toEqual(emptyArray); + expect(deserializeActorKey(serialized2)).toEqual(arrayWithEmptyString); + }); + + test("handles mix of empty strings and forward slash (EMPTY_KEY)", () => { + const testCases: Array<[string[], string]> = [ + [["", EMPTY_KEY, ""], "\\0/\\//\\0"], + [[EMPTY_KEY, ""], "\\//\\0"], + [["", EMPTY_KEY], "\\0/\\/"], + [["a", "", EMPTY_KEY, "", "b"], "a/\\0/\\//\\0/b"], + ]; + + for (const [key, expectedSerialized] of testCases) { + const serialized = serializeActorKey(key); + expect(serialized).toBe(expectedSerialized); + const deserialized = deserializeActorKey(serialized); + expect(deserialized).toEqual(key); + } + }); + + test("handles literal backslash-zero string", () => { + const testCases: Array<[string[], string]> = [ + [["\\0"], "\\\\0"], // Literal \0 string + [["a\\0b"], "a\\\\0b"], // Literal \0 in middle + [["\\0", ""], "\\\\0/\\0"], // Literal \0 with empty string + [["", "\\0"], "\\0/\\\\0"], // Empty string with literal \0 + ]; + + for (const [key, expectedSerialized] of testCases) { + const serialized = serializeActorKey(key); + expect(serialized).toBe(expectedSerialized); + const deserialized = deserializeActorKey(serialized); + expect(deserialized).toEqual(key); + } + }); + + test("handles backslash at the end", () => { + const key = ["abc\\"]; + const serialized = serializeActorKey(key); + expect(serialized).toBe("abc\\\\"); + const deserialized = deserializeActorKey(serialized); + expect(deserialized).toEqual(key); + }); + + test("handles backslashes in middle of string", () => { + const testCases: Array<[string[], string]> = [ + [["abc\\def"], "abc\\\\def"], + [["abc\\\\def"], "abc\\\\\\\\def"], + [["path\\to\\file"], "path\\\\to\\\\file"], + ]; + + for (const [key, expectedSerialized] of testCases) { + const serialized = serializeActorKey(key); + expect(serialized).toBe(expectedSerialized); + const deserialized = deserializeActorKey(serialized); + expect(deserialized).toEqual(key); + } + }); + + test("handles forward slashes at the end of strings", () => { + const serialized = serializeActorKey(["abc\\/"]); + expect(deserializeActorKey(serialized)).toEqual(["abc\\/"]); + }); + + test("handles mixed backslashes and forward slashes", () => { + const testCases: Array<[string[], string]> = [ + [["path\\to\\file/dir"], "path\\\\to\\\\file\\/dir"], + [["file\\with/slash"], "file\\\\with\\/slash"], + [["path\\to\\file", "with/slash"], "path\\\\to\\\\file/with\\/slash"], + ]; + + for (const [key, expectedSerialized] of testCases) { + const serialized = serializeActorKey(key); + expect(serialized).toBe(expectedSerialized); + const deserialized = deserializeActorKey(serialized); + expect(deserialized).toEqual(key); + } + }); + + test("handles multiple consecutive forward slashes", () => { + const key = ["a//b"]; + const serialized = serializeActorKey(key); + expect(serialized).toBe("a\\/\\/b"); + const deserialized = deserializeActorKey(serialized); + expect(deserialized).toEqual(key); + }); + + test("handles special characters", () => { + const key = ["a💻b", "c🔑d"]; + const serialized = serializeActorKey(key); + expect(serialized).toBe("a💻b/c🔑d"); + const deserialized = deserializeActorKey(serialized); + expect(deserialized).toEqual(key); + }); + + test("handles escaped forward slashes immediately after separator", () => { + const key = ["abc", "/def"]; + const serialized = serializeActorKey(key); + expect(serialized).toBe(`abc${KEY_SEPARATOR}\\/def`); + expect(deserializeActorKey(serialized)).toEqual(key); + }); + }); + + // Test exact key matching + describe("exact key matching", () => { + test("differentiates [a,b] from [a,b,c]", () => { + const key1 = ["a", "b"]; + const key2 = ["a", "b", "c"]; + + const serialized1 = serializeActorKey(key1); + const serialized2 = serializeActorKey(key2); + + expect(serialized1).not.toBe(serialized2); + }); + + test("differentiates [a,b] from [a]", () => { + const key1 = ["a", "b"]; + const key2 = ["a"]; + + const serialized1 = serializeActorKey(key1); + const serialized2 = serializeActorKey(key2); + + expect(serialized1).not.toBe(serialized2); + }); + + test("differentiates [a/b] from [a,b]", () => { + const key1 = ["a/b"]; + const key2 = ["a", "b"]; + + const serialized1 = serializeActorKey(key1); + const serialized2 = serializeActorKey(key2); + + expect(serialized1).not.toBe(serialized2); + expect(deserializeActorKey(serialized1)).toEqual(key1); + expect(deserializeActorKey(serialized2)).toEqual(key2); + }); + }); +}); diff --git a/packages/core/src/drivers/engine/keys.ts b/packages/core/src/drivers/engine/keys.ts new file mode 100644 index 000000000..a811a318a --- /dev/null +++ b/packages/core/src/drivers/engine/keys.ts @@ -0,0 +1,89 @@ +import type { ActorKey } from "@/mod"; + +export const EMPTY_KEY = "/"; +export const KEY_SEPARATOR = "/"; + +export function serializeActorKey(key: ActorKey): string { + // Use a special marker for empty key arrays + if (key.length === 0) { + return EMPTY_KEY; + } + + // Escape each key part to handle the separator and the empty key marker + const escapedParts = key.map((part) => { + // Handle empty strings by using a special marker + if (part === "") { + return "\\0"; // Use \0 as a marker for empty strings + } + + // Escape backslashes first to avoid conflicts with our markers + let escaped = part.replace(/\\/g, "\\\\"); + + // Then escape separators + escaped = escaped.replace(/\//g, `\\${KEY_SEPARATOR}`); + + return escaped; + }); + + return escapedParts.join(KEY_SEPARATOR); +} + +export function deserializeActorKey(keyString: string | undefined): ActorKey { + // Check for special empty key marker + if ( + keyString === undefined || + keyString === null || + keyString === EMPTY_KEY + ) { + return []; + } + + // Split by unescaped separators and unescape the escaped characters + const parts: string[] = []; + let currentPart = ""; + let escaping = false; + let isEmptyStringMarker = false; + + for (let i = 0; i < keyString.length; i++) { + const char = keyString[i]; + + if (escaping) { + // Handle special escape sequences + if (char === "0") { + // \0 represents an empty string marker + isEmptyStringMarker = true; + } else { + // This is an escaped character, add it directly + currentPart += char; + } + escaping = false; + } else if (char === "\\") { + // Start of an escape sequence + escaping = true; + } else if (char === KEY_SEPARATOR) { + // This is a separator + if (isEmptyStringMarker) { + parts.push(""); + isEmptyStringMarker = false; + } else { + parts.push(currentPart); + } + currentPart = ""; + } else { + // Regular character + currentPart += char; + } + } + + // Add the last part + if (escaping) { + // Incomplete escape at the end - treat as literal backslash + parts.push(currentPart + "\\"); + } else if (isEmptyStringMarker) { + parts.push(""); + } else if (currentPart !== "" || parts.length > 0) { + parts.push(currentPart); + } + + return parts; +} diff --git a/packages/core/src/drivers/engine/kv.ts b/packages/core/src/drivers/engine/kv.ts new file mode 100644 index 000000000..30895243a --- /dev/null +++ b/packages/core/src/drivers/engine/kv.ts @@ -0,0 +1,3 @@ +export const KEYS = { + PERSIST_DATA: [Uint8Array.from([1, 1])], +}; diff --git a/packages/core/src/drivers/engine/log.ts b/packages/core/src/drivers/engine/log.ts new file mode 100644 index 000000000..391df748a --- /dev/null +++ b/packages/core/src/drivers/engine/log.ts @@ -0,0 +1,7 @@ +import { getLogger } from "@/common/log"; + +export const LOGGER_NAME = "driver-engine"; + +export function logger() { + return getLogger(LOGGER_NAME); +} diff --git a/packages/core/src/drivers/engine/manager-driver.ts b/packages/core/src/drivers/engine/manager-driver.ts new file mode 100644 index 000000000..971b28340 --- /dev/null +++ b/packages/core/src/drivers/engine/manager-driver.ts @@ -0,0 +1,387 @@ +import * as cbor from "cbor-x"; +import type { Context as HonoContext } from "hono"; +import type { WSContext } from "hono/ws"; +import invariant from "invariant"; +import { ActorAlreadyExists, InternalError } from "@/actor/errors"; +import { + HEADER_AUTH_DATA, + HEADER_CONN_PARAMS, + HEADER_ENCODING, + HEADER_EXPOSE_INTERNAL_ERROR, +} from "@/actor/router-endpoints"; +import { importWebSocket } from "@/common/websocket"; +import type { + ActorOutput, + CreateInput, + GetForIdInput, + GetOrCreateWithKeyInput, + GetWithKeyInput, + ManagerDriver, +} from "@/driver-helpers/mod"; +import { type Encoding, noopNext, type RunConfig } from "@/mod"; +import { + createActor, + destroyActor, + getActor, + getActorById, + getOrCreateActorById, +} from "./api-endpoints"; +import { EngineApiError } from "./api-utils"; +import type { Config } from "./config"; +import { deserializeActorKey, serializeActorKey } from "./keys"; +import { logger } from "./log"; +import { createWebSocketProxy } from "./ws-proxy"; + +export class EngineManagerDriver implements ManagerDriver { + #config: Config; + #runConfig: RunConfig; + #importWebSocketPromise: Promise; + + constructor(config: Config, runConfig: RunConfig) { + this.#config = config; + this.#runConfig = runConfig; + this.#importWebSocketPromise = importWebSocket(); + } + + async sendRequest(actorId: string, actorRequest: Request): Promise { + logger().debug("sending request to actor via guard", { + actorId, + method: actorRequest.method, + url: actorRequest.url, + }); + + return this.#forwardHttpRequest(actorRequest, actorId); + } + + async openWebSocket( + path: string, + actorId: string, + encoding: Encoding, + params: unknown, + ): Promise { + const WebSocket = await this.#importWebSocketPromise; + + // WebSocket connections go through guard + const guardUrl = `${this.#config.endpoint}${path}`; + + logger().debug("opening websocket to actor via guard", { + actorId, + path, + guardUrl, + }); + + // Create WebSocket connection + const ws = new WebSocket(guardUrl, { + headers: buildGuardHeadersForWebSocket(actorId, encoding, params), + }); + + logger().debug("websocket connection opened", { actorId }); + + return ws; + } + + async proxyRequest( + _c: HonoContext, + actorRequest: Request, + actorId: string, + ): Promise { + logger().debug("forwarding request to actor via guard", { + actorId, + method: actorRequest.method, + url: actorRequest.url, + hasBody: !!actorRequest.body, + }); + + return this.#forwardHttpRequest(actorRequest, actorId); + } + + async proxyWebSocket( + c: HonoContext, + path: string, + actorId: string, + encoding: Encoding, + params: unknown, + authData: unknown, + ): Promise { + const upgradeWebSocket = this.#runConfig.getUpgradeWebSocket?.(); + invariant(upgradeWebSocket, "missing getUpgradeWebSocket"); + + const guardUrl = `${this.#config.endpoint}${path}`; + const wsGuardUrl = guardUrl.replace("http://", "ws://"); + + logger().debug("forwarding websocket to actor via guard", { + actorId, + path, + guardUrl, + }); + + // Build headers + const headers = buildGuardHeadersForWebSocket( + actorId, + encoding, + params, + authData, + ); + const args = await createWebSocketProxy(c, wsGuardUrl, headers); + + return await upgradeWebSocket(() => args)(c, noopNext()); + } + + extraStartupLog() { + return { + engine: this.#config.endpoint, + namespace: this.#config.namespace, + runner: this.#config.runnerName, + address: Object.values(this.#config.addresses) + .map((v) => `${v.host}:${v.port}`) + .join(", "), + }; + } + + async getForId({ + c, + name, + actorId, + }: GetForIdInput): Promise { + // Fetch from API if not in cache + try { + const response = await getActor(this.#config, actorId); + + // Validate name matches + if (response.actor.name !== name) { + logger().debug("actor name mismatch from api", { + actorId, + apiName: response.actor.name, + requestedName: name, + }); + return undefined; + } + + const keyRaw = response.actor.keys[0]; + invariant(keyRaw, `actor ${actorId} should have key`); + const key = deserializeActorKey(keyRaw); + + return { + actorId, + name, + key, + }; + } catch (error) { + if ( + error instanceof EngineApiError && + (error as EngineApiError).group === "actor" && + (error as EngineApiError).code === "not_found" + ) { + return undefined; + } + throw error; + } + } + + async getWithKey({ + c, + name, + key, + }: GetWithKeyInput): Promise { + logger().debug("getWithKey: searching for actor", { name, key }); + + // If not in local cache, fetch by key from API + try { + const response = await getActorById(this.#config, name, key); + + if (!response.actor_id) { + return undefined; + } + + const actorId = response.actor_id; + + logger().debug("getWithKey: found actor via api", { + actorId, + name, + key, + }); + + return { + actorId, + name, + key, + }; + } catch (error) { + if ( + error instanceof EngineApiError && + (error as EngineApiError).group === "actor" && + (error as EngineApiError).code === "not_found" + ) { + return undefined; + } + throw error; + } + } + + async getOrCreateWithKey( + input: GetOrCreateWithKeyInput, + ): Promise { + const { c, name, key, input: actorInput, region } = input; + + logger().info( + "getOrCreateWithKey: getting or creating actor via engine api", + { + name, + key, + }, + ); + + const response = await getOrCreateActorById(this.#config, { + name, + keys: [serializeActorKey(key)], + runner_name_selector: this.#config.runnerName, + input: input ? cbor.encode(actorInput).toString("base64") : undefined, + durable: true, + }); + + const actorId = response.actor_id; + + logger().info("getOrCreateWithKey: actor ready", { + actorId, + name, + key, + created: response.created, + }); + + return { + actorId, + name, + key, + }; + } + + async createActor({ + c, + name, + key, + input, + }: CreateInput): Promise { + // Check if actor with the same name and key already exists + const existingActor = await this.getWithKey({ c, name, key }); + if (existingActor) { + throw new ActorAlreadyExists(name, key); + } + + logger().info("creating actor via engine api", { name, key }); + + // Create actor via engine API + const result = await createActor(this.#config, { + name, + runner_name_selector: this.#config.runnerName, + keys: [serializeActorKey(key)], + input: input ? cbor.encode(input).toString("base64") : null, + durable: true, + }); + const actorId = result.actor.actor_id; + + logger().info("actor created", { actorId, name, key }); + + return { + actorId, + name, + key, + }; + } + + async destroyActor(actorId: string): Promise { + logger().info("destroying actor via engine api", { actorId }); + + await destroyActor(this.#config, actorId); + + logger().info("actor destroyed", { actorId }); + } + + async #forwardHttpRequest( + actorRequest: Request, + actorId: string, + ): Promise { + // Route through guard port + const url = new URL(actorRequest.url); + const guardUrl = `${this.#config.endpoint}${url.pathname}${url.search}`; + + // Handle body properly based on method and presence + let bodyToSend: ArrayBuffer | null = null; + const guardHeaders = buildGuardHeadersForHttp(actorRequest, actorId); + + if ( + actorRequest.body && + actorRequest.method !== "GET" && + actorRequest.method !== "HEAD" + ) { + if (actorRequest.bodyUsed) { + throw new Error("Request body has already been consumed"); + } + + // TODO: This buffers the entire request in memory every time. We + // need to properly implement streaming bodies. + // Clone and read the body to ensure it can be sent + const clonedRequest = actorRequest.clone(); + bodyToSend = await clonedRequest.arrayBuffer(); + + // If this is a streaming request, we need to convert the headers + // for the basic array buffer + guardHeaders.delete("transfer-encoding"); + guardHeaders.set( + "content-length", + String((bodyToSend as ArrayBuffer).byteLength), + ); + } + + const guardRequest = new Request(guardUrl, { + method: actorRequest.method, + headers: guardHeaders, + body: bodyToSend, + }); + + return mutableResponse(await fetch(guardRequest)); + } +} + +function mutableResponse(fetchRes: Response): Response { + // We cannot return the raw response from `fetch` since the response type is not mutable. + // + // In order for middleware to be able to mutate the response, we need to build a new Response object that is mutable. + return new Response(fetchRes.body, fetchRes); +} + +function buildGuardHeadersForHttp( + actorRequest: Request, + actorId: string, +): Headers { + const headers = new Headers(); + // Copy all headers from the original request + for (const [key, value] of actorRequest.headers.entries()) { + headers.set(key, value); + } + // Add guard-specific headers + headers.set("x-rivet-target", "actor"); + headers.set("x-rivet-actor", actorId); + headers.set("x-rivet-addr", "main"); + return headers; +} + +function buildGuardHeadersForWebSocket( + actorId: string, + encoding: Encoding, + params?: unknown, + authData?: unknown, +): Record { + const headers: Record = {}; + headers["x-rivet-target"] = "actor"; + headers["x-rivet-actor"] = actorId; + headers["x-rivet-addr"] = "main"; + headers[HEADER_EXPOSE_INTERNAL_ERROR] = "true"; + headers[HEADER_ENCODING] = encoding; + if (params) { + headers[HEADER_CONN_PARAMS] = JSON.stringify(params); + } + if (authData) { + headers[HEADER_AUTH_DATA] = JSON.stringify(authData); + } + return headers; +} diff --git a/packages/core/src/drivers/engine/mod.ts b/packages/core/src/drivers/engine/mod.ts new file mode 100644 index 000000000..a2f52970d --- /dev/null +++ b/packages/core/src/drivers/engine/mod.ts @@ -0,0 +1,36 @@ +import type { Client } from "@/client/client"; +import type { ManagerDriver } from "@/manager/driver"; +import type { RegistryConfig } from "@/registry/config"; +import type { DriverConfig, RunConfig } from "@/registry/run-config"; +import { EngineActorDriver } from "./actor-driver"; +import { ConfigSchema, type InputConfig } from "./config"; +import { EngineManagerDriver } from "./manager-driver"; + +export { EngineActorDriver } from "./actor-driver"; +export { type Config, ConfigSchema, type InputConfig } from "./config"; +export { EngineManagerDriver } from "./manager-driver"; + +export function createEngineDriver(inputConfig?: InputConfig): DriverConfig { + const config = ConfigSchema.parse(inputConfig); + + return { + name: "engine", + manager: (_registryConfig, runConfig) => { + return new EngineManagerDriver(config, runConfig); + }, + actor: ( + registryConfig: RegistryConfig, + runConfig: RunConfig, + managerDriver: ManagerDriver, + inlineClient: Client, + ) => { + return new EngineActorDriver( + registryConfig, + runConfig, + managerDriver, + inlineClient, + config, + ); + }, + }; +} diff --git a/packages/core/src/drivers/engine/ws-proxy.ts b/packages/core/src/drivers/engine/ws-proxy.ts new file mode 100644 index 000000000..03297a310 --- /dev/null +++ b/packages/core/src/drivers/engine/ws-proxy.ts @@ -0,0 +1,170 @@ +import type { Context as HonoContext } from "hono"; +import type { WSContext } from "hono/ws"; +import invariant from "invariant"; +import type { CloseEvent } from "ws"; +import { importWebSocket } from "@/common/websocket"; +import type { UpgradeWebSocketArgs } from "@/mod"; +import { logger } from "./log"; + +/** + * Returns Hono `upgradeWebSocket` args that will proxy requests from the client to a destination address. + */ +export async function createWebSocketProxy( + c: HonoContext, + targetUrl: string, + headers: Record, +): Promise { + const WebSocket = await importWebSocket(); + + // HACK: Sanitize WebSocket-specific headers. If we don't do this, some WebSocket implementations (i.e. native WebSocket in Node.js) will fail to connect. + for (const [k, v] of c.req.raw.headers.entries()) { + if (!k.startsWith("sec-") && k !== "connection" && k !== "upgrade") { + headers[k] = v; + } + } + + // WebSocket state + interface WsState { + targetWs?: WebSocket; + connectPromise?: Promise; + } + const state: WsState = {}; + + return { + onOpen: async (event: any, clientWs: WSContext) => { + logger().debug("client websocket connected", { targetUrl }); + + if (clientWs.readyState !== 1) { + logger().warn("client websocket not open on connection", { + targetUrl, + readyState: clientWs.readyState, + }); + return; + } + + // Create WebSocket + const targetWs = new WebSocket(targetUrl, { headers }); + state.targetWs = targetWs; + + // Setup connection promise + state.connectPromise = new Promise((resolve, reject) => { + targetWs.addEventListener("open", () => { + logger().debug("target websocket connected", { targetUrl }); + + if (clientWs.readyState !== 1) { + logger().warn("client websocket closed before target connected", { + targetUrl, + clientReadyState: clientWs.readyState, + }); + targetWs.close(1001, "Client disconnected"); + reject(new Error("Client disconnected")); + return; + } + resolve(); + }); + + targetWs.addEventListener("error", (error) => { + logger().warn("target websocket error during connection", { + targetUrl, + }); + reject(error); + }); + }); + + // Setup bidirectional forwarding + state.targetWs.addEventListener("message", (event) => { + if ( + typeof event.data === "string" || + event.data instanceof ArrayBuffer + ) { + clientWs.send(event.data); + } else if (event.data instanceof Blob) { + event.data.arrayBuffer().then((buffer) => { + clientWs.send(buffer); + }); + } + }); + + state.targetWs.addEventListener("close", (event) => { + logger().debug("target websocket closed", { + targetUrl, + code: event.code, + reason: event.reason, + }); + closeWebSocketIfOpen(clientWs, event.code, event.reason); + }); + + state.targetWs.addEventListener("error", (error) => { + logger().error("target websocket error", { targetUrl, error }); + closeWebSocketIfOpen(clientWs, 1011, "Target WebSocket error"); + }); + }, + + onMessage: async (event: any, clientWs: WSContext) => { + if (!state.targetWs || !state.connectPromise) { + logger().error("websocket state not initialized", { targetUrl }); + return; + } + + try { + await state.connectPromise; + if (state.targetWs.readyState === WebSocket.OPEN) { + state.targetWs.send(event.data); + } else { + logger().warn("target websocket not open", { + targetUrl, + readyState: state.targetWs.readyState, + }); + } + } catch (error) { + logger().error("failed to connect to target websocket", { + targetUrl, + error, + }); + closeWebSocketIfOpen(clientWs, 1011, "Failed to connect to target"); + } + }, + + onClose: (event: any, clientWs: WSContext) => { + logger().debug("client websocket closed", { + targetUrl, + code: event.code, + reason: event.reason, + wasClean: event.wasClean, + }); + + if (state.targetWs) { + if ( + state.targetWs.readyState === WebSocket.OPEN || + state.targetWs.readyState === WebSocket.CONNECTING + ) { + state.targetWs.close(1000, event.reason || "Client disconnected"); + } + } + }, + + onError: (event: any, clientWs: WSContext) => { + logger().error("client websocket error", { targetUrl, event }); + + if (state.targetWs) { + if (state.targetWs.readyState === WebSocket.OPEN) { + state.targetWs.close(1011, "Client WebSocket error"); + } else if (state.targetWs.readyState === WebSocket.CONNECTING) { + state.targetWs.close(); + } + } + }, + }; +} + +function closeWebSocketIfOpen( + ws: WebSocket | WSContext, + code: number, + reason: string, +): void { + if (ws.readyState === 1) { + ws.close(code, reason); + } else if ("close" in ws && (ws as WebSocket).readyState === WebSocket.OPEN) { + ws.close(code, reason); + } +} diff --git a/packages/core/src/drivers/file-system/manager.ts b/packages/core/src/drivers/file-system/manager.ts index af5f6973d..b5c64001f 100644 --- a/packages/core/src/drivers/file-system/manager.ts +++ b/packages/core/src/drivers/file-system/manager.ts @@ -21,6 +21,7 @@ import { createInlineClientDriver } from "@/inline-client-driver/mod"; import { ManagerInspector } from "@/inspector/manager"; import { type Actor, ActorFeature, type ActorId } from "@/inspector/mod"; import { + type DriverConfig, type Encoding, PATH_CONNECT_WEBSOCKET, PATH_RAW_WEBSOCKET_PREFIX, @@ -35,6 +36,7 @@ export class FileSystemManagerDriver implements ManagerDriver { #registryConfig: RegistryConfig; #runConfig: RunConfig; #state: FileSystemGlobalState; + #driverConfig: DriverConfig; #actorDriver: ActorDriver; #actorRouter: ActorRouter; @@ -45,10 +47,12 @@ export class FileSystemManagerDriver implements ManagerDriver { registryConfig: RegistryConfig, runConfig: RunConfig, state: FileSystemGlobalState, + driverConfig: DriverConfig, ) { this.#registryConfig = registryConfig; this.#runConfig = runConfig; this.#state = state; + this.#driverConfig = driverConfig; if (runConfig.studio.enabled) { if (!this.#runConfig.studio.token()) { @@ -116,7 +120,7 @@ export class FileSystemManagerDriver implements ManagerDriver { // Actors run on the same node as the manager, so we create a dummy actor router that we route requests to const inlineClient = createClientWithDriver(createInlineClientDriver(this)); - this.#actorDriver = runConfig.driver.actor( + this.#actorDriver = this.#driverConfig.actor( registryConfig, runConfig, this, @@ -192,7 +196,7 @@ export class FileSystemManagerDriver implements ManagerDriver { if (path === PATH_CONNECT_WEBSOCKET) { // Handle standard connect const wsHandler = await handleWebSocketConnect( - c, + c.req.raw, this.#runConfig, this.#actorDriver, actorId, @@ -205,7 +209,7 @@ export class FileSystemManagerDriver implements ManagerDriver { } else if (path.startsWith(PATH_RAW_WEBSOCKET_PREFIX)) { // Handle websocket proxy const wsHandler = await handleRawWebSocketHandler( - c, + c.req.raw, path, this.#actorDriver, actorId, diff --git a/packages/core/src/drivers/file-system/mod.ts b/packages/core/src/drivers/file-system/mod.ts index 1b78ecf37..b8e833bf8 100644 --- a/packages/core/src/drivers/file-system/mod.ts +++ b/packages/core/src/drivers/file-system/mod.ts @@ -13,10 +13,15 @@ export function createFileSystemOrMemoryDriver( customPath?: string, ): DriverConfig { const state = new FileSystemGlobalState(persist, customPath); - return { + const driverConfig: DriverConfig = { name: persist ? "file-system" : "memory", manager: (registryConfig, runConfig) => - new FileSystemManagerDriver(registryConfig, runConfig, state), + new FileSystemManagerDriver( + registryConfig, + runConfig, + state, + driverConfig, + ), actor: (registryConfig, runConfig, managerDriver, inlineClient) => new FileSystemActorDriver( registryConfig, @@ -26,6 +31,7 @@ export function createFileSystemOrMemoryDriver( state, ), }; + return driverConfig; } export function createFileSystemDriver(opts?: { path?: string }): DriverConfig { diff --git a/packages/core/src/inline-client-driver/mod.ts b/packages/core/src/inline-client-driver/mod.ts index 45f87fb65..213ca5d78 100644 --- a/packages/core/src/inline-client-driver/mod.ts +++ b/packages/core/src/inline-client-driver/mod.ts @@ -220,14 +220,18 @@ export function createInlineClientDriver( const normalizedPath = path.startsWith("/") ? path.slice(1) : path; const url = new URL(`http://actor/raw/http/${normalizedPath}`); - // Forward the request to the actor - const proxyRequest = new Request(url, init); - // Forward conn params if provided + const proxyRequestHeaders = new Headers(init.headers); if (params) { - proxyRequest.headers.set(HEADER_CONN_PARAMS, JSON.stringify(params)); + proxyRequestHeaders.set(HEADER_CONN_PARAMS, JSON.stringify(params)); } + // Forward the request to the actor + const proxyRequest = new Request(url, { + ...init, + headers: proxyRequestHeaders, + }); + return await managerDriver.sendRequest(actorId, proxyRequest); } catch (err) { // Standardize to ClientActorError instead of the native backend error @@ -291,6 +295,7 @@ export async function queryActor( if ("getForId" in query) { const output = await driver.getForId({ c, + name: query.getForId.name, actorId: query.getForId.actorId, }); if (!output) throw new errors.ActorNotFound(query.getForId.actorId); diff --git a/packages/core/src/manager/auth.ts b/packages/core/src/manager/auth.ts index 28dc66c7e..9ac948144 100644 --- a/packages/core/src/manager/auth.ts +++ b/packages/core/src/manager/auth.ts @@ -40,6 +40,7 @@ export async function getActorNameFromQuery( // TODO: This will have a duplicate call to getForId between this and queryActor const output = await driver.getForId({ c, + name: query.getForId.name, actorId: query.getForId.actorId, }); if (!output) throw new errors.ActorNotFound(query.getForId.actorId); diff --git a/packages/core/src/manager/driver.ts b/packages/core/src/manager/driver.ts index a5c9083c7..15d20247a 100644 --- a/packages/core/src/manager/driver.ts +++ b/packages/core/src/manager/driver.ts @@ -47,6 +47,7 @@ export interface ManagerDriver { } export interface GetForIdInput { c?: HonoContext | undefined; + name: string; actorId: string; } diff --git a/packages/core/src/manager/protocol/query.ts b/packages/core/src/manager/protocol/query.ts index 6b71e2185..756a84617 100644 --- a/packages/core/src/manager/protocol/query.ts +++ b/packages/core/src/manager/protocol/query.ts @@ -40,6 +40,7 @@ export const GetOrCreateRequestSchema = z.object({ export const ActorQuerySchema = z.union([ z.object({ getForId: z.object({ + name: z.string(), actorId: z.string(), }), }), diff --git a/packages/core/src/manager/router.ts b/packages/core/src/manager/router.ts index aa8b63b17..45483f975 100644 --- a/packages/core/src/manager/router.ts +++ b/packages/core/src/manager/router.ts @@ -839,6 +839,7 @@ export async function queryActor( if ("getForId" in query) { const output = await driver.getForId({ c, + name: query.getForId.name, actorId: query.getForId.actorId, }); if (!output) throw new errors.ActorNotFound(query.getForId.actorId); @@ -899,9 +900,26 @@ async function createTestWebSocketProxy( try { // Resolve the client WebSocket promise logger().debug("awaiting client websocket promise"); - clientWs = await clientWsPromise; + const ws = await clientWsPromise; + clientWs = ws; logger().debug("client websocket promise resolved", { - constructor: clientWs?.constructor.name, + constructor: ws?.constructor.name, + }); + + // Wait for ws to open + await new Promise((resolve, reject) => { + const onOpen = () => { + logger().debug("test websocket connection opened"); + resolve(); + }; + const onError = (error: any) => { + logger().error("test websocket connection failed", { error }); + reject( + new Error(`Failed to open WebSocket: ${error.message || error}`), + ); + }; + ws.addEventListener("open", onOpen); + ws.addEventListener("error", onError); }); } catch (error) { logger().error( @@ -1134,14 +1152,17 @@ async function handleSseConnectRequest( const url = new URL("http://actor/connect/sse"); // Always build fresh request to prevent forwarding unwanted headers - const proxyRequest = new Request(url); - proxyRequest.headers.set(HEADER_ENCODING, params.data.encoding); + const proxyRequestHeaderes = new Headers(); + proxyRequestHeaderes.set(HEADER_ENCODING, params.data.encoding); if (params.data.connParams) { - proxyRequest.headers.set(HEADER_CONN_PARAMS, params.data.connParams); + proxyRequestHeaderes.set(HEADER_CONN_PARAMS, params.data.connParams); } if (authData) { - proxyRequest.headers.set(HEADER_AUTH_DATA, JSON.stringify(authData)); + proxyRequestHeaderes.set(HEADER_AUTH_DATA, JSON.stringify(authData)); } + + const proxyRequest = new Request(url, { headers: proxyRequestHeaderes }); + return await driver.proxyRequest(c, proxyRequest, actorId); } catch (error) { // If we receive an error during setup, we send the error and close the socket immediately @@ -1389,14 +1410,17 @@ async function handleMessageRequest( const url = new URL("http://actor/connections/message"); // Always build fresh request to prevent forwarding unwanted headers + const proxyRequestHeaders = new Headers(); + proxyRequestHeaders.set(HEADER_ENCODING, encoding); + proxyRequestHeaders.set(HEADER_CONN_ID, connId); + proxyRequestHeaders.set(HEADER_CONN_TOKEN, connToken); + const proxyRequest = new Request(url, { method: "POST", body: c.req.raw.body, duplex: "half", + headers: proxyRequestHeaders, }); - proxyRequest.headers.set(HEADER_ENCODING, encoding); - proxyRequest.headers.set(HEADER_CONN_ID, connId); - proxyRequest.headers.set(HEADER_CONN_TOKEN, connToken); return await driver.proxyRequest(c, proxyRequest, actorId); } catch (error) { @@ -1462,18 +1486,21 @@ async function handleActionRequest( ); // Always build fresh request to prevent forwarding unwanted headers - const proxyRequest = new Request(url, { - method: "POST", - body: c.req.raw.body, - }); - proxyRequest.headers.set(HEADER_ENCODING, params.data.encoding); + const proxyRequestHeaders = new Headers(); + proxyRequestHeaders.set(HEADER_ENCODING, params.data.encoding); if (params.data.connParams) { - proxyRequest.headers.set(HEADER_CONN_PARAMS, params.data.connParams); + proxyRequestHeaders.set(HEADER_CONN_PARAMS, params.data.connParams); } if (authData) { - proxyRequest.headers.set(HEADER_AUTH_DATA, JSON.stringify(authData)); + proxyRequestHeaders.set(HEADER_AUTH_DATA, JSON.stringify(authData)); } + const proxyRequest = new Request(url, { + method: "POST", + body: c.req.raw.body, + headers: proxyRequestHeaders, + }); + return await driver.proxyRequest(c, proxyRequest, actorId); } catch (error) { logger().error("error in action handler", { error: stringifyError(error) }); @@ -1580,26 +1607,26 @@ async function handleRawHttpRequest( ); // Forward the request to the actor - const proxyRequest = new Request(url, { - method: c.req.method, - headers: c.req.raw.headers, - body: c.req.raw.body, - }); logger().debug("rewriting http url", { from: c.req.url, - to: proxyRequest.url, + to: url, }); - // Forward conn params if provided + const proxyRequestHeaders = new Headers(c.req.raw.headers); if (connParams) { - proxyRequest.headers.set(HEADER_CONN_PARAMS, JSON.stringify(connParams)); + proxyRequestHeaders.set(HEADER_CONN_PARAMS, JSON.stringify(connParams)); } - // Forward auth data to actor if (authData) { - proxyRequest.headers.set(HEADER_AUTH_DATA, JSON.stringify(authData)); + proxyRequestHeaders.set(HEADER_AUTH_DATA, JSON.stringify(authData)); } + const proxyRequest = new Request(url, { + method: c.req.method, + headers: proxyRequestHeaders, + body: c.req.raw.body, + }); + return await driver.proxyRequest(c, proxyRequest, actorId); } catch (error) { logger().error("error in raw http handler", { diff --git a/packages/core/src/mod.ts b/packages/core/src/mod.ts index 4c2430f78..3cd62b030 100644 --- a/packages/core/src/mod.ts +++ b/packages/core/src/mod.ts @@ -8,6 +8,7 @@ export { } from "@/client/client"; export { InlineWebSocketAdapter2 } from "@/common/inline-websocket-adapter2"; export { noopNext } from "@/common/utils"; +export { createEngineDriver } from "@/drivers/engine/mod"; export { createFileSystemDriver, createMemoryDriver, diff --git a/packages/core/src/registry/mod.ts b/packages/core/src/registry/mod.ts index a937c19ed..ab2a34a97 100644 --- a/packages/core/src/registry/mod.ts +++ b/packages/core/src/registry/mod.ts @@ -1,6 +1,7 @@ import type { Hono } from "hono"; import { createActorRouter } from "@/actor/router"; import { type Client, createClientWithDriver } from "@/client/client"; +import { chooseDefaultDriver } from "@/drivers/default"; import { createInlineClientDriver } from "@/inline-client-driver/mod"; import { getStudioUrl } from "@/inspector/utils"; import { createManagerRouter } from "@/manager/router"; @@ -26,12 +27,6 @@ interface ServerOutput> { serve: (hono?: Hono) => void; } -interface ActorNodeOutput { - hono: Hono; - handler: (req: Request) => Promise; - serve: (hono?: Hono) => void; -} - export class Registry { #config: RegistryConfig; @@ -49,6 +44,9 @@ export class Registry { public createServer(inputConfig?: RunConfigInput): ServerOutput { const config = RunConfigSchema.parse(inputConfig); + // Choose the driver based on configuration + const driver = chooseDefaultDriver(config); + // Configure getUpgradeWebSocket lazily so we can assign it in crossPlatformServe let upgradeWebSocket: any; if (!config.getUpgradeWebSocket) { @@ -56,7 +54,7 @@ export class Registry { } // Create router - const managerDriver = config.driver.manager(this.#config, config); + const managerDriver = driver.manager(this.#config, config); const clientDriver = createInlineClientDriver(managerDriver); const { router: hono } = createManagerRouter( this.#config, @@ -71,7 +69,7 @@ export class Registry { const driverLog = managerDriver.extraStartupLog?.() ?? {}; logger().info("rivetkit ready", { - driver: config.driver.name, + driver: driver.name, definitions: Object.keys(this.#config.use).length, ...driverLog, }); @@ -81,6 +79,20 @@ export class Registry { }); } + // Create runner + if (config.role === "all" || config.role === "runner") { + const inlineClient = createClientWithDriver( + createInlineClientDriver(managerDriver), + ); + const _actorDriver = driver.actor( + this.#config, + config, + managerDriver, + inlineClient, + ); + // TODO: What do we do with the actor driver here? + } + return { client, hono, @@ -99,49 +111,6 @@ export class Registry { const { serve } = this.createServer(inputConfig); serve(); } - - /** - * Creates a worker for the registry. - */ - public createWorker(inputConfig?: RunConfigInput): ActorNodeOutput { - const config = RunConfigSchema.parse(inputConfig); - - // Configure getUpgradeWebSocket lazily so we can assign it in crossPlatformServe - let upgradeWebSocket: any; - if (!config.getUpgradeWebSocket) { - config.getUpgradeWebSocket = () => upgradeWebSocket!; - } - - // Create router - const managerDriver = config.driver.manager(this.#config, config); - const inlineClient = createClientWithDriver( - createInlineClientDriver(managerDriver), - ); - const actorDriver = config.driver.actor( - this.#config, - config, - managerDriver, - inlineClient, - ); - const hono = createActorRouter(config, actorDriver); - - return { - hono, - handler: async (req: Request) => await hono.fetch(req), - serve: async (app) => { - const out = await crossPlatformServe(hono, app); - upgradeWebSocket = out.upgradeWebSocket; - }, - }; - } - - /** - * Runs the standalone worker. - */ - public async runWorker(inputConfig?: RunConfigInput) { - const { serve } = this.createWorker(inputConfig); - serve(); - } } export function setup( diff --git a/packages/core/src/registry/run-config.ts b/packages/core/src/registry/run-config.ts index 28776cfcb..4b57a961d 100644 --- a/packages/core/src/registry/run-config.ts +++ b/packages/core/src/registry/run-config.ts @@ -1,7 +1,6 @@ import type { cors } from "hono/cors"; import { z } from "zod"; import type { ActorDriverBuilder } from "@/actor/driver"; -import { createDefaultDriver } from "@/drivers/default"; import { InspectorConfigSchema } from "@/inspector/config"; import type { ManagerDriverBuilder } from "@/manager/driver"; import type { UpgradeWebSocket } from "@/utils"; @@ -22,7 +21,10 @@ export type DriverConfig = z.infer; /** Base config used for the actor config across all platforms. */ export const RunConfigSchema = z .object({ - driver: DriverConfigSchema.optional().default(() => createDefaultDriver()), + driver: DriverConfigSchema.optional(), + + /** Endpoint to connect to the Rivet engine. Can be configured via RIVET_ENGINE env var. */ + engine: z.string().optional(), // This is a function to allow for lazy configuration of upgradeWebSocket on the // fly. This is required since the dependencies that profie upgradeWebSocket @@ -30,6 +32,8 @@ export const RunConfigSchema = z // created or must be imported async using `await import(...)` getUpgradeWebSocket: z.custom().optional(), + role: z.enum(["all", "server", "runner"]).optional().default("all"), + /** CORS configuration for the router. Uses Hono's CORS middleware options. */ cors: z.custom().optional(), diff --git a/packages/core/src/test/mod.ts b/packages/core/src/test/mod.ts index 4d30cdee1..464e9ad71 100644 --- a/packages/core/src/test/mod.ts +++ b/packages/core/src/test/mod.ts @@ -3,6 +3,7 @@ import { serve as honoServe, type ServerType } from "@hono/node-server"; import { createNodeWebSocket } from "@hono/node-ws"; import { type TestContext, vi } from "vitest"; import { type Client, createClient } from "@/client/mod"; +import { chooseDefaultDriver } from "@/drivers/default"; import { createFileSystemOrMemoryDriver } from "@/drivers/file-system/mod"; import { createInlineClientDriver } from "@/inline-client-driver/mod"; import { getStudioUrl } from "@/inspector/utils"; @@ -15,9 +16,6 @@ import { logger } from "./log"; function serve(registry: Registry, inputConfig?: InputConfig): ServerType { // Configure default configuration inputConfig ??= {}; - if (!inputConfig.driver) { - inputConfig.driver = createFileSystemOrMemoryDriver(false); - } const config = ConfigSchema.parse(inputConfig); @@ -28,7 +26,8 @@ function serve(registry: Registry, inputConfig?: InputConfig): ServerType { // Create router const runConfig = RunConfigSchema.parse(inputConfig); - const managerDriver = config.driver.manager(registry.config, config); + const driver = inputConfig.driver ?? createFileSystemOrMemoryDriver(false); + const managerDriver = driver.manager(registry.config, config); const inlineClientDriver = createInlineClientDriver(managerDriver); const { router } = createManagerRouter( registry.config, diff --git a/packages/core/tests/driver-test-suite-engine.test.ts b/packages/core/tests/driver-test-suite-engine.test.ts new file mode 100644 index 000000000..1009a7ff2 --- /dev/null +++ b/packages/core/tests/driver-test-suite-engine.test.ts @@ -0,0 +1,80 @@ +import { join } from "node:path"; +import { createClientWithDriver } from "@/client/client"; +import { createTestRuntime, runDriverTests } from "@/driver-test-suite/mod"; +import { createEngineDriver } from "@/drivers/engine/mod"; +import { createInlineClientDriver } from "@/inline-client-driver/mod"; +import { RunConfigSchema } from "@/registry/run-config"; +import { getPort } from "@/test/mod"; + +runDriverTests({ + // Use real timers for engine-runner tests + useRealTimers: true, + skip: { + // Skip tests that aren't applicable for engine-runner + schedule: true, // Scheduling handled by engine + }, + async start(projectPath: string) { + return await createTestRuntime( + join(projectPath, "registry.ts"), + async (registry) => { + // Get configuration from environment or use defaults + const endpoint = process.env.RIVET_ENDPOINT || "http://localhost:7080"; + const namespace = `test-${crypto.randomUUID().slice(0, 8)}`; + + // Pick port + const port = await getPort(); + + // Create namespace + const response = await fetch(`${endpoint}/namespaces`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + name: namespace, + display_name: namespace, + }), + }); + if (!response.ok) { + throw "Create namespace failed"; + } + + // Create driver config + const driverConfig = createEngineDriver({ + endpoint, + namespace, + runnerName: "test-runner", + totalSlots: 100, + addresses: { + main: { + host: "127.0.0.1", + port: port, + }, + }, + }); + + // Start the actor driver + const runConfig = RunConfigSchema.parse({ + driver: driverConfig, + getUpgradeWebSocket: () => undefined, + }); + const managerDriver = driverConfig.manager(registry.config, runConfig); + const inlineClientDriver = createInlineClientDriver(managerDriver); + const inlineClient = createClientWithDriver(inlineClientDriver); + const actorDriver = driverConfig.actor( + registry.config, + runConfig, + managerDriver, + inlineClient, + ); + + return { + driver: driverConfig, + cleanup: async () => { + await actorDriver.shutdown?.(true); + }, + }; + }, + ); + }, +}); diff --git a/packages/core/tests/rivet/deployment.test.ts.old b/packages/core/tests/rivet/deployment.test.ts.old deleted file mode 100644 index cdbc7fc74..000000000 --- a/packages/core/tests/rivet/deployment.test.ts.old +++ /dev/null @@ -1,37 +0,0 @@ -// import { describe, test, expect, beforeAll, afterAll } from "vitest"; -// import os from "node:os"; -// import fs from "node:fs/promises"; -// import path from "node:path"; -// import { fileURLToPath } from "node:url"; -// import { deployToRivet } from "./rivet-deploy"; -// import { randomUUID } from "node:crypto"; -// -// const __dirname = path.dirname(fileURLToPath(import.meta.url)); -// -// // Simple counter actor definition to deploy -// const COUNTER_ACTOR = ` -// import { actor, setup } from "@rivetkit/core"; -// -// const counter = actor({ -// state: { count: 0 }, -// actions: { -// increment: (c, amount) => { -// c.state.count += amount; -// c.broadcast("newCount", c.state.count); -// return c.state.count; -// }, -// getCount: (c) => { -// return c.state.count; -// }, -// }, -// }); -// -// export const registry = setup({ -// use: { counter }, -// }); -// -// test("Rivet deployment tests", async () => { -// const tempFilePath = path.join(os.tmpdir(), `registry-${randomUUID()}`); -// await fs.writeFile(tempFilePath, COUNTER_ACTOR); -// await deployToRivet("test-registry", tempFilePath, true); -// }); diff --git a/packages/core/tests/rivet/driver-tests.test.ts.old b/packages/core/tests/rivet/driver-tests.test.ts.old deleted file mode 100644 index 338f1174b..000000000 --- a/packages/core/tests/rivet/driver-tests.test.ts.old +++ /dev/null @@ -1,57 +0,0 @@ -// import { runDriverTests } from "@rivetkit/core/driver-test-suite"; -// import { deployToRivet, rivetClientConfig } from "./rivet-deploy"; -// import { RivetClientConfig, rivetRequest } from "../src/rivet-client"; -// import invariant from "invariant"; -// -// let deployProjectOnce: Promise | undefined = undefined; -// -// // IMPORTANT: Unlike other tests, Rivet tests are ran without parallelism since we reuse the same shared environment. Eventually we can create an environment per test to create isolated instances. -// runDriverTests({ -// useRealTimers: true, -// HACK_skipCleanupNet: true, -// async start(projectPath: string) { -// // Setup project -// if (!deployProjectOnce) { -// deployProjectOnce = deployToRivet(projectPath); -// } -// const endpoint = await deployProjectOnce; -// -// // Cleanup actors from previous tests -// await deleteAllActors(rivetClientConfig); -// -// // Flush cache since we manually updated the actors -// const res = await fetch(`${endpoint}/.test/rivet/flush-cache`, { -// method: "POST", -// }); -// invariant(res.ok, `request failed: ${res.status}`); -// -// return { -// endpoint, -// async cleanup() { -// // This takes time and slows down tests -- it's fine if we leak actors that'll be cleaned up in the next run -// // await deleteAllActors(rivetClientConfig); -// }, -// }; -// }, -// }); -// -// async function deleteAllActors(clientConfig: RivetClientConfig) { -// // TODO: This is not paginated -// -// console.log("Listing actors to delete"); -// const { actors } = await rivetRequest< -// void, -// { actors: { id: string; tags: Record }[] } -// >(clientConfig, "GET", "/actors"); -// -// for (const actor of actors) { -// if (actor.tags.role !== "actor") continue; -// -// console.log(`Deleting actor ${actor.id} (${JSON.stringify(actor.tags)})`); -// await rivetRequest( -// clientConfig, -// "DELETE", -// `/actors/${actor.id}`, -// ); -// } -// } diff --git a/packages/core/tests/rivet/key-serialization.test.ts.old b/packages/core/tests/rivet/key-serialization.test.ts.old deleted file mode 100644 index 7684ddaff..000000000 --- a/packages/core/tests/rivet/key-serialization.test.ts.old +++ /dev/null @@ -1,197 +0,0 @@ -// import { describe, test, expect } from "vitest"; -// import { serializeKeyForTag, deserializeKeyFromTag, EMPTY_KEY, KEY_SEPARATOR } from "../src/util"; -// -// describe("Key serialization and deserialization", () => { -// // Test serialization -// describe("serializeKeyForTag", () => { -// test("serializes empty key array", () => { -// expect(serializeKeyForTag([])).toBe(EMPTY_KEY); -// }); -// -// test("serializes single key", () => { -// expect(serializeKeyForTag(["test"])).toBe("test"); -// }); -// -// test("serializes multiple keys", () => { -// expect(serializeKeyForTag(["a", "b", "c"])).toBe(`a${KEY_SEPARATOR}b${KEY_SEPARATOR}c`); -// }); -// -// test("escapes commas in keys", () => { -// expect(serializeKeyForTag(["a,b"])).toBe("a\\,b"); -// expect(serializeKeyForTag(["a,b", "c"])).toBe(`a\\,b${KEY_SEPARATOR}c`); -// }); -// -// test("escapes empty key marker in keys", () => { -// expect(serializeKeyForTag([EMPTY_KEY])).toBe(`\\${EMPTY_KEY}`); -// }); -// -// test("handles complex keys", () => { -// expect(serializeKeyForTag(["a,b", EMPTY_KEY, "c,d"])).toBe(`a\\,b${KEY_SEPARATOR}\\${EMPTY_KEY}${KEY_SEPARATOR}c\\,d`); -// }); -// }); -// -// // Test deserialization -// describe("deserializeKeyFromTag", () => { -// test("deserializes empty string", () => { -// expect(deserializeKeyFromTag("")).toEqual([]); -// }); -// -// test("deserializes undefined/null", () => { -// expect(deserializeKeyFromTag(undefined as unknown as string)).toEqual([]); -// expect(deserializeKeyFromTag(null as unknown as string)).toEqual([]); -// }); -// -// test("deserializes empty key marker", () => { -// expect(deserializeKeyFromTag(EMPTY_KEY)).toEqual([]); -// }); -// -// test("deserializes single key", () => { -// expect(deserializeKeyFromTag("test")).toEqual(["test"]); -// }); -// -// test("deserializes multiple keys", () => { -// expect(deserializeKeyFromTag(`a${KEY_SEPARATOR}b${KEY_SEPARATOR}c`)).toEqual(["a", "b", "c"]); -// }); -// -// test("deserializes keys with escaped commas", () => { -// expect(deserializeKeyFromTag("a\\,b")).toEqual(["a,b"]); -// expect(deserializeKeyFromTag(`a\\,b${KEY_SEPARATOR}c`)).toEqual(["a,b", "c"]); -// }); -// -// test("deserializes keys with escaped empty key marker", () => { -// expect(deserializeKeyFromTag(`\\${EMPTY_KEY}`)).toEqual([EMPTY_KEY]); -// }); -// -// test("deserializes complex keys", () => { -// expect(deserializeKeyFromTag(`a\\,b${KEY_SEPARATOR}\\${EMPTY_KEY}${KEY_SEPARATOR}c\\,d`)).toEqual(["a,b", EMPTY_KEY, "c,d"]); -// }); -// }); -// -// // Test roundtrip -// describe("roundtrip", () => { -// const testKeys = [ -// [], -// ["test"], -// ["a", "b", "c"], -// ["a,b", "c"], -// [EMPTY_KEY], -// ["a,b", EMPTY_KEY, "c,d"], -// ["special\\chars", "more:complex,keys", "final key"] -// ]; -// -// testKeys.forEach(key => { -// test(`roundtrip: ${JSON.stringify(key)}`, () => { -// const serialized = serializeKeyForTag(key); -// const deserialized = deserializeKeyFromTag(serialized); -// expect(deserialized).toEqual(key); -// }); -// }); -// -// test("handles all test cases in a large batch", () => { -// for (const key of testKeys) { -// const serialized = serializeKeyForTag(key); -// const deserialized = deserializeKeyFromTag(serialized); -// expect(deserialized).toEqual(key); -// } -// }); -// }); -// -// // Test edge cases -// describe("edge cases", () => { -// test("handles backslash at the end", () => { -// const key = ["abc\\"]; -// const serialized = serializeKeyForTag(key); -// const deserialized = deserializeKeyFromTag(serialized); -// expect(deserialized).toEqual(key); -// }); -// -// test("handles backslashes in middle of string", () => { -// const keys = [ -// ["abc\\def"], -// ["abc\\\\def"], -// ["path\\to\\file"] -// ]; -// -// for (const key of keys) { -// const serialized = serializeKeyForTag(key); -// const deserialized = deserializeKeyFromTag(serialized); -// expect(deserialized).toEqual(key); -// } -// }); -// -// test("handles commas at the end of strings", () => { -// const serialized = serializeKeyForTag(["abc\\,"]); -// expect(deserializeKeyFromTag(serialized)).toEqual(["abc\\,"]); -// }); -// -// test("handles mixed backslashes and commas", () => { -// const keys = [ -// ["path\\to\\file,dir"], -// ["file\\with,comma"], -// ["path\\to\\file", "with,comma"] -// ]; -// -// for (const key of keys) { -// const serialized = serializeKeyForTag(key); -// const deserialized = deserializeKeyFromTag(serialized); -// expect(deserialized).toEqual(key); -// } -// }); -// -// test("handles multiple consecutive commas", () => { -// const key = ["a,,b"]; -// const serialized = serializeKeyForTag(key); -// const deserialized = deserializeKeyFromTag(serialized); -// expect(deserialized).toEqual(key); -// }); -// -// test("handles special characters", () => { -// const key = ["a💻b", "c🔑d"]; -// const serialized = serializeKeyForTag(key); -// const deserialized = deserializeKeyFromTag(serialized); -// expect(deserialized).toEqual(key); -// }); -// -// test("handles escaped commas immediately after separator", () => { -// const key = ["abc", ",def"]; -// const serialized = serializeKeyForTag(key); -// expect(serialized).toBe(`abc${KEY_SEPARATOR}\\,def`); -// expect(deserializeKeyFromTag(serialized)).toEqual(key); -// }); -// }); -// -// // Test exact key matching -// describe("exact key matching", () => { -// test("differentiates [a,b] from [a,b,c]", () => { -// const key1 = ["a", "b"]; -// const key2 = ["a", "b", "c"]; -// -// const serialized1 = serializeKeyForTag(key1); -// const serialized2 = serializeKeyForTag(key2); -// -// expect(serialized1).not.toBe(serialized2); -// }); -// -// test("differentiates [a,b] from [a]", () => { -// const key1 = ["a", "b"]; -// const key2 = ["a"]; -// -// const serialized1 = serializeKeyForTag(key1); -// const serialized2 = serializeKeyForTag(key2); -// -// expect(serialized1).not.toBe(serialized2); -// }); -// -// test("differentiates [a,b] from [a:b]", () => { -// const key1 = ["a,b"]; -// const key2 = ["a", "b"]; -// -// const serialized1 = serializeKeyForTag(key1); -// const serialized2 = serializeKeyForTag(key2); -// -// expect(serialized1).not.toBe(serialized2); -// expect(deserializeKeyFromTag(serialized1)).toEqual(key1); -// expect(deserializeKeyFromTag(serialized2)).toEqual(key2); -// }); -// }); -// }); diff --git a/packages/core/tests/rivet/rivet-deploy.ts.old b/packages/core/tests/rivet/rivet-deploy.ts.old deleted file mode 100644 index 6d13874db..000000000 --- a/packages/core/tests/rivet/rivet-deploy.ts.old +++ /dev/null @@ -1,365 +0,0 @@ -// import fs from "node:fs/promises"; -// import path from "node:path"; -// import os from "node:os"; -// import { spawn, exec } from "node:child_process"; -// import crypto from "node:crypto"; -// import { promisify } from "node:util"; -// import invariant from "invariant"; -// import { RivetClient } from "@rivet-gg/api"; -// import { RivetClientConfig } from "../src/rivet-client"; -// -// const execPromise = promisify(exec); -// const apiEndpoint = process.env.RIVET_ENDPOINT!; -// invariant(apiEndpoint, "missing RIVET_ENDPOINT"); -// const rivetCloudToken = process.env.RIVET_CLOUD_TOKEN!; -// invariant(rivetCloudToken, "missing RIVET_CLOUD_TOKEN"); -// const project = process.env.RIVET_PROJECT!; -// invariant(project, "missing RIVET_PROJECT"); -// const environment = process.env.RIVET_ENVIRONMENT!; -// invariant(environment, "missing RIVET_ENVIRONMENT"); -// -// export const rivetClientConfig: RivetClientConfig = { -// endpoint: apiEndpoint, -// token: rivetCloudToken, -// project, -// environment, -// }; -// -// const rivetClient = new RivetClient({ -// environment: apiEndpoint, -// token: rivetCloudToken, -// }); -// -// /** -// * Helper function to write a file to the filesystem -// */ -// async function writeFile( -// dirPath: string, -// filename: string, -// content: string | object, -// ): Promise { -// const filePath = path.join(dirPath, filename); -// const fileContent = -// typeof content === "string" ? content : JSON.stringify(content, null, 2); -// -// console.log(`Writing ${filename}`); -// await fs.writeFile(filePath, fileContent); -// } -// -// /** -// * Pack a package using pnpm pack and return the path to the packed tarball -// */ -// async function packPackage( -// packageDir: string, -// tmpDir: string, -// packageName: string, -// ): Promise { -// console.log(`Packing package from ${packageDir}...`); -// // Generate a unique filename -// const outputFileName = `${packageName}-${crypto.randomUUID()}.tgz`; -// const outputPath = path.join(tmpDir, outputFileName); -// -// // Run pnpm pack with specific output path -// await execPromise(`pnpm pack --install-if-needed --out ${outputPath}`, { -// cwd: packageDir, -// }); -// console.log(`Generated tarball at ${outputPath}`); -// return outputFileName; -// } -// -// /** -// * Deploy an app to Rivet and return the endpoint -// */ -// export async function deployToRivet(projectPath: string) { -// console.log("=== START deployToRivet ==="); -// console.log(`Deploying registry from path: ${projectPath}`); -// -// // Create a temporary directory for the test -// const uuid = crypto.randomUUID(); -// const tmpDirName = `rivetkit-test-${uuid}`; -// const tmpDir = path.join(os.tmpdir(), tmpDirName); -// console.log(`Creating temp directory: ${tmpDir}`); -// await fs.mkdir(tmpDir, { recursive: true }); -// -// // Get the workspace root and package paths -// const workspaceRoot = path.resolve(__dirname, "../../../.."); -// const rivetPlatformPath = path.resolve(__dirname, "../"); -// const rivetkitCorePath = path.resolve(workspaceRoot, "packages/core"); -// -// // Pack the required packages directly to the temp directory -// console.log("Packing required packages..."); -// const rivetPlatformFilename = await packPackage( -// rivetPlatformPath, -// tmpDir, -// "rivetkit-rivet", -// ); -// const rivetkitFilename = await packPackage( -// rivetkitCorePath, -// tmpDir, -// "rivetkit", -// ); -// -// // Create package.json with file dependencies -// const packageJson = { -// name: "rivetkit-test", -// private: true, -// version: "1.0.0", -// scripts: { -// build: "tsc", -// }, -// dependencies: { -// "@rivetkit/rivet": `file:./${rivetPlatformFilename}`, -// rivetkit: `file:./${rivetkitFilename}`, -// }, -// devDependencies: { -// typescript: "^5.3.0", -// }, -// packageManager: -// "pnpm@10.7.1+sha512.2d92c86b7928dc8284f53494fb4201f983da65f0fb4f0d40baafa5cf628fa31dae3e5968f12466f17df7e97310e30f343a648baea1b9b350685dafafffdf5808", -// }; -// await writeFile(tmpDir, "package.json", packageJson); -// -// // Create rivet.json with workspace dependencies -// const rivetJson = { -// functions: { -// manager: { -// tags: { role: "manager", framework: "rivetkit" }, -// dockerfile: "Dockerfile", -// runtime: { -// environment: { -// RIVET_ENDPOINT: apiEndpoint, -// RIVET_SERVICE_TOKEN: rivetCloudToken, // TODO: This should be a service token, but both work -// RIVET_PROJECT: project, -// RIVET_ENVIRONMENT: environment, -// _LOG_LEVEL: "DEBUG", -// _ACTOR_LOG_LEVEL: "DEBUG", -// }, -// }, -// resources: { -// cpu: 250, -// memory: 256, -// }, -// }, -// }, -// actors: { -// actor: { -// tags: { role: "actor", framework: "rivetkit" }, -// script: "src/actor.ts", -// }, -// }, -// }; -// await writeFile(tmpDir, "rivet.json", rivetJson); -// -// // Create Dockerfile -// const dockerfile = ` -// FROM node:22-alpine AS builder -// -// RUN npm i -g corepack && corepack enable -// -// WORKDIR /app -// -// COPY package.json pnpm-lock.yaml ./ -// COPY *.tgz ./ -// -// RUN --mount=type=cache,id=pnpm,target=/pnpm/store \ -// pnpm install --frozen-lockfile -// -// COPY . . -// # HACK: Remove actor.ts bc file is invalid in Node -// RUN rm src/actor.ts && pnpm build -// -// RUN --mount=type=cache,id=pnpm,target=/pnpm/store \ -// pnpm install --prod --frozen-lockfile -// -// FROM node:22-alpine AS runtime -// -// RUN addgroup -g 1001 -S rivet && \ -// adduser -S rivet -u 1001 -G rivet -// -// WORKDIR /app -// -// COPY --from=builder --chown=rivet:rivet /app/dist ./dist -// COPY --from=builder --chown=rivet:rivet /app/node_modules ./node_modules -// COPY --from=builder --chown=rivet:rivet /app/package.json ./ -// -// USER rivet -// -// CMD ["node", "dist/server.js"] -// `; -// await writeFile(tmpDir, "Dockerfile", dockerfile); -// -// // Create .dockerignore -// const dockerignore = ` -// node_modules -// `; -// await writeFile(tmpDir, ".dockerignore", dockerignore); -// -// // Disable PnP -// const yarnPnp = "nodeLinker: node-modules"; -// await writeFile(tmpDir, ".yarnrc.yml", yarnPnp); -// -// // Create tsconfig.json -// const tsconfig = { -// compilerOptions: { -// target: "ESNext", -// module: "NodeNext", -// moduleResolution: "NodeNext", -// esModuleInterop: true, -// strict: true, -// skipLibCheck: true, -// forceConsistentCasingInFileNames: true, -// outDir: "dist", -// sourceMap: true, -// declaration: true, -// }, -// include: ["src/**/*.ts"], -// }; -// await writeFile(tmpDir, "tsconfig.json", tsconfig); -// -// // Install deps -// console.log("Installing dependencies..."); -// try { -// const installOutput = await execPromise("pnpm install", { cwd: tmpDir }); -// console.log("Install output:", installOutput.stdout); -// } catch (error) { -// console.error("Error installing dependencies:", error); -// throw error; -// } -// -// // Copy project to test directory -// console.log(`Copying project from ${projectPath} to ${tmpDir}/src/actors`); -// const projectDestDir = path.join(tmpDir, "src", "actors"); -// await fs.cp(projectPath, projectDestDir, { recursive: true }); -// -// const serverTsContent = `import { startManager } from "@rivetkit/rivet/manager"; -// import { registry } from "./actors/registry"; -// -// // TODO: Find a cleaner way of flagging an registry as test mode (ideally not in the config itself) -// // Force enable test -// registry.config.test.enabled = true; -// -// startManager(registry); -// `; -// await writeFile(tmpDir, "src/server.ts", serverTsContent); -// -// const actorTsContent = `import { createActorHandler } from "@rivetkit/actor/drivers/rivet"; -// import { registry } from "./actors/registry"; -// -// // TODO: Find a cleaner way of flagging an registry as test mode (ideally not in the config itself) -// // Force enable test -// registry.config.test.enabled = true; -// -// export default createActorHandler(registry);`; -// await writeFile(tmpDir, "src/actor.ts", actorTsContent); -// -// // Build and deploy to Rivet -// console.log("Building and deploying to Rivet..."); -// -// if (!process.env._RIVET_SKIP_DEPLOY) { -// // Deploy using the rivet CLI -// console.log("Spawning rivet deploy command..."); -// const deployProcess = spawn( -// "rivet", -// ["deploy", "--environment", environment, "--non-interactive"], -// { -// cwd: tmpDir, -// env: { -// ...process.env, -// RIVET_ENDPOINT: apiEndpoint, -// RIVET_CLOUD_TOKEN: rivetCloudToken, -// //CI: "1", -// }, -// stdio: "inherit", // Stream output directly to console -// }, -// ); -// -// console.log("Waiting for deploy process to complete..."); -// await new Promise((resolve, reject) => { -// deployProcess.on("exit", (code) => { -// if (code === 0) { -// resolve(undefined); -// } else { -// reject(new Error(`Deploy process exited with code ${code}`)); -// } -// }); -// deployProcess.on("error", (err) => { -// console.error("Deploy process error:", err); -// reject(err); -// }); -// }); -// console.log("Deploy process completed successfully"); -// } -// -// // Get the endpoint URL -// console.log("Getting Rivet endpoint..."); -// -// // // HACK: We have to get the endpoint of the actor directly since we can't route functions with hostnames on localhost yet -// // const { actors } = await rivetClient.actors.list({ -// // tagsJson: JSON.stringify({ -// // type: "function", -// // function: "manager", -// // appName, -// // }), -// // project, -// // environment, -// // }); -// // const managerActor = actors[0]; -// // invariant(managerActor, "missing manager actor"); -// // const endpoint = managerActor.network.ports.http?.url; -// // invariant(endpoint, "missing manager actor endpoint"); -// -// // TODO: This doesn't work in local dev since we can't route functions on localhost yet -// // Get the endpoint using the CLI endpoint command -// console.log("Spawning rivet function endpoint command..."); -// const endpointProcess = spawn( -// "rivet", -// ["function", "endpoint", "--environment", environment, "manager"], -// { -// cwd: tmpDir, -// env: { -// ...process.env, -// CI: "1", -// }, -// stdio: ["inherit", "pipe", "inherit"], // Capture stdout -// }, -// ); -// -// // Capture the endpoint -// let endpointOutput = ""; -// endpointProcess.stdout.on("data", (data) => { -// const output = data.toString(); -// console.log(`Endpoint output: ${output}`); -// endpointOutput += output; -// }); -// -// // Wait for endpoint command to complete -// console.log("Waiting for endpoint process to complete..."); -// await new Promise((resolve, reject) => { -// endpointProcess.on("exit", (code) => { -// console.log(`Endpoint process exited with code: ${code}`); -// if (code === 0) { -// resolve(undefined); -// } else { -// reject(new Error(`Endpoint command failed with code ${code}`)); -// } -// }); -// endpointProcess.on("error", (err) => { -// console.error("Endpoint process error:", err); -// reject(err); -// }); -// }); -// -// invariant(endpointOutput, "endpoint command returned empty output"); -// console.log(`Raw endpoint output: ${endpointOutput}`); -// -// // Look for something that looks like a URL in the string -// const lines = endpointOutput.trim().split("\n"); -// const endpoint = lines[lines.length - 1]; -// invariant(endpoint, "endpoint not found"); -// -// console.log("Manager endpoint", endpoint); -// -// console.log("=== END deployToRivet ==="); -// -// return endpoint; -// } diff --git a/packages/core/tsup.config.bundled_an4diesgzb.mjs b/packages/core/tsup.config.bundled_an4diesgzb.mjs deleted file mode 100644 index c3903cd71..000000000 --- a/packages/core/tsup.config.bundled_an4diesgzb.mjs +++ /dev/null @@ -1,20 +0,0 @@ -// ../../tsup.base.ts -var tsup_base_default = { - target: "node16", - platform: "node", - format: ["cjs", "esm"], - sourcemap: true, - clean: true, - dts: true, - minify: false, - // IMPORTANT: Splitting is required to fix a bug with ESM (https://github.com/egoist/tsup/issues/992#issuecomment-1763540165) - splitting: true, - skipNodeModulesBundle: true, - publicDir: true, -}; - -// tsup.config.ts -import { defineConfig } from "tsup"; -var tsup_config_default = defineConfig(tsup_base_default); -export { tsup_config_default as default }; -//# sourceMappingURL=data:application/json;base64,ewogICJ2ZXJzaW9uIjogMywKICAic291cmNlcyI6IFsiLi4vLi4vdHN1cC5iYXNlLnRzIiwgInRzdXAuY29uZmlnLnRzIl0sCiAgInNvdXJjZXNDb250ZW50IjogWyJjb25zdCBfX2luamVjdGVkX2ZpbGVuYW1lX18gPSBcIi9ob21lL25hdGhhbi9yaXZldGtpdC90c3VwLmJhc2UudHNcIjtjb25zdCBfX2luamVjdGVkX2Rpcm5hbWVfXyA9IFwiL2hvbWUvbmF0aGFuL3JpdmV0a2l0XCI7Y29uc3QgX19pbmplY3RlZF9pbXBvcnRfbWV0YV91cmxfXyA9IFwiZmlsZTovLy9ob21lL25hdGhhbi9yaXZldGtpdC90c3VwLmJhc2UudHNcIjtpbXBvcnQgdHlwZSB7IE9wdGlvbnMgfSBmcm9tIFwidHN1cFwiO1xuXG5leHBvcnQgZGVmYXVsdCB7XG5cdHRhcmdldDogXCJub2RlMTZcIixcblx0cGxhdGZvcm06IFwibm9kZVwiLFxuXHRmb3JtYXQ6IFtcImNqc1wiLCBcImVzbVwiXSxcblx0c291cmNlbWFwOiB0cnVlLFxuXHRjbGVhbjogdHJ1ZSxcblx0ZHRzOiB0cnVlLFxuXHRtaW5pZnk6IGZhbHNlLFxuXHQvLyBJTVBPUlRBTlQ6IFNwbGl0dGluZyBpcyByZXF1aXJlZCB0byBmaXggYSBidWcgd2l0aCBFU00gKGh0dHBzOi8vZ2l0aHViLmNvbS9lZ29pc3QvdHN1cC9pc3N1ZXMvOTkyI2lzc3VlY29tbWVudC0xNzYzNTQwMTY1KVxuXHRzcGxpdHRpbmc6IHRydWUsXG5cdHNraXBOb2RlTW9kdWxlc0J1bmRsZTogdHJ1ZSxcblx0cHVibGljRGlyOiB0cnVlLFxufSBzYXRpc2ZpZXMgT3B0aW9ucztcbiIsICJjb25zdCBfX2luamVjdGVkX2ZpbGVuYW1lX18gPSBcIi9ob21lL25hdGhhbi9yaXZldGtpdC9wYWNrYWdlcy9jb3JlL3RzdXAuY29uZmlnLnRzXCI7Y29uc3QgX19pbmplY3RlZF9kaXJuYW1lX18gPSBcIi9ob21lL25hdGhhbi9yaXZldGtpdC9wYWNrYWdlcy9jb3JlXCI7Y29uc3QgX19pbmplY3RlZF9pbXBvcnRfbWV0YV91cmxfXyA9IFwiZmlsZTovLy9ob21lL25hdGhhbi9yaXZldGtpdC9wYWNrYWdlcy9jb3JlL3RzdXAuY29uZmlnLnRzXCI7aW1wb3J0IGRlZmF1bHRDb25maWcgZnJvbSBcIi4uLy4uL3RzdXAuYmFzZS50c1wiO1xuaW1wb3J0IHsgZGVmaW5lQ29uZmlnIH0gZnJvbSBcInRzdXBcIjtcblxuZXhwb3J0IGRlZmF1bHQgZGVmaW5lQ29uZmlnKGRlZmF1bHRDb25maWcpO1xuIl0sCiAgIm1hcHBpbmdzIjogIjtBQUVBLElBQU8sb0JBQVE7QUFBQSxFQUNkLFFBQVE7QUFBQSxFQUNSLFVBQVU7QUFBQSxFQUNWLFFBQVEsQ0FBQyxPQUFPLEtBQUs7QUFBQSxFQUNyQixXQUFXO0FBQUEsRUFDWCxPQUFPO0FBQUEsRUFDUCxLQUFLO0FBQUEsRUFDTCxRQUFRO0FBQUE7QUFBQSxFQUVSLFdBQVc7QUFBQSxFQUNYLHVCQUF1QjtBQUFBLEVBQ3ZCLFdBQVc7QUFDWjs7O0FDYkEsU0FBUyxvQkFBb0I7QUFFN0IsSUFBTyxzQkFBUSxhQUFhLGlCQUFhOyIsCiAgIm5hbWVzIjogW10KfQo= diff --git a/packages/core/tsup.config.bundled_xvi1jgwbzx.mjs b/packages/core/tsup.config.bundled_xvi1jgwbzx.mjs deleted file mode 100644 index 4b87c09d0..000000000 --- a/packages/core/tsup.config.bundled_xvi1jgwbzx.mjs +++ /dev/null @@ -1,20 +0,0 @@ -// ../../tsup.base.ts -var tsup_base_default = { - target: "node16", - platform: "node", - format: ["cjs", "esm"], - sourcemap: true, - clean: true, - dts: true, - minify: false, - // IMPORTANT: Splitting is required to fix a bug with ESM (https://github.com/egoist/tsup/issues/992#issuecomment-1763540165) - splitting: true, - skipNodeModulesBundle: true, - publicDir: true, -}; - -// tsup.config.ts -import { defineConfig } from "tsup"; -var tsup_config_default = defineConfig(tsup_base_default); -export { tsup_config_default as default }; -//# sourceMappingURL=data:application/json;base64,ewogICJ2ZXJzaW9uIjogMywKICAic291cmNlcyI6IFsiLi4vLi4vdHN1cC5iYXNlLnRzIiwgInRzdXAuY29uZmlnLnRzIl0sCiAgInNvdXJjZXNDb250ZW50IjogWyJjb25zdCBfX2luamVjdGVkX2ZpbGVuYW1lX18gPSBcIi9Vc2Vycy9uYXRoYW4vcml2ZXQvYWN0b3ItY29yZS90c3VwLmJhc2UudHNcIjtjb25zdCBfX2luamVjdGVkX2Rpcm5hbWVfXyA9IFwiL1VzZXJzL25hdGhhbi9yaXZldC9hY3Rvci1jb3JlXCI7Y29uc3QgX19pbmplY3RlZF9pbXBvcnRfbWV0YV91cmxfXyA9IFwiZmlsZTovLy9Vc2Vycy9uYXRoYW4vcml2ZXQvYWN0b3ItY29yZS90c3VwLmJhc2UudHNcIjtpbXBvcnQgdHlwZSB7IE9wdGlvbnMgfSBmcm9tIFwidHN1cFwiO1xuXG5leHBvcnQgZGVmYXVsdCB7XG5cdHRhcmdldDogXCJub2RlMTZcIixcblx0cGxhdGZvcm06IFwibm9kZVwiLFxuXHRmb3JtYXQ6IFtcImNqc1wiLCBcImVzbVwiXSxcblx0c291cmNlbWFwOiB0cnVlLFxuXHRjbGVhbjogdHJ1ZSxcblx0ZHRzOiB0cnVlLFxuXHRtaW5pZnk6IGZhbHNlLFxuXHQvLyBJTVBPUlRBTlQ6IFNwbGl0dGluZyBpcyByZXF1aXJlZCB0byBmaXggYSBidWcgd2l0aCBFU00gKGh0dHBzOi8vZ2l0aHViLmNvbS9lZ29pc3QvdHN1cC9pc3N1ZXMvOTkyI2lzc3VlY29tbWVudC0xNzYzNTQwMTY1KVxuXHRzcGxpdHRpbmc6IHRydWUsXG5cdHNraXBOb2RlTW9kdWxlc0J1bmRsZTogdHJ1ZSxcblx0cHVibGljRGlyOiB0cnVlLFxufSBzYXRpc2ZpZXMgT3B0aW9ucztcbiIsICJjb25zdCBfX2luamVjdGVkX2ZpbGVuYW1lX18gPSBcIi9Vc2Vycy9uYXRoYW4vcml2ZXQvYWN0b3ItY29yZS9wYWNrYWdlcy9hY3Rvci1jb3JlL3RzdXAuY29uZmlnLnRzXCI7Y29uc3QgX19pbmplY3RlZF9kaXJuYW1lX18gPSBcIi9Vc2Vycy9uYXRoYW4vcml2ZXQvYWN0b3ItY29yZS9wYWNrYWdlcy9hY3Rvci1jb3JlXCI7Y29uc3QgX19pbmplY3RlZF9pbXBvcnRfbWV0YV91cmxfXyA9IFwiZmlsZTovLy9Vc2Vycy9uYXRoYW4vcml2ZXQvYWN0b3ItY29yZS9wYWNrYWdlcy9hY3Rvci1jb3JlL3RzdXAuY29uZmlnLnRzXCI7aW1wb3J0IGRlZmF1bHRDb25maWcgZnJvbSBcIi4uLy4uL3RzdXAuYmFzZS50c1wiO1xuaW1wb3J0IHsgZGVmaW5lQ29uZmlnIH0gZnJvbSBcInRzdXBcIjtcblxuZXhwb3J0IGRlZmF1bHQgZGVmaW5lQ29uZmlnKGRlZmF1bHRDb25maWcpO1xuIl0sCiAgIm1hcHBpbmdzIjogIjtBQUVBLElBQU8sb0JBQVE7QUFBQSxFQUNkLFFBQVE7QUFBQSxFQUNSLFVBQVU7QUFBQSxFQUNWLFFBQVEsQ0FBQyxPQUFPLEtBQUs7QUFBQSxFQUNyQixXQUFXO0FBQUEsRUFDWCxPQUFPO0FBQUEsRUFDUCxLQUFLO0FBQUEsRUFDTCxRQUFRO0FBQUE7QUFBQSxFQUVSLFdBQVc7QUFBQSxFQUNYLHVCQUF1QjtBQUFBLEVBQ3ZCLFdBQVc7QUFDWjs7O0FDYkEsU0FBUyxvQkFBb0I7QUFFN0IsSUFBTyxzQkFBUSxhQUFhLGlCQUFhOyIsCiAgIm5hbWVzIjogW10KfQo= diff --git a/packages/core/tsup.config.ts b/packages/core/tsup.config.ts index b935a0a8e..e6b91b79c 100644 --- a/packages/core/tsup.config.ts +++ b/packages/core/tsup.config.ts @@ -1,4 +1,7 @@ import { defineConfig } from "tsup"; import defaultConfig from "../../tsup.base.ts"; -export default defineConfig(defaultConfig); +export default defineConfig({ + ...defaultConfig, + noExternal: ["@rivetkit/engine-runner", "@rivetkit/engine-runner-protocol"], +}); diff --git a/packages/drivers/cloudflare-workers/src/manager-driver.ts b/packages/drivers/cloudflare-workers/src/manager-driver.ts index 49d02038a..eaa8012f1 100644 --- a/packages/drivers/cloudflare-workers/src/manager-driver.ts +++ b/packages/drivers/cloudflare-workers/src/manager-driver.ts @@ -164,7 +164,6 @@ export class CloudflareActorsManagerDriver implements ManagerDriver { }); } - // TODO: strip headers const newUrl = new URL(`http://actor${path}`); const actorRequest = new Request(newUrl, c.req.raw); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 3af8acf96..347e50710 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -679,7 +679,7 @@ importers: version: 5.4.19(@types/node@22.15.32) vitest: specifier: ^3.1.1 - version: 3.2.4(@types/node@22.15.32)(tsx@4.20.3)(yaml@2.8.0) + version: 3.2.4(@types/node@22.15.32)(@vitest/ui@3.1.1)(tsx@4.20.3)(yaml@2.8.0) examples/raw-websocket-handler-proxy: dependencies: @@ -722,7 +722,7 @@ importers: version: 6.3.5(@types/node@22.15.32)(tsx@4.20.3)(yaml@2.8.0) vitest: specifier: ^3.1.1 - version: 3.2.4(@types/node@22.15.32)(tsx@4.20.3)(yaml@2.8.0) + version: 3.2.4(@types/node@22.15.32)(@vitest/ui@3.1.1)(tsx@4.20.3)(yaml@2.8.0) examples/react: dependencies: @@ -1037,14 +1037,17 @@ importers: version: 3.25.76 devDependencies: '@hono/node-server': - specifier: ^1.14.0 - version: 1.14.4(hono@4.8.3) + specifier: ^1.18.2 + version: 1.18.2(hono@4.8.3) '@hono/node-ws': specifier: ^1.1.1 - version: 1.1.7(@hono/node-server@1.14.4(hono@4.8.3))(hono@4.8.3) + version: 1.1.7(@hono/node-server@1.18.2(hono@4.8.3))(hono@4.8.3) '@rivet-gg/actor-core': specifier: ^25.1.0 version: 25.2.0 + '@rivetkit/engine-runner': + specifier: https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@472 + version: https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@472(hono@4.8.3) '@types/invariant': specifier: ^2 version: 2.2.37 @@ -1191,7 +1194,7 @@ importers: version: 5.8.3 vitest: specifier: ^3.1.1 - version: 3.2.4(@types/node@22.15.32)(tsx@4.20.3)(yaml@2.8.0) + version: 3.2.4(@types/node@22.15.32)(@vitest/ui@3.1.1)(tsx@4.20.3)(yaml@2.8.0) packages/frameworks/framework-base: dependencies: @@ -1427,6 +1430,10 @@ packages: resolution: {integrity: sha512-ETyHEk2VHHvl9b9jZP5IHPavHYk57EhanlRRuae9XCpb/j5bDCbPPMOBfCWhnl/7EDJz0jEMCi/RhccCE8r1+Q==} engines: {node: '>=6.9.0'} + '@bare-ts/lib@0.4.0': + resolution: {integrity: sha512-uTb12lcDkvwwwGb/4atNy/+2Xuksx88tlxKuwnDuxmsOFVo4R/WsE+fmAf2nWtzHDi7m6NDw8Ke9j+XlNBLoJg==} + engines: {node: ^14.18.0 || >=16.0.0} + '@better-auth/utils@0.2.5': resolution: {integrity: sha512-uI2+/8h/zVsH8RrYdG8eUErbuGBk16rZKQfz8CjxQOyCE6v7BqFYEbFwvOkvl1KbUdxhqOnXp78+uE5h8qVEgQ==} @@ -2166,6 +2173,12 @@ packages: peerDependencies: hono: 4.8.3 + '@hono/node-server@1.18.2': + resolution: {integrity: sha512-icgNvC0vRYivzyuSSaUv9ttcwtN8fDyd1k3AOIBDJgYd84tXRZSS6na8X54CY/oYoFTNhEmZraW/Rb9XYwX4KA==} + engines: {node: '>=18.14.1'} + peerDependencies: + hono: 4.8.3 + '@hono/node-ws@1.1.7': resolution: {integrity: sha512-O+sJXeruHlAAdSHUoyol8s2kTVqixdGIfKqakyN6z2RKR2O8LDOFr1CpTye7xfwmUsBe6uoXKSLmGvLbCfPFgw==} engines: {node: '>=18.14.1'} @@ -2565,6 +2578,14 @@ packages: '@rivet-gg/actor-core@25.2.0': resolution: {integrity: sha512-4K72XcDLVAz44Ae6G6GuyzWyxQZOLN8jM/W+sVKm6fHr70X8FNCSC5+/9hFIxz/OH9E6q6Wi3V/UN/k6immUBQ==} + '@rivetkit/engine-runner-protocol@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@a029157d5d2a679b5bd5d619942a494477154822': + resolution: {tarball: https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@a029157d5d2a679b5bd5d619942a494477154822} + version: 1.0.0 + + '@rivetkit/engine-runner@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@472': + resolution: {tarball: https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@472} + version: 0.0.0 + '@rivetkit/fast-json-patch@3.1.2': resolution: {integrity: sha512-CtA50xgsSSzICQduF/NDShPRzvucnNvsW/lQO0WgMTT1XAj9Lfae4pm7r3llFwilgG+9iq76Hv1LUqNy72v6yw==} @@ -4442,6 +4463,7 @@ packages: source-map@0.8.0-beta.0: resolution: {integrity: sha512-2ymg6oRBpebeZi9UUNsgQ89bhx01TcTkmNTGnNO88imTmbSgy4nfujrgVEFKWpMTEGA11EDkTt7mqObTPdigIA==} engines: {node: '>= 8'} + deprecated: The work that was done in this beta branch won't be included in future versions spawn-command@0.0.2: resolution: {integrity: sha512-zC8zGoGkmc8J9ndvml8Xksr1Amk9qBujgbF0JAIWO7kXr43w0h/0GJNM/Vustixu+YE8N/MTrQ7N31FvHUACxQ==} @@ -4953,6 +4975,18 @@ packages: utf-8-validate: optional: true + ws@8.18.3: + resolution: {integrity: sha512-PEIGCY5tSlUt50cqyMXfCzX+oOPqN0vuGqWzbcJ2xvnkzkq46oOpz7dQaTDBdfICb4N14+GARUDw2XV2N4tvzg==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + y18n@5.0.8: resolution: {integrity: sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==} engines: {node: '>=10'} @@ -5172,6 +5206,8 @@ snapshots: '@babel/helper-string-parser': 7.27.1 '@babel/helper-validator-identifier': 7.27.1 + '@bare-ts/lib@0.4.0': {} + '@better-auth/utils@0.2.5': dependencies: typescript: 5.8.3 @@ -5578,6 +5614,10 @@ snapshots: dependencies: hono: 4.8.3 + '@hono/node-server@1.18.2(hono@4.8.3)': + dependencies: + hono: 4.8.3 + '@hono/node-ws@1.1.7(@hono/node-server@1.14.4(hono@4.8.3))(hono@4.8.3)': dependencies: '@hono/node-server': 1.14.4(hono@4.8.3) @@ -5587,6 +5627,15 @@ snapshots: - bufferutil - utf-8-validate + '@hono/node-ws@1.1.7(@hono/node-server@1.18.2(hono@4.8.3))(hono@4.8.3)': + dependencies: + '@hono/node-server': 1.18.2(hono@4.8.3) + hono: 4.8.3 + ws: 8.18.2 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + '@hono/standard-validator@0.1.4(@standard-schema/spec@1.0.0)(hono@4.8.3)': dependencies: '@standard-schema/spec': 1.0.0 @@ -5952,6 +6001,20 @@ snapshots: dependencies: zod: 3.25.76 + '@rivetkit/engine-runner-protocol@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@a029157d5d2a679b5bd5d619942a494477154822': + dependencies: + '@bare-ts/lib': 0.4.0 + + '@rivetkit/engine-runner@https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner@472(hono@4.8.3)': + dependencies: + '@hono/node-server': 1.18.2(hono@4.8.3) + '@rivetkit/engine-runner-protocol': https://pkg.pr.new/rivet-gg/engine/@rivetkit/engine-runner-protocol@a029157d5d2a679b5bd5d619942a494477154822 + ws: 8.18.3 + transitivePeerDependencies: + - bufferutil + - hono + - utf-8-validate + '@rivetkit/fast-json-patch@3.1.2': {} '@rolldown/pluginutils@1.0.0-beta.11': {} @@ -8533,7 +8596,7 @@ snapshots: dependencies: '@types/chai': 5.2.2 '@vitest/expect': 3.2.4 - '@vitest/mocker': 3.2.4(vite@6.3.5(@types/node@24.0.4)(tsx@4.20.3)(yaml@2.8.0)) + '@vitest/mocker': 3.2.4(vite@6.3.5(@types/node@22.15.32)(tsx@4.20.3)(yaml@2.8.0)) '@vitest/pretty-format': 3.2.4 '@vitest/runner': 3.2.4 '@vitest/snapshot': 3.2.4 @@ -8612,47 +8675,6 @@ snapshots: - tsx - yaml - vitest@3.2.4(@types/node@22.15.32)(tsx@4.20.3)(yaml@2.8.0): - dependencies: - '@types/chai': 5.2.2 - '@vitest/expect': 3.2.4 - '@vitest/mocker': 3.2.4(vite@6.3.5(@types/node@22.15.32)(tsx@4.20.3)(yaml@2.8.0)) - '@vitest/pretty-format': 3.2.4 - '@vitest/runner': 3.2.4 - '@vitest/snapshot': 3.2.4 - '@vitest/spy': 3.2.4 - '@vitest/utils': 3.2.4 - chai: 5.2.0 - debug: 4.4.1 - expect-type: 1.2.1 - magic-string: 0.30.17 - pathe: 2.0.3 - picomatch: 4.0.2 - std-env: 3.9.0 - tinybench: 2.9.0 - tinyexec: 0.3.2 - tinyglobby: 0.2.14 - tinypool: 1.1.1 - tinyrainbow: 2.0.0 - vite: 6.3.5(@types/node@22.15.32)(tsx@4.20.3)(yaml@2.8.0) - vite-node: 3.2.4(@types/node@22.15.32)(tsx@4.20.3)(yaml@2.8.0) - why-is-node-running: 2.3.0 - optionalDependencies: - '@types/node': 22.15.32 - transitivePeerDependencies: - - jiti - - less - - lightningcss - - msw - - sass - - sass-embedded - - stylus - - sugarss - - supports-color - - terser - - tsx - - yaml - vitest@3.2.4(@types/node@24.0.4)(tsx@4.20.3)(yaml@2.8.0): dependencies: '@types/chai': 5.2.2 @@ -8764,6 +8786,8 @@ snapshots: ws@8.18.2: {} + ws@8.18.3: {} + y18n@5.0.8: {} yallist@3.1.1: {} diff --git a/vitest.base.ts b/vitest.base.ts index 885fc63a4..885c2d6ca 100644 --- a/vitest.base.ts +++ b/vitest.base.ts @@ -2,8 +2,8 @@ import type { ViteUserConfig } from "vitest/config"; export default { test: { - testTimeout: 2_000, - hookTimeout: 2_000, + testTimeout: 5_000, + hookTimeout: 5_000, // Enable parallelism sequence: { // TODO: This breaks fake timers, unsure how to make tests run in parallel within the same file