3 changes: 3 additions & 0 deletions .changelog/26961.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed scheduling behavior of batch job allocations
```
50 changes: 34 additions & 16 deletions api/allocations.go
@@ -185,24 +185,9 @@ func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error
 // Note: for cluster topologies where API consumers don't have network access to
 // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
 // long pauses on this API call.
-//
-// BREAKING: This method will have the following signature in 1.6.0
-// func (a *Allocations) Stop(allocID string, w *WriteOptions) (*AllocStopResponse, error) {
 func (a *Allocations) Stop(alloc *Allocation, q *QueryOptions) (*AllocStopResponse, error) {
-    // COMPAT: Remove in 1.6.0
-    var w *WriteOptions
-    if q != nil {
-        w = &WriteOptions{
-            Region:    q.Region,
-            Namespace: q.Namespace,
-            AuthToken: q.AuthToken,
-            Headers:   q.Headers,
-            ctx:       q.ctx,
-        }
-    }
-
     var resp AllocStopResponse
-    wm, err := a.client.put("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, w)
+    wm, err := a.client.putQuery("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, q)
     if wm != nil {
         resp.LastIndex = wm.LastIndex
         resp.RequestTime = wm.RequestTime
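
The method keeps its existing signature; the switch from put to putQuery simply forwards QueryOptions, including query parameters, to the server. A minimal caller-side sketch, assuming a reachable Nomad agent; the allocation ID is hypothetical, and the "reschedule" parameter mirrors the one exercised in the tests below:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Hypothetical allocation ID; in practice it comes from Allocations().List.
	allocID := "8ba85cef-26cc-4c02-98f2-1c2e7fb70c1d"

	// Stop the allocation and ask for it to be rescheduled. The "reschedule"
	// query parameter now reaches the server because Stop forwards
	// QueryOptions.Params via putQuery.
	resp, err := client.Allocations().Stop(&api.Allocation{ID: allocID}, &api.QueryOptions{
		Params: map[string]string{"reschedule": "true"},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("evaluation created:", resp.EvalID)
}
```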
@@ -590,13 +575,46 @@ type DesiredTransition struct {
     // Reschedule is used to indicate that this allocation is eligible to be
     // rescheduled.
     Reschedule *bool
+
+    // ForceReschedule is used to indicate that this allocation must be
+    // rescheduled.
+    ForceReschedule *bool
+
+    // NoShutdownDelay is used to indicate any configured shutdown delay
+    // should be ignored.
+    NoShutdownDelay *bool
+
+    // MigrateDisablePlacement is used to indicate that this allocation
+    // should not be placed during migration.
+    MigrateDisablePlacement *bool
 }

 // ShouldMigrate returns whether the transition object dictates a migration.
 func (d DesiredTransition) ShouldMigrate() bool {
     return d.Migrate != nil && *d.Migrate
 }
+
+// ShouldReschedule returns whether the transition object dictates a reschedule.
+func (d DesiredTransition) ShouldReschedule() bool {
+    return d.Reschedule != nil && *d.Reschedule
+}
+
+// ShouldIgnoreShutdownDelay returns whether the transition object dictates
+// ignoring the configured shutdown delay.
+func (d DesiredTransition) ShouldIgnoreShutdownDelay() bool {
+    return d.NoShutdownDelay != nil && *d.NoShutdownDelay
+}
+
+// ShouldForceReschedule returns whether the transition object dictates a
+// forced reschedule.
+func (d DesiredTransition) ShouldForceReschedule() bool {
+    return d.ForceReschedule != nil && *d.ForceReschedule
+}
+
+// ShouldDisableMigrationPlacement returns whether the transition object
+// dictates disabling placement during migration.
+func (d DesiredTransition) ShouldDisableMigrationPlacement() bool {
+    return d.MigrateDisablePlacement != nil && *d.MigrateDisablePlacement
+}

 // ExecStreamingIOOperation represents a stream write operation: either appending data or close (exclusively)
 type ExecStreamingIOOperation struct {
     Data []byte `json:"data,omitempty"`
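The new Should* helpers give API consumers a nil-safe way to read an allocation's desired transition. A short sketch, assuming alloc is an *api.Allocation already fetched with Allocations().Info:

```go
// alloc is an *api.Allocation returned by Allocations().Info; the helpers are
// nil-safe, so unset pointer fields simply read as false.
switch {
case alloc.DesiredTransition.ShouldForceReschedule():
	fmt.Println("allocation must be rescheduled")
case alloc.DesiredTransition.ShouldReschedule():
	fmt.Println("allocation is eligible for rescheduling")
}
if alloc.DesiredTransition.ShouldIgnoreShutdownDelay() {
	fmt.Println("configured shutdown delay will be skipped")
}
if alloc.DesiredTransition.ShouldDisableMigrationPlacement() {
	fmt.Println("allocation will not be placed during migration")
}
```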
146 changes: 121 additions & 25 deletions api/allocations_test.go
@@ -335,34 +335,98 @@ func TestAllocations_Stop(t *testing.T) {
     testutil.RequireRoot(t)
     testutil.Parallel(t)

-    c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
-        c.DevMode = true
-    })
+    t.Run("default", func(t *testing.T) {
+        c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
+            c.DevMode = true
+        })
+        defer s.Stop()
+        a := c.Allocations()
+
+        // wait for node
+        _ = oneNodeFromNodeList(t, c.Nodes())
+
+        // Create a job and register it
+        job := testJob()
+        _, wm, err := c.Jobs().Register(job, nil)
+        must.NoError(t, err)
+
+        // List allocations.
+        stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
+        must.NoError(t, err)
+        must.SliceLen(t, 1, stubs)
+
+        // Stop the first allocation.
+        resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{WaitIndex: qm.LastIndex})
+        must.NoError(t, err)
+        test.UUIDv4(t, resp.EvalID)
+        test.NonZero(t, resp.LastIndex)
+
+        // Stop allocation that doesn't exist.
+        resp, err = a.Stop(&Allocation{ID: "invalid"}, &QueryOptions{WaitIndex: qm.LastIndex})
+        must.Error(t, err)
+    })
+
-    defer s.Stop()
-    a := c.Allocations()
-
-    // wait for node
-    _ = oneNodeFromNodeList(t, c.Nodes())
-
-    // Create a job and register it
-    job := testJob()
-    _, wm, err := c.Jobs().Register(job, nil)
-    must.NoError(t, err)
-
-    // List allocations.
-    stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
-    must.NoError(t, err)
-    must.SliceLen(t, 1, stubs)
-
-    // Stop the first allocation.
-    resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{WaitIndex: qm.LastIndex})
-    must.NoError(t, err)
-    test.UUIDv4(t, resp.EvalID)
-    test.NonZero(t, resp.LastIndex)
+    t.Run("rescheduled", func(t *testing.T) {
+        c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
+            c.DevMode = true
+        })
+        defer s.Stop()
+        a := c.Allocations()
+
+        // wait for node
+        _ = oneNodeFromNodeList(t, c.Nodes())
+
+        // Create a job and register it
+        job := testJob()
+        _, wm, err := c.Jobs().Register(job, nil)
+        must.NoError(t, err)
+
+        // List allocations.
+        stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
+        must.NoError(t, err)
+        must.SliceLen(t, 1, stubs)
+
+        // Stop the first allocation.
+        resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{
+            Params:    map[string]string{"reschedule": "true"},
+            WaitIndex: qm.LastIndex,
+        })
+        must.NoError(t, err)
+        alloc, _, err := a.Info(stubs[0].ID, &QueryOptions{WaitIndex: resp.LastIndex})
+        must.NoError(t, err)
+        must.True(t, alloc.DesiredTransition.ShouldReschedule(), must.Sprint("allocation should be marked for rescheduling"))
+    })

-    // Stop allocation that doesn't exist.
-    resp, err = a.Stop(&Allocation{ID: "invalid"}, &QueryOptions{WaitIndex: qm.LastIndex})
-    must.Error(t, err)
+    t.Run("no shutdown delay", func(t *testing.T) {
+        c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
+            c.DevMode = true
+        })
+        defer s.Stop()
+        a := c.Allocations()
+
+        // wait for node
+        _ = oneNodeFromNodeList(t, c.Nodes())
+
+        // Create a job and register it
+        job := testJob()
+        _, wm, err := c.Jobs().Register(job, nil)
+        must.NoError(t, err)
+
+        // List allocations.
+        stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
+        must.NoError(t, err)
+        must.SliceLen(t, 1, stubs)
+
+        // Stop the first allocation.
+        resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{
+            Params:    map[string]string{"no_shutdown_delay": "true"},
+            WaitIndex: qm.LastIndex,
+        })
+        must.NoError(t, err)
+        alloc, _, err := a.Info(stubs[0].ID, &QueryOptions{WaitIndex: resp.LastIndex})
+        must.NoError(t, err)
+        must.True(t, alloc.DesiredTransition.ShouldIgnoreShutdownDelay(), must.Sprint("allocation should be marked for no shutdown delay"))
+    })
 }

 // TestAllocations_ExecErrors ensures errors are properly formatted
@@ -496,6 +560,38 @@ func TestAllocations_ShouldMigrate(t *testing.T) {
     must.False(t, DesiredTransition{Migrate: pointerOf(false)}.ShouldMigrate())
 }

+func TestAllocations_ShouldReschedule(t *testing.T) {
+    testutil.Parallel(t)
+
+    must.True(t, DesiredTransition{Reschedule: pointerOf(true)}.ShouldReschedule())
+    must.False(t, DesiredTransition{}.ShouldReschedule())
+    must.False(t, DesiredTransition{Reschedule: pointerOf(false)}.ShouldReschedule())
+}
+
+func TestAllocations_ShouldForceReschedule(t *testing.T) {
+    testutil.Parallel(t)
+
+    must.True(t, DesiredTransition{ForceReschedule: pointerOf(true)}.ShouldForceReschedule())
+    must.False(t, DesiredTransition{}.ShouldForceReschedule())
+    must.False(t, DesiredTransition{ForceReschedule: pointerOf(false)}.ShouldForceReschedule())
+}
+
+func TestAllocations_ShouldIgnoreShutdownDelay(t *testing.T) {
+    testutil.Parallel(t)
+
+    must.True(t, DesiredTransition{NoShutdownDelay: pointerOf(true)}.ShouldIgnoreShutdownDelay())
+    must.False(t, DesiredTransition{}.ShouldIgnoreShutdownDelay())
+    must.False(t, DesiredTransition{NoShutdownDelay: pointerOf(false)}.ShouldIgnoreShutdownDelay())
+}
+
+func TestAllocations_ShouldDisableMigrationPlacement(t *testing.T) {
+    testutil.Parallel(t)
+
+    must.True(t, DesiredTransition{MigrateDisablePlacement: pointerOf(true)}.ShouldDisableMigrationPlacement())
+    must.False(t, DesiredTransition{}.ShouldDisableMigrationPlacement())
+    must.False(t, DesiredTransition{MigrateDisablePlacement: pointerOf(false)}.ShouldDisableMigrationPlacement())
+}

 func TestAllocations_Services(t *testing.T) {
     t.Skip("needs to be implemented")
     // TODO(jrasell) add tests once registration process is in place.
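The new subtests can be run individually with Go's test runner, for example `go test -run 'TestAllocations_Stop/rescheduled' ./api`; note they require root, per testutil.RequireRoot above.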
17 changes: 1 addition & 16 deletions client/allocrunner/alloc_runner.go
@@ -737,23 +737,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
         return nil
     }

-    te := structs.NewTaskEvent(structs.TaskKilling).
+    return structs.NewTaskEvent(structs.TaskKilling).
         SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
-
-    // if the task is not set failed, the job type is batch, and the
-    // allocation is being migrated then mark the task as failed. this
-    // ensures the task is recreated if no eligible nodes are immediately
-    // available.
-    if !tr.TaskState().Failed &&
-        ar.alloc.Job.Type == structs.JobTypeBatch &&
-        ar.alloc.DesiredTransition.Migrate != nil &&
-        *ar.alloc.DesiredTransition.Migrate {
-
-        ar.logger.Trace("marking migrating batch job task failed on kill", "task_name", tr.Task().Name)
-        te.SetFailsTask()
-    }
-
-    return te
 }

 // Kill leader first, synchronously