diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 320362d384..0dc9e38444 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1102,6 +1102,7 @@ const EnvironmentSchema = z RUN_REPLICATION_INSERT_BASE_DELAY_MS: z.coerce.number().int().default(100), RUN_REPLICATION_INSERT_MAX_DELAY_MS: z.coerce.number().int().default(2000), RUN_REPLICATION_INSERT_STRATEGY: z.enum(["insert", "insert_async"]).default("insert"), + RUN_REPLICATION_DISABLE_PAYLOAD_INSERT: z.string().default("0"), // Clickhouse CLICKHOUSE_URL: z.string(), diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index 2c9aafb1c0..3830d3dd6d 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -66,6 +66,7 @@ function initializeRunsReplicationInstance() { insertBaseDelayMs: env.RUN_REPLICATION_INSERT_BASE_DELAY_MS, insertMaxDelayMs: env.RUN_REPLICATION_INSERT_MAX_DELAY_MS, insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY, + disablePayloadInsert: env.RUN_REPLICATION_DISABLE_PAYLOAD_INSERT === "1", }); if (env.RUN_REPLICATION_ENABLED === "1") { diff --git a/apps/webapp/app/services/runsReplicationService.server.ts b/apps/webapp/app/services/runsReplicationService.server.ts index e7a870f778..c150eb2a00 100644 --- a/apps/webapp/app/services/runsReplicationService.server.ts +++ b/apps/webapp/app/services/runsReplicationService.server.ts @@ -57,6 +57,7 @@ export type RunsReplicationServiceOptions = { insertMaxRetries?: number; insertBaseDelayMs?: number; insertMaxDelayMs?: number; + disablePayloadInsert?: boolean; }; type PostgresTaskRun = TaskRun & { masterQueue: string }; @@ -100,6 +101,7 @@ export class RunsReplicationService { private _insertBaseDelayMs: number; private _insertMaxDelayMs: number; private _insertStrategy: "insert" | "insert_async"; + private _disablePayloadInsert: boolean; public readonly events: EventEmitter; @@ -112,6 +114,7 @@ export class RunsReplicationService { this._acknowledgeTimeoutMs = options.acknowledgeTimeoutMs ?? 1_000; this._insertStrategy = options.insertStrategy ?? "insert"; + this._disablePayloadInsert = options.disablePayloadInsert ?? false; this._replicationClient = new LogicalReplicationClient({ pgConfig: { @@ -750,7 +753,7 @@ export class RunsReplicationService { }; } - if (event === "update" || event === "delete") { + if (event === "update" || event === "delete" || this._disablePayloadInsert) { const taskRunInsert = await this.#prepareTaskRunInsert( run, run.organizationId,