File tree Expand file tree Collapse file tree 3 files changed +23
-8
lines changed
internal-packages/run-engine/src Expand file tree Collapse file tree 3 files changed +23
-8
lines changed Original file line number Diff line number Diff line change @@ -1414,6 +1414,11 @@ export class RunEngine {
14141414 throw new NotImplementedError ( "There shouldn't be a heartbeat for QUEUED_EXECUTING" ) ;
14151415 }
14161416 case "PENDING_EXECUTING" : {
1417+ this . logger . log ( "RunEngine stalled snapshot PENDING_EXECUTING" , {
1418+ runId,
1419+ snapshotId : latestSnapshot . id ,
1420+ } ) ;
1421+
14171422 //the run didn't start executing, we need to requeue it
14181423 const run = await prisma . taskRun . findFirst ( {
14191424 where : { id : runId } ,
Original file line number Diff line number Diff line change @@ -143,6 +143,15 @@ export class DequeueSystem {
143143 const orgId = message . message . orgId ;
144144 const runId = message . messageId ;
145145
146+ this . $ . logger . info ( "DequeueSystem.dequeueFromWorkerQueue dequeued message" , {
147+ runId,
148+ orgId,
149+ environmentId : message . message . environmentId ,
150+ environmentType : message . message . environmentType ,
151+ workerQueueLength : message . workerQueueLength ?? 0 ,
152+ workerQueue,
153+ } ) ;
154+
146155 span . setAttribute ( "run_id" , runId ) ;
147156 span . setAttribute ( "org_id" , orgId ) ;
148157 span . setAttribute ( "environment_id" , message . message . environmentId ) ;
Original file line number Diff line number Diff line change @@ -1417,27 +1417,28 @@ export class RunQueue {
14171417
14181418 const pipeline = this . redis . pipeline ( ) ;
14191419
1420- const workerQueueKeys = new Set < string > ( ) ;
1420+ const operations = [ ] ;
14211421
14221422 for ( const message of messages ) {
14231423 const workerQueueKey = this . keys . workerQueueKey (
14241424 this . #getWorkerQueueFromMessage( message . message )
14251425 ) ;
14261426
1427- workerQueueKeys . add ( workerQueueKey ) ;
1428-
14291427 const messageKeyValue = this . keys . messageKey ( message . message . orgId , message . messageId ) ;
14301428
1429+ operations . push ( {
1430+ workerQueueKey : workerQueueKey ,
1431+ messageId : message . messageId ,
1432+ } ) ;
1433+
14311434 pipeline . rpush ( workerQueueKey , messageKeyValue ) ;
14321435 }
14331436
1434- span . setAttribute ( "worker_queue_count" , workerQueueKeys . size ) ;
1435- span . setAttribute ( "worker_queue_keys" , Array . from ( workerQueueKeys ) ) ;
1437+ span . setAttribute ( "operations_count" , operations . length ) ;
14361438
1437- this . logger . debug ( "enqueueMessagesToWorkerQueues pipeline " , {
1439+ this . logger . info ( "enqueueMessagesToWorkerQueues" , {
14381440 service : this . name ,
1439- messages,
1440- workerQueueKeys : Array . from ( workerQueueKeys ) ,
1441+ operations,
14411442 } ) ;
14421443
14431444 await pipeline . exec ( ) ;
You can’t perform that action at this time.
0 commit comments