@@ -368,16 +368,17 @@ func (qjm *XController) PreemptQueueJobs() {
368368 updateErr := qjm .UpdateQueueJobStatus (newjob )
369369 if updateErr != nil {
370370 klog .Warningf ("[PreemptQueueJobs] update of pod count to AW %v failed hence skipping preemption" , newjob .Name )
371+ return
371372 }
372373 newjob .Status .CanRun = false
373374 newjob .Status .FilterIgnore = true // update QueueJobState only
374375 cleanAppWrapper := false
375376 // If dispatch deadline is exceeded no matter what the state of AW, kill the job and set status as Failed.
376- if (aw .Status .State == arbv1 .AppWrapperStateActive ) && (aw .Spec .SchedSpec .DispatchDuration .Limit > 0 ) {
377- if aw .Spec .SchedSpec .DispatchDuration .Overrun {
378- index := getIndexOfMatchedCondition (aw , arbv1 .AppWrapperCondPreemptCandidate , "DispatchDeadlineExceeded" )
377+ if (newjob .Status .State == arbv1 .AppWrapperStateActive ) && (newjob .Spec .SchedSpec .DispatchDuration .Limit > 0 ) {
378+ if newjob .Spec .SchedSpec .DispatchDuration .Overrun {
379+ index := getIndexOfMatchedCondition (newjob , arbv1 .AppWrapperCondPreemptCandidate , "DispatchDeadlineExceeded" )
379380 if index < 0 {
380- message = fmt .Sprintf ("Dispatch deadline exceeded. allowed to run for %v seconds" , aw .Spec .SchedSpec .DispatchDuration .Limit )
381+ message = fmt .Sprintf ("Dispatch deadline exceeded. allowed to run for %v seconds" , newjob .Spec .SchedSpec .DispatchDuration .Limit )
381382 cond := GenerateAppWrapperCondition (arbv1 .AppWrapperCondPreemptCandidate , v1 .ConditionTrue , "DispatchDeadlineExceeded" , message )
382383 newjob .Status .Conditions = append (newjob .Status .Conditions , cond )
383384 } else {
@@ -392,7 +393,7 @@ func (qjm *XController) PreemptQueueJobs() {
392393
393394 err := qjm .updateStatusInEtcdWithRetry (ctx , updateNewJob , "PreemptQueueJobs - CanRun: false -- DispatchDeadlineExceeded" )
394395 if err != nil {
395- klog .Warningf ("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed" , aw .Namespace , aw .Name )
396+ klog .Warningf ("[PreemptQueueJobs] status update CanRun: false -- DispatchDeadlineExceeded for '%s/%s' failed" , newjob .Namespace , newjob .Name )
396397 continue
397398 }
398399 // cannot use cleanup AW, since it puts AW back in running state
@@ -403,33 +404,33 @@ func (qjm *XController) PreemptQueueJobs() {
403404 }
404405 }
405406
406- if ((aw .Status .Running + aw .Status .Succeeded ) < int32 (aw .Spec .SchedSpec .MinAvailable )) && aw .Status .State == arbv1 .AppWrapperStateActive {
407- index := getIndexOfMatchedCondition (aw , arbv1 .AppWrapperCondPreemptCandidate , "MinPodsNotRunning" )
407+ if ((newjob .Status .Running + newjob .Status .Succeeded ) < int32 (newjob .Spec .SchedSpec .MinAvailable )) && newjob .Status .State == arbv1 .AppWrapperStateActive {
408+ index := getIndexOfMatchedCondition (newjob , arbv1 .AppWrapperCondPreemptCandidate , "MinPodsNotRunning" )
408409 if index < 0 {
409- message = fmt .Sprintf ("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d." , aw .Spec .SchedSpec .MinAvailable , aw .Status .Running , aw .Status .Succeeded )
410+ message = fmt .Sprintf ("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d." , newjob .Spec .SchedSpec .MinAvailable , newjob .Status .Running , newjob .Status .Succeeded )
410411 cond := GenerateAppWrapperCondition (arbv1 .AppWrapperCondPreemptCandidate , v1 .ConditionTrue , "MinPodsNotRunning" , message )
411412 newjob .Status .Conditions = append (newjob .Status .Conditions , cond )
412413 } else {
413414 cond := GenerateAppWrapperCondition (arbv1 .AppWrapperCondPreemptCandidate , v1 .ConditionTrue , "MinPodsNotRunning" , "" )
414415 newjob .Status .Conditions [index ] = * cond .DeepCopy ()
415416 }
416417
417- if aw .Spec .SchedSpec .Requeuing .InitialTimeInSeconds == 0 {
418- aw .Spec .SchedSpec .Requeuing .InitialTimeInSeconds = aw .Spec .SchedSpec .Requeuing .TimeInSeconds
418+ if newjob .Spec .SchedSpec .Requeuing .InitialTimeInSeconds == 0 {
419+ newjob .Spec .SchedSpec .Requeuing .InitialTimeInSeconds = newjob .Spec .SchedSpec .Requeuing .TimeInSeconds
419420 }
420- if aw .Spec .SchedSpec .Requeuing .GrowthType == "exponential" {
421+ if newjob .Spec .SchedSpec .Requeuing .GrowthType == "exponential" {
421422 if newjob .Status .RequeueingTimeInSeconds == 0 {
422- newjob .Status .RequeueingTimeInSeconds += aw .Spec .SchedSpec .Requeuing .TimeInSeconds
423+ newjob .Status .RequeueingTimeInSeconds += newjob .Spec .SchedSpec .Requeuing .TimeInSeconds
423424 } else {
424425 newjob .Status .RequeueingTimeInSeconds += newjob .Status .RequeueingTimeInSeconds
425426 }
426- } else if aw .Spec .SchedSpec .Requeuing .GrowthType == "linear" {
427- newjob .Status .RequeueingTimeInSeconds += aw .Spec .SchedSpec .Requeuing .InitialTimeInSeconds
427+ } else if newjob .Spec .SchedSpec .Requeuing .GrowthType == "linear" {
428+ newjob .Status .RequeueingTimeInSeconds += newjob .Spec .SchedSpec .Requeuing .InitialTimeInSeconds
428429 }
429430
430- if aw .Spec .SchedSpec .Requeuing .MaxTimeInSeconds > 0 {
431- if aw .Spec .SchedSpec .Requeuing .MaxTimeInSeconds <= newjob .Status .RequeueingTimeInSeconds {
432- newjob .Status .RequeueingTimeInSeconds = aw .Spec .SchedSpec .Requeuing .MaxTimeInSeconds
431+ if newjob .Spec .SchedSpec .Requeuing .MaxTimeInSeconds > 0 {
432+ if newjob .Spec .SchedSpec .Requeuing .MaxTimeInSeconds <= newjob .Status .RequeueingTimeInSeconds {
433+ newjob .Status .RequeueingTimeInSeconds = newjob .Spec .SchedSpec .Requeuing .MaxTimeInSeconds
433434 }
434435 }
435436
@@ -443,7 +444,7 @@ func (qjm *XController) PreemptQueueJobs() {
443444 updateNewJob = newjob .DeepCopy ()
444445 } else {
445446 // If pods failed scheduling generate new preempt condition
446- message = fmt .Sprintf ("Pods failed scheduling failed=%v, running=%v." , len (aw .Status .PendingPodConditions ), aw .Status .Running )
447+ message = fmt .Sprintf ("Pods failed scheduling failed=%v, running=%v." , len (newjob .Status .PendingPodConditions ), newjob .Status .Running )
447448 index := getIndexOfMatchedCondition (newjob , arbv1 .AppWrapperCondPreemptCandidate , "PodsFailedScheduling" )
448449 // ignore co-scheduler failed scheduling events. This is a temp
449450 // work-around until co-scheduler version 0.22.X perf issues are resolved.
@@ -460,17 +461,17 @@ func (qjm *XController) PreemptQueueJobs() {
460461
461462 err = qjm .updateStatusInEtcdWithRetry (ctx , updateNewJob , "PreemptQueueJobs - CanRun: false -- MinPodsNotRunning" )
462463 if err != nil {
463- klog .Warningf ("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v" , aw .Namespace , aw .Name , err )
464+ klog .Warningf ("[PreemptQueueJobs] status update for '%s/%s' failed, skipping app wrapper err =%v" , newjob .Namespace , newjob .Name , err )
464465 continue
465466 }
466467
467468 if cleanAppWrapper {
468- klog .V (4 ).Infof ("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded." , aw .Name , aw .Namespace )
469+ klog .V (4 ).Infof ("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of re-queueing(s) exceeded." , newjob .Name , newjob .Namespace )
469470 go qjm .Cleanup (ctx , updateNewJob )
470471 } else {
471472 // Only back-off AWs that are in state running and not in state Failed
472473 if updateNewJob .Status .State != arbv1 .AppWrapperStateFailed {
473- klog .Infof ("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue." , aw .Name , aw .Namespace )
474+ klog .Infof ("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue." , newjob .Name , newjob .Namespace )
474475 qjm .backoff (ctx , updateNewJob , "PreemptionTriggered" , string (message ))
475476 }
476477 }
0 commit comments