diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 0d97802..1664a30 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -57,7 +57,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -105,7 +105,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -136,7 +136,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -203,7 +203,7 @@ export declare const components: { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; diff --git a/src/client/step.ts b/src/client/step.ts index 0e8a837..b05f7df 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -26,10 +26,11 @@ export type WorkerResult = export type StepRequest = { name: string; functionType: FunctionType; - function: FunctionReference; + function: FunctionReference | undefined; args: unknown; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; + pause: boolean | undefined; resolve: (result: unknown) => void; reject: (error: unknown) => void; @@ -135,9 +136,12 @@ export class StepExecutor { inProgress: true, name: message.name, functionType: message.functionType, - handle: await createFunctionHandle(message.function), + handle: message.function + ? await createFunctionHandle(message.function) + : "", args: message.args, argsSize: valueSize(message.args as Value), + pause: message.pause, outcome: undefined, startedAt: this.now, completedAt: undefined, diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index 8145672..ff87ea4 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -4,12 +4,14 @@ import type { FunctionArgs, FunctionReturnType, FunctionType, + DefaultFunctionArgs, } from "convex/server"; import { safeFunctionName } from "./safeFunctionName.js"; import type { StepRequest } from "./step.js"; import type { RetryOption } from "@convex-dev/workpool"; import type { RunOptions, WorkflowStep } from "./types.js"; import type { WorkflowId } from "../types.js"; +import type { Validator } from "convex/values"; export class StepContext implements WorkflowStep { constructor( @@ -41,30 +43,73 @@ export class StepContext implements WorkflowStep { return this.runFunction("action", action, args, opts); } - private async runFunction< - F extends FunctionReference, + async pause< + Mutation extends FunctionReference< + "mutation", + "internal", + DefaultFunctionArgs, + void + >, + Returns = unknown, >( + opts?: { + /** + * The name for the pause. By default, if you pass in api.foo.bar.baz, + * it will use "foo/bar:baz" as the name. If you pass in a function handle, + * it will use the function handle directly. Otherwise it will use "pause". + */ + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), + ): Promise { + if (opts?.onPause) { + return this.runFunction("mutation", opts.onPause, opts.args, { + name: opts.name, + pause: true, + }) as Promise; + } else { + return this.run({ + name: opts?.name ?? "pause", + functionType: "mutation", + function: undefined, + args: {}, + retry: undefined, + pause: true, + schedulerOptions: {}, + }) as Promise; + } + } + + private runFunction>( functionType: FunctionType, f: F, args: unknown, - opts?: RunOptions & RetryOption, + opts?: RunOptions & RetryOption & { pause?: true }, ): Promise { - let send: unknown; - const { name, ...rest } = opts ?? {}; - const { retry, ...schedulerOptions } = rest; + const { name, retry, pause, ...schedulerOptions } = opts ?? {}; + return this.run({ + name: name ?? (f ? safeFunctionName(f) : "pause"), + functionType, + function: f, + args: args ?? {}, + retry, + pause, + schedulerOptions, + }); + } + + private run(req: Omit): Promise { + let send: Promise; const p = new Promise((resolve, reject) => { send = this.sender.push({ - name: name ?? safeFunctionName(f), - functionType, - function: f, - args, - retry, - schedulerOptions, + ...req, resolve, reject, }); }); - await send; - return p; + return send!.then(() => p); } } diff --git a/src/client/types.ts b/src/client/types.ts index 9e90065..04aac99 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -1,12 +1,13 @@ import type { RetryOption, WorkId } from "@convex-dev/workpool"; import type { + DefaultFunctionArgs, Expand, FunctionArgs, FunctionReference, FunctionReturnType, } from "convex/server"; import type { api } from "../component/_generated/api.js"; -import type { GenericId } from "convex/values"; +import type { GenericId, Validator } from "convex/values"; import type { WorkflowId } from "../types.js"; export type WorkflowComponent = UseApi; @@ -81,6 +82,42 @@ export type WorkflowStep = { args: FunctionArgs, opts?: RunOptions & RetryOption, ): Promise>; + + /** + * Pause the workflow, to be resumed asynchronously. + * + * It will be marked as paused in the same transaction as the pause handler + * is called. The pause handler must receive the arguments and return nothing. + * + * The return value is the value provided by the resume call, which must match + * the return validator provided. + * + * @param pauseHandler - The pause handler to run, like `internal.index.examplePause`. + * @param args - The arguments to the pause handler. + * @param opts - Options for retrying, scheduling and naming the pause. + */ + pause< + Mutation extends FunctionReference< + "mutation", + "internal", + DefaultFunctionArgs, + void + >, + Returns = unknown, + >( + opts?: { + /** + * The name for the pause. By default, if you pass in api.foo.bar.baz, + * it will use "foo/bar:baz" as the name. If you pass in a function handle, + * it will use the function handle directly. Otherwise it will use "pause". + */ + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), + ): Promise; }; export type UseApi = Expand<{ diff --git a/src/component/_generated/api.d.ts b/src/component/_generated/api.d.ts index 2caf60c..36426a4 100644 --- a/src/component/_generated/api.d.ts +++ b/src/component/_generated/api.d.ts @@ -51,7 +51,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -99,7 +99,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -130,7 +130,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; @@ -197,7 +197,7 @@ export type Mounts = { args: any; argsSize: number; completedAt?: number; - functionType: "query" | "mutation" | "action"; + functionType: "query" | "mutation" | "action" | "pause"; handle: string; inProgress: boolean; name: string; diff --git a/src/component/journal.ts b/src/component/journal.ts index df9642e..25d7c8f 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -104,7 +104,8 @@ export const startSteps = mutation({ generationNumber, stepId, }; - let workId: WorkId; + let workId: WorkId | undefined = undefined; + // TODO: use enqueueBatch switch (step.functionType) { case "query": { workId = await workpool.enqueueQuery( @@ -115,6 +116,12 @@ export const startSteps = mutation({ ); break; } + // Pause is a special mutation + case "pause": + if (!handle) { + break; + } + // fallthrough case "mutation": { workId = await workpool.enqueueMutation( ctx, @@ -134,8 +141,10 @@ export const startSteps = mutation({ break; } } - entry.step.workId = workId; - await ctx.db.replace(entry._id, entry); + if (workId) { + entry.step.workId = workId; + await ctx.db.replace(entry._id, entry); + } console.event("started", { workflowId: workflow._id, diff --git a/src/component/pool.ts b/src/component/pool.ts index 781ad68..334cd35 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -114,6 +114,18 @@ export const onComplete = internalMutation({ ); return; } + if ( + journalEntry.step.functionType === "pause" && + args.result.kind === "success" + ) { + console.event("stepPaused", { + workflowId, + workflowName: workflow.name, + stepName: journalEntry.step.name, + stepNumber: journalEntry.stepNumber, + }); + return; + } journalEntry.step.inProgress = false; journalEntry.step.completedAt = Date.now(); switch (args.result.kind) { diff --git a/src/component/schema.ts b/src/component/schema.ts index c96c0af..a863d6c 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -63,7 +63,7 @@ export const step = v.object({ name: v.string(), inProgress: v.boolean(), workId: v.optional(vWorkIdValidator), - functionType: literals("query", "mutation", "action"), + functionType: literals("query", "mutation", "action", "pause"), handle: v.string(), argsSize: v.number(), args: v.any(),