Skip to content

Commit b434ed2

Browse files
committed
check generation number before logging or marking other failures
1 parent baedaeb commit b434ed2

File tree

1 file changed

+19
-14
lines changed

1 file changed

+19
-14
lines changed

src/component/pool.ts

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,12 @@ export const onComplete = internalMutation({
8686
assert(journalEntry, `Journal entry not found: ${stepId}`);
8787
const workflowId = journalEntry.workflowId;
8888

89-
const error = !validate(onCompleteContext, args.context)
90-
? `Invalid onComplete context for workId ${args.workId}` +
91-
JSON.stringify(args.context)
92-
: !journalEntry.step.inProgress
93-
? `Journal entry not in progress: ${stepId}`
94-
: undefined;
95-
if (error) {
89+
if (
90+
!validate(onCompleteContext, args.context, { allowUnknownFields: true })
91+
) {
92+
const error =
93+
`Invalid onComplete context for workId ${args.workId}` +
94+
JSON.stringify(args.context);
9695
await ctx.db.patch(workflowId, {
9796
runResult: {
9897
kind: "failed",
@@ -102,6 +101,19 @@ export const onComplete = internalMutation({
102101
return;
103102
}
104103
const { generationNumber } = args.context;
104+
const workflow = await getWorkflow(ctx, workflowId, null);
105+
if (workflow.generationNumber !== generationNumber) {
106+
console.error(
107+
`Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`,
108+
);
109+
return;
110+
}
111+
if (!journalEntry.step.inProgress) {
112+
console.error(
113+
`Step finished but journal entry not in progress: ${stepId} status: ${journalEntry.step.runResult?.kind ?? "pending"}`,
114+
);
115+
return;
116+
}
105117
journalEntry.step.inProgress = false;
106118
journalEntry.step.completedAt = Date.now();
107119
switch (args.result.kind) {
@@ -126,7 +138,6 @@ export const onComplete = internalMutation({
126138
await ctx.db.replace(journalEntry._id, journalEntry);
127139
console.debug(`Completed execution of ${stepId}`, journalEntry);
128140

129-
const workflow = await getWorkflow(ctx, workflowId, null);
130141
console.event("stepCompleted", {
131142
workflowId,
132143
workflowName: workflow.name,
@@ -143,12 +154,6 @@ export const onComplete = internalMutation({
143154
}
144155
return;
145156
}
146-
if (workflow.generationNumber !== generationNumber) {
147-
console.error(
148-
`Workflow: ${workflowId} already has generation number ${workflow.generationNumber} when completing ${stepId}`,
149-
);
150-
return;
151-
}
152157
const workpool = await getWorkpool(ctx, args.context.workpoolOptions);
153158
await workpool.enqueueMutation(
154159
ctx,

0 commit comments

Comments
 (0)