@@ -93,10 +93,10 @@ type XController struct {
9393
9494 // QueueJobs that need to be initialized
9595 // Add labels and selectors to AppWrapper
96- initQueue * cache.FIFO
96+ // initQueue *cache.FIFO
9797
9898 // QueueJobs that need to sync up after initialization
99- updateQueue * cache.FIFO
99+ // updateQueue *cache.FIFO
100100
101101 // eventQueue that need to sync up
102102 eventQueue * cache.FIFO
@@ -241,9 +241,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
241241 arbclients : clientset .NewForConfigOrDie (config ),
242242 eventQueue : cache .NewFIFO (GetQueueJobKey ),
243243 agentEventQueue : cache .NewFIFO (GetQueueJobKey ),
244- initQueue : cache .NewFIFO (GetQueueJobKey ),
245- updateQueue : cache .NewFIFO (GetQueueJobKey ),
246- qjqueue : NewSchedulingQueue (),
244+ // initQueue: cache.NewFIFO(GetQueueJobKey),
245+ // updateQueue: cache.NewFIFO(GetQueueJobKey),
246+ qjqueue : NewSchedulingQueue (),
247247 //cache is turned-off, issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588
248248 //cache: clusterstatecache.New(config),
249249 schedulingAW : nil ,
@@ -1262,7 +1262,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
12621262 klog .Infof ("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s" ,
12631263 qj .Namespace , qj .Name , time .Now ().Sub (HOLStartTime ), qjm .qjqueue .IfExistActiveQ (qj ), qjm .qjqueue .IfExistUnschedulableQ (qj ), qj , qj .ResourceVersion , qj .Status , msg )
12641264 //call update etcd here to retrigger AW execution for failed quota
1265-
1265+ //TODO: quota management tests fail if this call is moved into a goroutine; investigate why.
12661266 qjm .backoff (context .Background (), qj , dispatchFailedReason , dispatchFailedMessage )
12671267
12681268 }
@@ -1702,31 +1702,12 @@ func (cc *XController) deleteQueueJob(obj interface{}) {
17021702 klog .Errorf ("[Informer-deleteQJ] obj is not AppWrapper. obj=%+v" , obj )
17031703 return
17041704 }
1705- current_ts := metav1 .NewTime (time .Now ())
1706- klog .V (10 ).Infof ("[Informer-deleteQJ] %s *Delay=%.6f seconds before enqueue &qj=%p Version=%s Status=%+v Deletion Timestame=%+v" , qj .Name , time .Now ().Sub (qj .Status .ControllerFirstTimestamp .Time ).Seconds (), qj , qj .ResourceVersion , qj .Status , qj .GetDeletionTimestamp ())
1707- accessor , err := meta .Accessor (qj )
1708- if err != nil {
1709- klog .V (10 ).Infof ("[Informer-deleteQJ] Error obtaining the accessor for AW job: %s" , qj .Name )
1710- qj .SetDeletionTimestamp (& current_ts )
1711- } else {
1712- accessor .SetDeletionTimestamp (& current_ts )
1713- }
1714- // validate that app wraper has not been marked for deletion by the infomer's delete handler
1715- if qj .DeletionTimestamp != nil {
1716- klog .V (3 ).Infof ("[Informer-deleteQJ] AW job=%s/%s set for deletion." , qj .Namespace , qj .Name )
1717- // cleanup resources for running job, ignoring errors
1718- if err00 := cc .Cleanup (context .Background (), qj ); err00 != nil {
1719- klog .Warningf ("Failed to cleanup resources for app wrapper '%s/%s', err = %v" , qj .Namespace , qj .Name , err00 )
1720- }
1721- // empty finalizers and delete the queuejob again
1722- if accessor , err00 := meta .Accessor (qj ); err00 == nil {
1723- accessor .SetFinalizers (nil )
1724- }
1725- // we delete the job from the queue if it is there, ignoring errors
1726- cc .qjqueue .Delete (qj )
1727- cc .eventQueue .Delete (qj )
1728- klog .V (3 ).Infof ("[Informer-deleteQJ] AW job=%s/%s deleted." , qj .Namespace , qj .Name )
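1705+ // when quota management is enabled, call the quota manager's Release for the job; then delete the job from the scheduling and event queues if it is there, ignoring errors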
1706+ if cc .serverOption .QuotaEnabled && cc .quotaManager != nil {
1707+ cc .quotaManager .Release (qj )
17291708 }
1709+ cc .qjqueue .Delete (qj )
1710+ cc .eventQueue .Delete (qj )
17301711}
17311712
17321713func (cc * XController ) enqueue (obj interface {}) error {
@@ -1888,19 +1869,27 @@ func (cc *XController) worker() {
18881869 //if everything passes then CanRun is set to true and AW is ready for dispatch
18891870 if ! queuejob .Status .CanRun && (queuejob .Status .State != arbv1 .AppWrapperStateActive ) {
18901871 cc .ScheduleNext (queuejob )
1891- return nil
1872+ //When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with
1873+ //Sync queuejob will not unwrap an AW to spawn genericItems
1874+ if queuejob .Status .CanRun {
1875+
1876+ // errs := make(chan error, 1)
1877+ // go func() {
1878+ // errs <- cc.syncQueueJob(ctx, queuejob)
1879+ // }()
1880+
1881+ // // later:
1882+ // if err := <-errs; err != nil {
1883+ // return err
1884+ // }
1885+ if err := cc .syncQueueJob (ctx , queuejob ); err != nil {
1886+ // If any error, requeue it.
1887+ return err
1888+ }
18921889
1893- }
1894- //When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with
1895- //Sync queuejob will not unwrap an AW to spawn genericItems
1896- if queuejob .Status .CanRun {
1897- if err := cc .syncQueueJob (ctx , queuejob ); err != nil {
1898- // If any error, requeue it.
1899- return err
19001890 }
19011891
19021892 }
1903-
19041893 //asmalvan- ends
19051894
19061895 klog .V (10 ).Infof ("[worker] Ending %s Delay=%.6f seconds &newQJ=%p Version=%s Status=%+v" , queuejob .Name , time .Now ().Sub (queuejob .Status .ControllerFirstTimestamp .Time ).Seconds (), queuejob , queuejob .ResourceVersion , queuejob .Status )
0 commit comments