diff --git a/example/convex/_generated/api.d.ts b/example/convex/_generated/api.d.ts index 0d97802..ea05f8f 100644 --- a/example/convex/_generated/api.d.ts +++ b/example/convex/_generated/api.d.ts @@ -53,21 +53,38 @@ export declare const components: { journalEntries: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }>; @@ -91,6 +108,17 @@ export declare const components: { }; } >; + resume: FunctionReference< + "mutation", + "internal", + { + name?: string; + resumeValue: any; + workflowHandle: string; + workflowId: string; + }, + null + >; startSteps: FunctionReference< "mutation", "internal", @@ -101,21 +129,38 @@ export declare const components: { | boolean | { base: number; initialBackoffMs: number; maxAttempts: number }; schedulerOptions?: { runAt?: number } | { runAfter?: number }; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; }>; workflowId: string; workpoolOptions?: { @@ -132,21 +177,38 @@ export declare const components: { Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }> @@ -199,21 +261,38 @@ export declare const components: { inProgress: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }>; diff --git a/example/convex/setup.test.ts b/example/convex/setup.test.ts index a1a4451..50f1287 100644 --- a/example/convex/setup.test.ts +++ b/example/convex/setup.test.ts @@ -5,7 +5,7 @@ import schema from "./schema"; export const modules = import.meta.glob("./**/*.*s"); // Sorry about everything -import componentSchema from "../../node_modules/@convex-dev/workflow/src/component/schema"; +import componentSchema from "../../src/component/schema"; export { componentSchema }; export const componentModules = import.meta.glob( "../../node_modules/@convex-dev/workflow/src/component/**/*.ts", diff --git a/src/client/index.ts b/src/client/index.ts index 2cc5c5b..2978921 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -204,6 +204,38 @@ export class WorkflowManager { workflowId, }); } + + /** + * Resume a paused workflow with a type-safe value. + * + * @param ctx - The Convex context. + * @param workflow - The workflow function reference for type safety. + * @param workflowId - The workflow ID. + * @param resumeValue - The value to pass to the paused step. + * @param opts - Options including the validator for type inference and optional step name. + */ + async resume< + F extends FunctionReference<"mutation", "internal">, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + V extends Validator, + >( + ctx: RunMutationCtx, + workflow: F, + workflowId: WorkflowId, + resumeValue: unknown, + opts?: { + returns?: V; + name?: string; + }, + ): Promise { + const handle = await createFunctionHandle(workflow); + await ctx.runMutation(this.component.journal.resume, { + workflowHandle: handle, + workflowId, + resumeValue, + name: opts?.name, + }); + } } type RunQueryCtx = { diff --git a/src/client/step.ts b/src/client/step.ts index 0e8a837..79b3902 100644 --- a/src/client/step.ts +++ b/src/client/step.ts @@ -23,18 +23,30 @@ export type WorkerResult = | { type: "handlerDone"; runResult: RunResult } | { type: "executorBlocked" }; -export type StepRequest = { +export type ExecutionStepRequest = { + type: "execution"; name: string; functionType: FunctionType; function: FunctionReference; args: unknown; retry: RetryBehavior | boolean | undefined; schedulerOptions: SchedulerOptions; + resolve: (result: unknown) => void; + reject: (error: unknown) => void; +}; +export type PauseStepRequest = { + type: "pause"; + name: string; + onPauseFunction?: FunctionReference<"mutation", "internal">; + args: unknown; + schedulerOptions: SchedulerOptions; resolve: (result: unknown) => void; reject: (error: unknown) => void; }; +export type StepRequest = ExecutionStepRequest | PauseStepRequest; + const MAX_JOURNAL_SIZE = 8 << 20; export class StepExecutor { @@ -131,22 +143,44 @@ export class StepExecutor { async startSteps(messages: StepRequest[]): Promise { const steps = await Promise.all( messages.map(async (message) => { - const step = { - inProgress: true, - name: message.name, - functionType: message.functionType, - handle: await createFunctionHandle(message.function), - args: message.args, - argsSize: valueSize(message.args as Value), - outcome: undefined, - startedAt: this.now, - completedAt: undefined, - }; - return { - retry: message.retry, - schedulerOptions: message.schedulerOptions, - step, - }; + if (message.type === "execution") { + const step = { + type: "execution" as const, + inProgress: true, + name: message.name, + functionType: message.functionType, + handle: await createFunctionHandle(message.function), + args: message.args, + argsSize: valueSize(message.args as Value), + runResult: undefined, + startedAt: this.now, + completedAt: undefined, + }; + return { + retry: message.retry, + schedulerOptions: message.schedulerOptions, + step, + }; + } else { + const step = { + type: "pause" as const, + inProgress: true, + name: message.name, + onPauseHandle: message.onPauseFunction + ? await createFunctionHandle(message.onPauseFunction) + : undefined, + args: message.args, + argsSize: valueSize(message.args as Value), + runResult: undefined, + startedAt: this.now, + completedAt: undefined, + }; + return { + retry: undefined, + schedulerOptions: message.schedulerOptions, + step, + }; + } }), ); const entries = (await this.ctx.runMutation( diff --git a/src/client/stepContext.ts b/src/client/stepContext.ts index 8145672..b102f23 100644 --- a/src/client/stepContext.ts +++ b/src/client/stepContext.ts @@ -4,9 +4,11 @@ import type { FunctionArgs, FunctionReturnType, FunctionType, + DefaultFunctionArgs, } from "convex/server"; +import type { Validator } from "convex/values"; import { safeFunctionName } from "./safeFunctionName.js"; -import type { StepRequest } from "./step.js"; +import type { StepRequest, ExecutionStepRequest } from "./step.js"; import type { RetryOption } from "@convex-dev/workpool"; import type { RunOptions, WorkflowStep } from "./types.js"; import type { WorkflowId } from "../types.js"; @@ -17,7 +19,7 @@ export class StepContext implements WorkflowStep { private sender: BaseChannel, ) {} - async runQuery>( + runQuery>( query: Query, args: FunctionArgs, opts?: RunOptions, @@ -25,7 +27,7 @@ export class StepContext implements WorkflowStep { return this.runFunction("query", query, args, opts); } - async runMutation>( + runMutation>( mutation: Mutation, args: FunctionArgs, opts?: RunOptions, @@ -33,7 +35,7 @@ export class StepContext implements WorkflowStep { return this.runFunction("mutation", mutation, args, opts); } - async runAction>( + runAction>( action: Action, args: FunctionArgs, opts?: RunOptions & RetryOption, @@ -41,25 +43,89 @@ export class StepContext implements WorkflowStep { return this.runFunction("action", action, args, opts); } - private async runFunction< - F extends FunctionReference, + pause< + Mutation extends FunctionReference< + "mutation", + "internal", + DefaultFunctionArgs, + void + >, + Returns = unknown, >( + opts?: { + /** + * The name for the pause. By default, if you pass in api.foo.bar.baz, + * it will use "foo/bar:baz" as the name. If you pass in a function handle, + * it will use the function handle directly. Otherwise it will use "pause". + */ + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), + ): Promise { + return this.runPause(opts); + } + + private runFunction>( functionType: FunctionType, f: F, args: unknown, opts?: RunOptions & RetryOption, + ): Promise { + const { name, retry, ...schedulerOptions } = opts ?? {}; + return this.runExecution({ + type: "execution", + name: name ?? safeFunctionName(f), + functionType, + function: f, + args: args ?? {}, + retry, + schedulerOptions, + }); + } + + private runPause< + Mutation extends FunctionReference< + "mutation", + "internal", + DefaultFunctionArgs, + void + >, + Returns = unknown, + >( + opts?: { + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), + ): Promise { + let send: unknown; + const p = new Promise((resolve, reject) => { + send = this.sender.push({ + type: "pause" as const, + name: opts?.name ?? (opts?.onPause ? safeFunctionName(opts.onPause) : "pause"), + onPauseFunction: opts?.onPause, + args: opts?.args ?? {}, + schedulerOptions: {}, + resolve: resolve as (result: unknown) => void, + reject, + }); + }); + void send; + return p; + } + + private async runExecution( + req: Omit, ): Promise { let send: unknown; - const { name, ...rest } = opts ?? {}; - const { retry, ...schedulerOptions } = rest; const p = new Promise((resolve, reject) => { send = this.sender.push({ - name: name ?? safeFunctionName(f), - functionType, - function: f, - args, - retry, - schedulerOptions, + ...req, resolve, reject, }); diff --git a/src/client/types.ts b/src/client/types.ts index 9e90065..bccd77f 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -6,7 +6,7 @@ import type { FunctionReturnType, } from "convex/server"; import type { api } from "../component/_generated/api.js"; -import type { GenericId } from "convex/values"; +import type { GenericId, Validator } from "convex/values"; import type { WorkflowId } from "../types.js"; export type WorkflowComponent = UseApi; @@ -81,6 +81,40 @@ export type WorkflowStep = { args: FunctionArgs, opts?: RunOptions & RetryOption, ): Promise>; + + /** + * Pause the workflow, to be resumed asynchronously. + * + * It will be marked as paused in the same transaction as the pause handler + * is called. The pause handler must receive the arguments and return nothing. + * + * The return value is the value provided by the resume call, which must match + * the return validator provided. + * + * @param pauseHandler - The pause handler to run, like `internal.index.examplePause`. + * @param args - The arguments to the pause handler. + * @param opts - Options for retrying, scheduling and naming the pause. + */ + pause< + Mutation extends FunctionReference< + "mutation", + "internal" + >, + Returns = unknown, + >( + opts?: { + /** + * The name for the pause. By default, if you pass in api.foo.bar.baz, + * it will use "foo/bar:baz" as the name. If you pass in a function handle, + * it will use the function handle directly. Otherwise it will use "pause". + */ + name?: string; + returns: Validator; + } & ( + | { onPause: Mutation; args: FunctionArgs } + | { onPause?: undefined; args?: undefined } + ), + ): Promise; }; export type UseApi = Expand<{ diff --git a/src/component/_generated/api.d.ts b/src/component/_generated/api.d.ts index 2caf60c..de60b69 100644 --- a/src/component/_generated/api.d.ts +++ b/src/component/_generated/api.d.ts @@ -47,21 +47,38 @@ export type Mounts = { journalEntries: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }>; @@ -85,6 +102,17 @@ export type Mounts = { }; } >; + resume: FunctionReference< + "mutation", + "public", + { + name?: string; + resumeValue: any; + workflowHandle: string; + workflowId: string; + }, + null + >; startSteps: FunctionReference< "mutation", "public", @@ -95,21 +123,38 @@ export type Mounts = { | boolean | { base: number; initialBackoffMs: number; maxAttempts: number }; schedulerOptions?: { runAt?: number } | { runAfter?: number }; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; }>; workflowId: string; workpoolOptions?: { @@ -126,21 +171,38 @@ export type Mounts = { Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }> @@ -193,21 +255,38 @@ export type Mounts = { inProgress: Array<{ _creationTime: number; _id: string; - step: { - args: any; - argsSize: number; - completedAt?: number; - functionType: "query" | "mutation" | "action"; - handle: string; - inProgress: boolean; - name: string; - runResult?: - | { kind: "success"; returnValue: any } - | { error: string; kind: "failed" } - | { kind: "canceled" }; - startedAt: number; - workId?: string; - }; + step: + | { + args: any; + argsSize: number; + completedAt?: number; + functionType: "query" | "mutation" | "action"; + handle: string; + inProgress: boolean; + name: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "execution"; + workId?: string; + } + | { + args: any; + argsSize: number; + completedAt?: number; + inProgress: boolean; + name: string; + onPauseHandle?: string; + runResult?: + | { kind: "success"; returnValue: any } + | { error: string; kind: "failed" } + | { kind: "canceled" }; + startedAt: number; + type: "pause"; + workId?: string; + }; stepNumber: number; workflowId: string; }>; diff --git a/src/component/journal.ts b/src/component/journal.ts index df9642e..fc31be7 100644 --- a/src/component/journal.ts +++ b/src/component/journal.ts @@ -91,7 +91,6 @@ export const startSteps = mutation({ const entries = await Promise.all( args.steps.map(async (stepArgs, index) => { const { step, retry, schedulerOptions } = stepArgs; - const { name, handle, args } = step; const stepNumber = stepNumberBase + index; const stepId = await ctx.db.insert("steps", { workflowId: workflow._id, @@ -104,43 +103,58 @@ export const startSteps = mutation({ generationNumber, stepId, }; - let workId: WorkId; - switch (step.functionType) { - case "query": { - workId = await workpool.enqueueQuery( - ctx, - handle as FunctionHandle<"query">, - args, - { context, onComplete, name, ...schedulerOptions }, - ); - break; + let workId: WorkId | undefined = undefined; + + if (step.type === "execution") { + switch (step.functionType) { + case "query": { + workId = await workpool.enqueueQuery( + ctx, + step.handle as FunctionHandle<"query">, + step.args, + { context, onComplete, name: step.name, ...schedulerOptions }, + ); + break; + } + case "mutation": { + workId = await workpool.enqueueMutation( + ctx, + step.handle as FunctionHandle<"mutation">, + step.args, + { context, onComplete, name: step.name, ...schedulerOptions }, + ); + break; + } + case "action": { + workId = await workpool.enqueueAction( + ctx, + step.handle as FunctionHandle<"action">, + step.args, + { context, onComplete, name: step.name, retry, ...schedulerOptions }, + ); + break; + } } - case "mutation": { + } else if (step.type === "pause") { + if (step.onPauseHandle) { workId = await workpool.enqueueMutation( ctx, - handle as FunctionHandle<"mutation">, - args, - { context, onComplete, name, ...schedulerOptions }, - ); - break; - } - case "action": { - workId = await workpool.enqueueAction( - ctx, - handle as FunctionHandle<"action">, - args, - { context, onComplete, name, retry, ...schedulerOptions }, + step.onPauseHandle as FunctionHandle<"mutation">, + step.args, + { context, onComplete, name: step.name, ...schedulerOptions }, ); - break; } } - entry.step.workId = workId; - await ctx.db.replace(entry._id, entry); + + if (workId) { + entry.step.workId = workId; + await ctx.db.replace(entry._id, entry); + } console.event("started", { workflowId: workflow._id, workflowName: workflow.name, - stepName: name, + stepName: step.name, stepNumber, }); return entry; @@ -149,3 +163,81 @@ export const startSteps = mutation({ return entries; }, }); + +export const resume = mutation({ + args: { + workflowHandle: v.string(), + workflowId: v.id("workflows"), + resumeValue: v.any(), + name: v.optional(v.string()), + }, + returns: v.null(), + handler: async (ctx, args) => { + const console = await getDefaultLogger(ctx); + const workflow = await getWorkflow(ctx, args.workflowId, null); + + if (workflow.runResult) { + throw new Error(`Workflow not running: ${args.workflowId}`); + } + + if (workflow.workflowHandle !== args.workflowHandle) { + throw new Error( + `Workflow handle mismatch: expected ${workflow.workflowHandle}, got ${args.workflowHandle}`, + ); + } + + let query = ctx.db + .query("steps") + .withIndex("inProgress", (q) => + q.eq("step.inProgress", true).eq("workflowId", args.workflowId), + ) + .filter((q) => q.eq(q.field("step.type"), "pause")); + + if (args.name) { + query = query.filter((q) => q.eq(q.field("step.name"), args.name)); + } + + const pausedStep = await query.first(); + + if (!pausedStep) { + const message = args.name + ? `No paused step with name "${args.name}" found for workflow: ${args.workflowId}` + : `No paused step found for workflow: ${args.workflowId}`; + throw new Error(message); + } + + pausedStep.step.inProgress = false; + pausedStep.step.completedAt = Date.now(); + pausedStep.step.runResult = { + kind: "success", + returnValue: args.resumeValue, + }; + + await ctx.db.replace(pausedStep._id, pausedStep); + + console.event("stepResumed", { + workflowId: workflow._id, + workflowName: workflow.name, + stepName: pausedStep.step.name, + stepNumber: pausedStep.stepNumber, + }); + + const workpool = await getWorkpool(ctx, {}); + await workpool.enqueueMutation( + ctx, + workflow.workflowHandle as FunctionHandle<"mutation">, + { + workflowId: workflow._id, + generationNumber: workflow.generationNumber, + }, + { + name: workflow.name, + onComplete: internal.pool.handlerOnComplete, + context: { + workflowId: workflow._id, + generationNumber: workflow.generationNumber, + }, + }, + ); + }, +}); diff --git a/src/component/pool.ts b/src/component/pool.ts index 781ad68..b8b55b1 100644 --- a/src/component/pool.ts +++ b/src/component/pool.ts @@ -114,6 +114,18 @@ export const onComplete = internalMutation({ ); return; } + if ( + journalEntry.step.type === "pause" && + args.result.kind === "success" + ) { + console.event("stepPaused", { + workflowId, + workflowName: workflow.name, + stepName: journalEntry.step.name, + stepNumber: journalEntry.stepNumber, + }); + return; + } journalEntry.step.inProgress = false; journalEntry.step.completedAt = Date.now(); switch (args.result.kind) { diff --git a/src/component/schema.ts b/src/component/schema.ts index c96c0af..725f509 100644 --- a/src/component/schema.ts +++ b/src/component/schema.ts @@ -59,20 +59,36 @@ export const workflowDocument = v.object({ }); export type Workflow = Infer; -export const step = v.object({ +const baseStepFields = { name: v.string(), inProgress: v.boolean(), workId: v.optional(vWorkIdValidator), + runResult: v.optional(vResultValidator), + startedAt: v.number(), + completedAt: v.optional(v.number()), +}; + +const executionStep = v.object({ + ...baseStepFields, + type: v.literal("execution"), functionType: literals("query", "mutation", "action"), handle: v.string(), argsSize: v.number(), args: v.any(), - runResult: v.optional(vResultValidator), +}); - startedAt: v.number(), - completedAt: v.optional(v.number()), +const pauseStep = v.object({ + ...baseStepFields, + type: v.literal("pause"), + onPauseHandle: v.optional(v.string()), + argsSize: v.number(), + args: v.any(), }); + +export const step = v.union(executionStep, pauseStep); export type Step = Infer; +export type ExecutionStep = Infer; +export type PauseStep = Infer; function stepSize(step: Step): number { let size = 0; @@ -81,8 +97,17 @@ function stepSize(step: Step): number { if (step.workId) { size += step.workId.length; } - size += step.functionType.length; - size += step.handle.length; + size += step.type.length; + + if (step.type === "execution") { + size += step.functionType.length; + size += step.handle.length; + } else if (step.type === "pause") { + if (step.onPauseHandle) { + size += step.onPauseHandle.length; + } + } + size += 8 + step.argsSize; if (step.runResult) { size += resultSize(step.runResult);