diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 1823e18a..94d0c623 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -466,7 +466,7 @@ func (qjm *XController) PreemptQueueJobs() { // Only back-off AWs that are in state running and not in state Failed if updateNewJob.Status.State != arbv1.AppWrapperStateFailed { klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", aw.Name, aw.Namespace) - go qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message)) + qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message)) } } } @@ -1155,7 +1155,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { } else { dispatchFailedMessage = "Cannot find an cluster with enough resources to dispatch AppWrapper." klog.V(2).Infof("[ScheduleNex] [Dispatcher Mode] %s %s\n", dispatchFailedReason, dispatchFailedMessage) - go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage) + qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage) } } else { // Agent Mode aggqj := qjm.GetAggregatedResources(qj) @@ -1284,7 +1284,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { // TODO: Remove forwarded logic as a big AW will never be forwarded forwarded = true // should we call backoff or update etcd? - go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage) + qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage) } } forwarded = true @@ -1347,7 +1347,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { if qjm.quotaManager != nil && quotaFits { qjm.quotaManager.Release(qj) } - go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage) + qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage) } } return nil @@ -1672,6 +1672,20 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) { } klog.V(6).Infof("[Informer-updateQJ] '%s/%s' *Delay=%.6f seconds normal enqueue Version=%s Status=%v", newQJ.Namespace, newQJ.Name, time.Now().Sub(newQJ.Status.ControllerFirstTimestamp.Time).Seconds(), newQJ.ResourceVersion, newQJ.Status) + for _, cond := range newQJ.Status.Conditions { + if cond.Type == arbv1.AppWrapperCondBackoff { + //AWs that have backoff conditions have a delay of 10 seconds before getting added to enqueue. + //TODO: we could plug an interface here with back-off strategies for different MCAD use cases. + time.AfterFunc(time.Duration(cc.serverOption.BackoffTime)*time.Second, func() { + if cc.serverOption.QuotaEnabled && cc.quotaManager != nil { + cc.quotaManager.Release(newQJ) + } + cc.enqueue(newQJ) + }) + return + } + } + // cc.eventQueue.Delete(oldObj) cc.enqueue(newQJ) }