7 changes: 7 additions & 0 deletions scheduler/feasible/context.go
@@ -252,6 +252,13 @@ func NewEvalEligibility() *EvalEligibility {
}
}

// Reset clears the contents of the eval eligibility
func (e *EvalEligibility) Reset() {
e.job = make(map[string]ComputedClassFeasibility)
e.taskGroups = make(map[string]map[string]ComputedClassFeasibility)
e.tgEscapedConstraints = make(map[string]bool)
}

// SetJob takes the job being evaluated and calculates the escaped constraints
// at the job and task group level.
func (e *EvalEligibility) SetJob(job *structs.Job) {
5 changes: 5 additions & 0 deletions scheduler/feasible/stack.go
@@ -352,6 +352,11 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
// Reset the binpack selector and context
s.scoreNorm.Reset()
s.ctx.Reset()

// Since the system stack is always evaluating a single
// node, previous eligibility information is not applicable,
// so reset it.
s.ctx.Eligibility().Reset()
start := time.Now()

// Get the task groups constraints.
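Note on the two additions above: SystemStack.Select runs once per candidate node, and feasibility verdicts cached in the eval context while evaluating one node are meaningless for the next. Below is a minimal standalone sketch of the stale-cache problem the new Reset call avoids; the type and method names are borrowed for illustration and the cache is simplified to a single map, so this is not Nomad's actual implementation.

package main

import "fmt"

// ComputedClassFeasibility mimics Nomad's cached per-class verdict
// (name borrowed for illustration; values are simplified).
type ComputedClassFeasibility int

const (
	EvalComputedClassUnknown ComputedClassFeasibility = iota
	EvalComputedClassEligible
	EvalComputedClassIneligible
)

// EvalEligibility is a reduced stand-in for the scheduler's cache.
type EvalEligibility struct {
	job map[string]ComputedClassFeasibility
}

// Reset clears cached verdicts by allocating a fresh map, the same
// strategy the added Reset method applies to all three of its maps.
func (e *EvalEligibility) Reset() {
	e.job = make(map[string]ComputedClassFeasibility)
}

func main() {
	e := &EvalEligibility{job: make(map[string]ComputedClassFeasibility)}

	// Verdict cached while evaluating the first node's class.
	e.job["class-a"] = EvalComputedClassIneligible

	// Without a reset, the next node would inherit this verdict even
	// though its class may be perfectly eligible.
	fmt.Println("before reset:", e.job["class-a"]) // 2 (ineligible)

	e.Reset()
	fmt.Println("after reset:", e.job["class-a"]) // 0 (unknown)
}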
220 changes: 19 additions & 201 deletions scheduler/reconciler/reconcile_node.go
@@ -5,8 +5,6 @@ package reconciler

import (
"fmt"
"maps"
"math"
"slices"
"time"

@@ -61,25 +59,14 @@ func (nr *NodeReconciler) Compute(
// Create the required task groups.
required := materializeSystemTaskGroups(job)

// Canary deployments deploy to the TaskGroup.UpdateStrategy.Canary
// percentage of eligible nodes, so we create a mapping of task group name
// to a list of nodes that canaries should be placed on.
canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes)

compatHadExistingDeployment := nr.DeploymentCurrent != nil

result := new(NodeReconcileResult)
var deploymentComplete bool
for nodeID, allocs := range nodeAllocs {
diff, deploymentCompleteForNode := nr.computeForNode(job, nodeID, eligibleNodes,
notReadyNodes, taintedNodes, canaryNodes[nodeID], canariesPerTG, required,
allocs, terminal, serverSupportsDisconnectedClients)
diff := nr.computeForNode(job, nodeID, eligibleNodes,
notReadyNodes, taintedNodes, required, allocs, terminal,
serverSupportsDisconnectedClients)
result.Append(diff)

deploymentComplete = deploymentCompleteForNode
if deploymentComplete {
break
}
}

// COMPAT(1.14.0) prevent a new deployment from being created in the case
@@ -90,93 +77,9 @@
nr.DeploymentCurrent = nil
}

nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...)

return result
}

// computeCanaryNodes is a helper function that, given required task groups,
// mappings of nodes to their live allocs and terminal allocs, and a map of
// eligible nodes, outputs a map[nodeID] -> map[TG] -> bool which indicates
// which TGs this node is a canary for, and a map[TG] -> int to indicate how
// many total canaries are to be placed for a TG.
func (nr *NodeReconciler) computeCanaryNodes(required map[string]*structs.TaskGroup,
liveAllocs map[string][]*structs.Allocation, terminalAllocs structs.TerminalByNodeByName,
eligibleNodes map[string]*structs.Node) (map[string]map[string]bool, map[string]int) {

canaryNodes := map[string]map[string]bool{}
eligibleNodesList := slices.Collect(maps.Values(eligibleNodes))
canariesPerTG := map[string]int{}

for _, tg := range required {
if tg.Update.IsEmpty() || tg.Update.Canary == 0 {
continue
}

// round up to the nearest integer
numberOfCanaryNodes := int(math.Ceil(float64(tg.Update.Canary) * float64(len(eligibleNodes)) / 100))
canariesPerTG[tg.Name] = numberOfCanaryNodes

// check if there are any live allocations on any nodes that are/were
// canaries.
for nodeID, allocs := range liveAllocs {
for _, a := range allocs {
eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes(
eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID)
}
}

// check if there are any terminal allocations that were canaries
for nodeID, terminalAlloc := range terminalAllocs {
for _, a := range terminalAlloc {
eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes(
eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID)
}
}

for i, n := range eligibleNodesList {
if i > numberOfCanaryNodes-1 {
break
}

if _, ok := canaryNodes[n.ID]; !ok {
canaryNodes[n.ID] = map[string]bool{}
}

canaryNodes[n.ID][tg.Name] = true
}
}

return canaryNodes, canariesPerTG
}
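For concreteness, the rounding rule in computeCanaryNodes above (removed by this change) is easiest to see with numbers. A small self-contained sketch, assuming a simplified signature:

package main

import (
	"fmt"
	"math"
)

// canaryNodeCount reproduces the rounding rule: the update.canary
// percentage of eligible nodes, rounded up to the nearest integer.
func canaryNodeCount(canaryPercent, eligibleNodes int) int {
	return int(math.Ceil(float64(canaryPercent) * float64(eligibleNodes) / 100))
}

func main() {
	// With update.canary = 30 and 7 eligible nodes:
	// ceil(30 * 7 / 100) = ceil(2.1) = 3 canary nodes.
	fmt.Println(canaryNodeCount(30, 7))
}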

func (nr *NodeReconciler) findOldCanaryNodes(nodesList []*structs.Node, numberOfCanaryNodes int,
a *structs.Allocation, tg *structs.TaskGroup, canaryNodes map[string]map[string]bool, nodeID string) ([]*structs.Node, int) {

if a.DeploymentStatus == nil || a.DeploymentStatus.Canary == false ||
nr.DeploymentCurrent == nil {
return nodesList, numberOfCanaryNodes
}

nodes := nodesList
numberOfCanaries := numberOfCanaryNodes
if a.TaskGroup == tg.Name {
if _, ok := canaryNodes[nodeID]; !ok {
canaryNodes[nodeID] = map[string]bool{}
}
canaryNodes[nodeID][tg.Name] = true

// this node should no longer be considered when searching
// for canary nodes
numberOfCanaries -= 1
nodes = slices.DeleteFunc(
nodes,
func(n *structs.Node) bool { return n.ID == nodeID },
)
}
return nodes, numberOfCanaries
}

// computeForNode is used to do a set difference between the target
// allocations and the existing allocations for a particular node. This returns
// 8 sets of results:
@@ -191,21 +94,18 @@ func (nr *NodeReconciler) findOldCanaryNodes(nodesList []*structs.Node, numberOf
// 8. those that may still be running on a node that has reconnected.
//
// This method mutates the NodeReconciler fields, and returns a new
// NodeReconcileResult object and a boolean to indicate whether the deployment
// is complete or not.
// NodeReconcileResult object.
func (nr *NodeReconciler) computeForNode(
job *structs.Job, // job whose allocs are going to be diff-ed
nodeID string,
eligibleNodes map[string]*structs.Node,
notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down (by node id)
canaryNode map[string]bool, // indicates whether this node is a canary node for tg
canariesPerTG map[string]int, // indicates how many canary placements we expect per tg
required map[string]*structs.TaskGroup, // set of allocations that must exist
liveAllocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
) (*NodeReconcileResult, bool) {
) *NodeReconcileResult {
result := new(NodeReconcileResult)

// cancel deployments that aren't needed anymore
@@ -225,9 +125,6 @@ func (nr *NodeReconciler) computeForNode(
deploymentFailed = nr.DeploymentCurrent.Status == structs.DeploymentStatusFailed
}

// Track desired total and desired canaries across all loops
desiredCanaries := map[string]int{}

// Track whether we're during a canary update
isCanarying := map[string]bool{}

@@ -255,7 +152,7 @@
// deployment
var dstate = new(structs.DeploymentState)
if nr.DeploymentCurrent != nil {
dstate, _ = nr.DeploymentCurrent.TaskGroups[tg.Name]
dstate = nr.DeploymentCurrent.TaskGroups[tg.Name]
}

supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients)
@@ -388,17 +285,14 @@

// If the definition is updated we need to update
if job.JobModifyIndex != alloc.Job.JobModifyIndex {
if canariesPerTG[tg.Name] > 0 && dstate != nil && !dstate.Promoted {
if !tg.Update.IsEmpty() && tg.Update.Canary > 0 && dstate != nil && !dstate.Promoted {
isCanarying[tg.Name] = true
if canaryNode[tg.Name] {
result.Update = append(result.Update, AllocTuple{
Name: name,
TaskGroup: tg,
Alloc: alloc,
Canary: true,
})
desiredCanaries[tg.Name] += 1
}
result.Update = append(result.Update, AllocTuple{
Name: name,
TaskGroup: tg,
Alloc: alloc,
Canary: true,
})
} else {
result.Update = append(result.Update, AllocTuple{
Name: name,
@@ -419,13 +313,8 @@
})
}

// as we iterate over require groups, we'll keep track of whether the
// deployment is complete or not
deploymentComplete := false

// Scan the required groups
for name, tg := range required {

// populate deployment state for this task group
var dstate = new(structs.DeploymentState)
var existingDeployment bool
@@ -440,16 +329,10 @@
dstate.AutoPromote = tg.Update.AutoPromote
dstate.ProgressDeadline = tg.Update.ProgressDeadline
}
dstate.DesiredTotal = len(eligibleNodes)
}

if isCanarying[tg.Name] && !dstate.Promoted {
dstate.DesiredCanaries = canariesPerTG[tg.Name]
}

// Check for an existing allocation
if _, ok := existing[name]; !ok {

// Check for a terminal sysbatch allocation, which should not be placed
// again unless the job has been updated.
if job.Type == structs.JobTypeSysBatch {
@@ -494,6 +377,11 @@
Alloc: termOnNode,
}

// If the terminal allocation was a canary, mark it as such.
if termOnNode != nil && termOnNode.DeploymentStatus != nil && termOnNode.DeploymentStatus.Canary {
allocTuple.Canary = true
}

// If the new allocation isn't annotated with a previous allocation
// or if the previous allocation isn't from the same node then we
// annotate the allocTuple with a new Allocation
@@ -506,12 +394,10 @@

// check if deployment is place ready or complete
deploymentPlaceReady := !deploymentPaused && !deploymentFailed
deploymentComplete = nr.isDeploymentComplete(tg.Name, result, isCanarying[tg.Name])

// check if perhaps there's nothing else to do for this TG
if existingDeployment ||
tg.Update.IsEmpty() ||
(dstate.DesiredTotal == 0 && dstate.DesiredCanaries == 0) ||
!deploymentPlaceReady {
continue
}
@@ -527,7 +413,7 @@
}
}

return result, deploymentComplete
return result
}
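The control-flow consequence of the new computeForNode signature shows up in Compute above: the old loop broke out as soon as one node reported a complete deployment, while the new loop visits every node and only aggregates diffs. A compressed sketch under simplified types (not the real signatures):

package main

import "fmt"

// NodeReconcileResult stands in for the scheduler's per-node diff;
// only the aggregation behavior is modeled here.
type NodeReconcileResult struct {
	Place, Update, Migrate []string
}

// Append merges another node's result, mirroring result.Append(diff).
func (r *NodeReconcileResult) Append(o *NodeReconcileResult) {
	r.Place = append(r.Place, o.Place...)
	r.Update = append(r.Update, o.Update...)
	r.Migrate = append(r.Migrate, o.Migrate...)
}

func main() {
	nodeAllocs := map[string][]string{"node-1": nil, "node-2": nil}

	// Every node is now diffed unconditionally; with no early break,
	// one node's deployment state cannot hide another node's work.
	result := new(NodeReconcileResult)
	for nodeID := range nodeAllocs {
		diff := &NodeReconcileResult{Place: []string{nodeID + "/tg"}}
		result.Append(diff)
	}
	fmt.Println(len(result.Place)) // 2
}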

func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGroup,
@@ -586,74 +472,6 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro
nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate
}

func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult, isCanarying bool) bool {
complete := len(buckets.Place)+len(buckets.Migrate)+len(buckets.Update) == 0

if !complete || nr.DeploymentCurrent == nil || isCanarying {
return false
}

// ensure everything is healthy
if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok {
if dstate.HealthyAllocs < dstate.DesiredTotal { // Make sure we have enough healthy allocs
complete = false
}
}

return complete
}

func (nr *NodeReconciler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate {
statusUpdates := []*structs.DeploymentStatusUpdate{}

if d := nr.DeploymentCurrent; d != nil {

// Deployments that require promotion should have appropriate status set
// immediately, no matter their completeness.
if d.RequiresPromotion() {
if d.HasAutoPromote() {
d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
} else {
d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
}
return statusUpdates
}

// Mark the deployment as complete if possible
if deploymentComplete {
if job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if d.Status != structs.DeploymentStatusUnblocking &&
d.Status != structs.DeploymentStatusSuccessful {
statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: nr.DeploymentCurrent.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
})
}
} else {
statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: nr.DeploymentCurrent.ID,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
})
}
}

// Mark the deployment as pending since its state is now computed.
if d.Status == structs.DeploymentStatusInitializing {
statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: nr.DeploymentCurrent.ID,
Status: structs.DeploymentStatusPending,
StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
})
}
}

return statusUpdates
}
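The subtlest rule in the removed setDeploymentStatusAndUpdates is the multiregion ordering guard: blocked precedes unblocking and successful, so a complete deployment must not be knocked back to blocked once it has advanced. A miniature of that guard, with invented names rather than Nomad's API:

package main

import "fmt"

const (
	statusBlocked    = "blocked"
	statusUnblocking = "unblocking"
	statusSuccessful = "successful"
)

// nextStatus condenses the removed multiregion branch: a complete
// deployment is marked blocked for its peer regions unless it has
// already advanced past blocked, in which case the later state wins.
func nextStatus(current string, multiregion bool) (status string, changed bool) {
	if !multiregion {
		return statusSuccessful, true
	}
	if current == statusUnblocking || current == statusSuccessful {
		return current, false
	}
	return statusBlocked, true
}

func main() {
	s, changed := nextStatus(statusUnblocking, true)
	fmt.Println(s, changed) // unblocking false
}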

// materializeSystemTaskGroups is used to materialize all the task groups
// a system or sysbatch job requires.
func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {