@@ -71,6 +71,7 @@ export class RunEngine {
7171 private tracer : Tracer ;
7272 private meter : Meter ;
7373 private heartbeatTimeouts : HeartbeatTimeouts ;
74+ private repairSnapshotTimeoutMs : number ;
7475
7576 prisma : PrismaClient ;
7677 readOnlyPrisma : PrismaReplicaClient ;
@@ -191,6 +192,9 @@ export class RunEngine {
191192 heartbeatSnapshot : async ( { payload } ) => {
192193 await this . #handleStalledSnapshot( payload ) ;
193194 } ,
195+ repairSnapshot : async ( { payload } ) => {
196+ await this . #handleRepairSnapshot( payload ) ;
197+ } ,
194198 expireRun : async ( { payload } ) => {
195199 await this . ttlSystem . expireRun ( { runId : payload . runId } ) ;
196200 } ,
@@ -241,6 +245,8 @@ export class RunEngine {
241245 ...( options . heartbeatTimeoutsMs ?? { } ) ,
242246 } ;
243247
248+ this . repairSnapshotTimeoutMs = options . repairSnapshotTimeoutMs ?? 60_000 ;
249+
244250 const resources : SystemResources = {
245251 prisma : this . prisma ,
246252 worker : this . worker ,
@@ -1172,81 +1178,77 @@ export class RunEngine {
11721178 async repairEnvironment ( environment : AuthenticatedEnvironment , dryRun : boolean ) {
11731179 const runIds = await this . runQueue . getCurrentConcurrencyOfEnvironment ( environment ) ;
11741180
1175- const completedRuns = await this . #concurrencySweeperCallback( runIds , 5000 ) ;
1181+ return this . #repairRuns( runIds , dryRun ) ;
1182+ }
11761183
1177- if ( dryRun ) {
1178- return {
1179- runIds,
1180- completedRunIds : completedRuns . map ( ( r ) => r . id ) ,
1181- dryRun,
1182- } ;
1183- }
1184+ async repairQueue (
1185+ environment : AuthenticatedEnvironment ,
1186+ queue : string ,
1187+ dryRun : boolean ,
1188+ ignoreRunIds : string [ ]
1189+ ) {
1190+ const runIds = await this . runQueue . getCurrentConcurrencyOfQueue ( environment , queue ) ;
1191+
1192+ const runIdsToRepair = runIds . filter ( ( runId ) => ! ignoreRunIds . includes ( runId ) ) ;
1193+
1194+ return this . #repairRuns( runIdsToRepair , dryRun ) ;
1195+ }
11841196
1185- if ( completedRuns . length === 0 ) {
1197+ async #repairRuns( runIds : string [ ] , dryRun : boolean ) {
1198+ if ( runIds . length === 0 ) {
11861199 return {
11871200 runIds,
1188- completedRunIds : [ ] ,
1201+ repairs : [ ] ,
11891202 dryRun,
11901203 } ;
11911204 }
11921205
1193- await pMap (
1194- completedRuns ,
1195- async ( run ) => {
1196- await this . runQueue . acknowledgeMessage ( run . orgId , run . id , {
1197- skipDequeueProcessing : true ,
1198- removeFromWorkerQueue : false ,
1199- } ) ;
1206+ const repairs = await pMap (
1207+ runIds ,
1208+ async ( runId ) => {
1209+ return this . #repairRun( runId , dryRun ) ;
12001210 } ,
12011211 { concurrency : 5 }
12021212 ) ;
12031213
12041214 return {
12051215 runIds,
1206- completedRunIds : completedRuns . map ( ( r ) => r . id ) ,
1216+ repairs ,
12071217 dryRun,
12081218 } ;
12091219 }
12101220
1211- async repairQueue ( environment : AuthenticatedEnvironment , queue : string , dryRun : boolean ) {
1212- const runIds = await this . runQueue . getCurrentConcurrencyOfQueue ( environment , queue ) ;
1213-
1214- const completedRuns = await this . #concurrencySweeperCallback( runIds , 5000 ) ;
1215-
1216- if ( dryRun ) {
1217- return {
1218- queue,
1219- runIds,
1220- completedRunIds : completedRuns . map ( ( r ) => r . id ) ,
1221- dryRun,
1222- } ;
1223- }
1221+ async #repairRun( runId : string , dryRun : boolean ) {
1222+ const snapshot = await getLatestExecutionSnapshot ( this . prisma , runId ) ;
1223+
1224+ if (
1225+ snapshot . executionStatus === "QUEUED" ||
1226+ snapshot . executionStatus === "SUSPENDED" ||
1227+ snapshot . executionStatus === "FINISHED"
1228+ ) {
1229+ if ( ! dryRun ) {
1230+ // Schedule the repair job
1231+ await this . worker . enqueueOnce ( {
1232+ id : `repair-in-progress-run:${ runId } ` ,
1233+ job : "repairSnapshot" ,
1234+ payload : { runId, snapshotId : snapshot . id , executionStatus : snapshot . executionStatus } ,
1235+ availableAt : new Date ( Date . now ( ) + this . repairSnapshotTimeoutMs ) ,
1236+ } ) ;
1237+ }
12241238
1225- if ( completedRuns . length === 0 ) {
12261239 return {
1227- queue ,
1228- runIds ,
1229- completedRunIds : [ ] ,
1230- dryRun ,
1240+ action : "repairSnapshot" ,
1241+ runId ,
1242+ snapshotStatus : snapshot . executionStatus ,
1243+ snapshotId : snapshot . id ,
12311244 } ;
12321245 }
12331246
1234- await pMap (
1235- completedRuns ,
1236- async ( run ) => {
1237- await this . runQueue . acknowledgeMessage ( run . orgId , run . id , {
1238- skipDequeueProcessing : true ,
1239- removeFromWorkerQueue : false ,
1240- } ) ;
1241- } ,
1242- { concurrency : 5 }
1243- ) ;
1244-
12451247 return {
1246- queue ,
1247- runIds ,
1248- completedRunIds : completedRuns . map ( ( r ) => r . id ) ,
1249- dryRun ,
1248+ action : "ignore" ,
1249+ runId ,
1250+ snapshotStatus : snapshot . executionStatus ,
1251+ snapshotId : snapshot . id ,
12501252 } ;
12511253 }
12521254
@@ -1642,6 +1644,117 @@ export class RunEngine {
16421644 } ) ;
16431645 }
16441646
1647+ async #handleRepairSnapshot( {
1648+ runId,
1649+ snapshotId,
1650+ executionStatus,
1651+ } : {
1652+ runId : string ;
1653+ snapshotId : string ;
1654+ executionStatus : string ;
1655+ } ) {
1656+ return await this . runLock . lock ( "handleRepairSnapshot" , [ runId ] , async ( ) => {
1657+ const latestSnapshot = await getLatestExecutionSnapshot ( this . prisma , runId ) ;
1658+
1659+ if ( latestSnapshot . id !== snapshotId ) {
1660+ this . logger . log (
1661+ "RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair." ,
1662+ {
1663+ runId,
1664+ snapshotId,
1665+ latestSnapshotExecutionStatus : latestSnapshot . executionStatus ,
1666+ repairExecutionStatus : executionStatus ,
1667+ }
1668+ ) ;
1669+
1670+ return ;
1671+ }
1672+
1673+ // Okay, so this means we haven't transitioned to a new status yet, so we need to do something
1674+ switch ( latestSnapshot . executionStatus ) {
1675+ case "EXECUTING" :
1676+ case "EXECUTING_WITH_WAITPOINTS" :
1677+ case "FINISHED" :
1678+ case "PENDING_CANCEL" :
1679+ case "PENDING_EXECUTING" :
1680+ case "QUEUED_EXECUTING" :
1681+ case "RUN_CREATED" : {
1682+ // Do nothing;
1683+ return ;
1684+ }
1685+ case "QUEUED" : {
1686+ this . logger . log ( "RunEngine.handleRepairSnapshot QUEUED" , {
1687+ runId,
1688+ snapshotId,
1689+ } ) ;
1690+
1691+ //it will automatically be requeued X times depending on the queue retry settings
1692+ const gotRequeued = await this . runQueue . nackMessage ( {
1693+ orgId : latestSnapshot . organizationId ,
1694+ messageId : runId ,
1695+ } ) ;
1696+
1697+ if ( ! gotRequeued ) {
1698+ this . logger . error ( "RunEngine.handleRepairSnapshot QUEUED repair failed" , {
1699+ runId,
1700+ snapshot : latestSnapshot ,
1701+ } ) ;
1702+ } else {
1703+ this . logger . log ( "RunEngine.handleRepairSnapshot QUEUED repair successful" , {
1704+ runId,
1705+ snapshot : latestSnapshot ,
1706+ } ) ;
1707+ }
1708+
1709+ break ;
1710+ }
1711+ case "SUSPENDED" : {
1712+ this . logger . log ( "RunEngine.handleRepairSnapshot SUSPENDED" , {
1713+ runId,
1714+ snapshotId,
1715+ } ) ;
1716+
1717+ const taskRun = await this . prisma . taskRun . findFirst ( {
1718+ where : { id : runId } ,
1719+ select : {
1720+ queue : true ,
1721+ } ,
1722+ } ) ;
1723+
1724+ if ( ! taskRun ) {
1725+ this . logger . error ( "RunEngine.handleRepairSnapshot SUSPENDED task run not found" , {
1726+ runId,
1727+ snapshotId,
1728+ } ) ;
1729+ return ;
1730+ }
1731+
1732+ // We need to clear this run from the current concurrency sets
1733+ await this . runQueue . clearMessageFromConcurrencySets ( {
1734+ runId,
1735+ orgId : latestSnapshot . organizationId ,
1736+ queue : taskRun . queue ,
1737+ env : {
1738+ id : latestSnapshot . environmentId ,
1739+ type : latestSnapshot . environmentType ,
1740+ project : {
1741+ id : latestSnapshot . projectId ,
1742+ } ,
1743+ organization : {
1744+ id : latestSnapshot . organizationId ,
1745+ } ,
1746+ } ,
1747+ } ) ;
1748+
1749+ break ;
1750+ }
1751+ default : {
1752+ assertNever ( latestSnapshot . executionStatus ) ;
1753+ }
1754+ }
1755+ } ) ;
1756+ }
1757+
16451758 async #concurrencySweeperCallback(
16461759 runIds : string [ ] ,
16471760 completedAtOffsetMs : number = 1000 * 60 * 10
0 commit comments