Commit cf09234

[scheduler] fix scheduling behavior of batch job allocs
Batch job allocations have several documented behaviors that do not work as expected. First, on node drain the allocation is allowed to complete unless the drain deadline is reached, at which point the allocation is killed; the allocation is not replaced. Second, when the `alloc stop` command is used, the allocation is stopped and then rescheduled according to its reschedule policy. Third, on job restart with the `-reschedule` flag, the allocation is migrated and its reschedule policy is ignored. This update removes the change introduced in dfa07e1 (#26025) that forced batch job allocations into a failed state when migrating; the reported issue it was attempting to resolve was itself incorrect behavior. The reconciler has been adjusted to handle batch job allocations as documented.
1 parent 3d14572 commit cf09234

19 files changed (+1051, -386 lines)

api/allocations.go

Lines changed: 34 additions & 16 deletions

@@ -185,24 +185,9 @@ func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error
 // Note: for cluster topologies where API consumers don't have network access to
 // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
 // long pauses on this API call.
-//
-// BREAKING: This method will have the following signature in 1.6.0
-// func (a *Allocations) Stop(allocID string, w *WriteOptions) (*AllocStopResponse, error) {
 func (a *Allocations) Stop(alloc *Allocation, q *QueryOptions) (*AllocStopResponse, error) {
-	// COMPAT: Remove in 1.6.0
-	var w *WriteOptions
-	if q != nil {
-		w = &WriteOptions{
-			Region:    q.Region,
-			Namespace: q.Namespace,
-			AuthToken: q.AuthToken,
-			Headers:   q.Headers,
-			ctx:       q.ctx,
-		}
-	}
-
 	var resp AllocStopResponse
-	wm, err := a.client.put("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, w)
+	wm, err := a.client.putQuery("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, q)
 	if wm != nil {
 		resp.LastIndex = wm.LastIndex
 		resp.RequestTime = wm.RequestTime

@@ -590,13 +575,46 @@ type DesiredTransition struct {
 	// Reschedule is used to indicate that this allocation is eligible to be
 	// rescheduled.
 	Reschedule *bool
+
+	// ForceReschedule is used to indicate that this allocation must be
+	// rescheduled.
+	ForceReschedule *bool
+
+	// NoShutdownDelay is used to indicate any configured shutdown delay
+	// should be ignored.
+	NoShutdownDelay *bool
+
+	// MigrateDisablePlacement is used to indicate that this allocation
+	// should not be placed during migration.
+	MigrateDisablePlacement *bool
 }

 // ShouldMigrate returns whether the transition object dictates a migration.
 func (d DesiredTransition) ShouldMigrate() bool {
 	return d.Migrate != nil && *d.Migrate
 }

+// ShouldReschedule returns whether the transition object dictates a reschedule.
+func (d DesiredTransition) ShouldReschedule() bool {
+	return d.Reschedule != nil && *d.Reschedule
+}
+
+// ShouldIgnoreShutdownDelay returns whether the transition object dictates
+// ignoring the configured shutdown delay.
+func (d DesiredTransition) ShouldIgnoreShutdownDelay() bool {
+	return d.NoShutdownDelay != nil && *d.NoShutdownDelay
+}
+
+// ShouldForceReschedule returns whether the transition object dictates a forced reschedule.
+func (d DesiredTransition) ShouldForceReschedule() bool {
+	return d.ForceReschedule != nil && *d.ForceReschedule
+}
+
+// ShouldDisableMigrationPlacement returns whether the transition object dictates placement during migration
+func (d DesiredTransition) ShouldDisableMigrationPlacement() bool {
+	return d.MigrateDisablePlacement != nil && *d.MigrateDisablePlacement
+}
+
 // ExecStreamingIOOperation represents a stream write operation: either appending data or close (exclusively)
 type ExecStreamingIOOperation struct {
 	Data []byte `json:"data,omitempty"`
api/allocations_test.go

Lines changed: 121 additions & 25 deletions

@@ -335,34 +335,98 @@ func TestAllocations_Stop(t *testing.T) {
 	testutil.RequireRoot(t)
 	testutil.Parallel(t)

-	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
-		c.DevMode = true
+	t.Run("default", func(t *testing.T) {
+		c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
+			c.DevMode = true
+		})
+		defer s.Stop()
+		a := c.Allocations()
+
+		// wait for node
+		_ = oneNodeFromNodeList(t, c.Nodes())
+
+		// Create a job and register it
+		job := testJob()
+		_, wm, err := c.Jobs().Register(job, nil)
+		must.NoError(t, err)
+
+		// List allocations.
+		stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
+		must.NoError(t, err)
+		must.SliceLen(t, 1, stubs)
+
+		// Stop the first allocation.
+		resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{WaitIndex: qm.LastIndex})
+		must.NoError(t, err)
+		test.UUIDv4(t, resp.EvalID)
+		test.NonZero(t, resp.LastIndex)
+
+		// Stop allocation that doesn't exist.
+		resp, err = a.Stop(&Allocation{ID: "invalid"}, &QueryOptions{WaitIndex: qm.LastIndex})
+		must.Error(t, err)
 	})
-	defer s.Stop()
-	a := c.Allocations()

-	// wait for node
-	_ = oneNodeFromNodeList(t, c.Nodes())
-
-	// Create a job and register it
-	job := testJob()
-	_, wm, err := c.Jobs().Register(job, nil)
-	must.NoError(t, err)
-
-	// List allocations.
-	stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
-	must.NoError(t, err)
-	must.SliceLen(t, 1, stubs)
-
-	// Stop the first allocation.
-	resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{WaitIndex: qm.LastIndex})
-	must.NoError(t, err)
-	test.UUIDv4(t, resp.EvalID)
-	test.NonZero(t, resp.LastIndex)
+	t.Run("rescheduled", func(t *testing.T) {
+		c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
+			c.DevMode = true
+		})
+		defer s.Stop()
+		a := c.Allocations()
+
+		// wait for node
+		_ = oneNodeFromNodeList(t, c.Nodes())
+
+		// Create a job and register it
+		job := testJob()
+		_, wm, err := c.Jobs().Register(job, nil)
+		must.NoError(t, err)
+
+		// List allocations.
+		stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
+		must.NoError(t, err)
+		must.SliceLen(t, 1, stubs)
+
+		// Stop the first allocation.
+		resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{
+			Params:    map[string]string{"reschedule": "true"},
+			WaitIndex: qm.LastIndex,
+		})
+		must.NoError(t, err)
+		alloc, _, err := a.Info(stubs[0].ID, &QueryOptions{WaitIndex: resp.LastIndex})
+		must.NoError(t, err)
+		must.True(t, alloc.DesiredTransition.ShouldReschedule(), must.Sprint("allocation should be marked for rescheduling"))
+	})

-	// Stop allocation that doesn't exist.
-	resp, err = a.Stop(&Allocation{ID: "invalid"}, &QueryOptions{WaitIndex: qm.LastIndex})
-	must.Error(t, err)
+	t.Run("no shutdown delay", func(t *testing.T) {
+		c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
+			c.DevMode = true
+		})
+		defer s.Stop()
+		a := c.Allocations()
+
+		// wait for node
+		_ = oneNodeFromNodeList(t, c.Nodes())
+
+		// Create a job and register it
+		job := testJob()
+		_, wm, err := c.Jobs().Register(job, nil)
+		must.NoError(t, err)
+
+		// List allocations.
+		stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
+		must.NoError(t, err)
+		must.SliceLen(t, 1, stubs)
+
+		// Stop the first allocation.
+		resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{
+			Params:    map[string]string{"no_shutdown_delay": "true"},
+			WaitIndex: qm.LastIndex,
+		})
+		must.NoError(t, err)
+		alloc, _, err := a.Info(stubs[0].ID, &QueryOptions{WaitIndex: resp.LastIndex})
+		must.NoError(t, err)
+		must.True(t, alloc.DesiredTransition.ShouldIgnoreShutdownDelay(), must.Sprint("allocation should be marked for no shutdown delay"))
+	})
 }

 // TestAllocations_ExecErrors ensures errors are properly formatted

@@ -496,6 +560,38 @@ func TestAllocations_ShouldMigrate(t *testing.T) {
 	must.False(t, DesiredTransition{Migrate: pointerOf(false)}.ShouldMigrate())
 }

+func TestAllocations_ShouldReschedule(t *testing.T) {
+	testutil.Parallel(t)
+
+	must.True(t, DesiredTransition{Reschedule: pointerOf(true)}.ShouldReschedule())
+	must.False(t, DesiredTransition{}.ShouldReschedule())
+	must.False(t, DesiredTransition{Reschedule: pointerOf(false)}.ShouldReschedule())
+}
+
+func TestAllocations_ShouldForceReschedule(t *testing.T) {
+	testutil.Parallel(t)
+
+	must.True(t, DesiredTransition{ForceReschedule: pointerOf(true)}.ShouldForceReschedule())
+	must.False(t, DesiredTransition{}.ShouldForceReschedule())
+	must.False(t, DesiredTransition{ForceReschedule: pointerOf(false)}.ShouldForceReschedule())
+}
+
+func TestAllocations_ShouldIgnoreShutdownDelay(t *testing.T) {
+	testutil.Parallel(t)
+
+	must.True(t, DesiredTransition{NoShutdownDelay: pointerOf(true)}.ShouldIgnoreShutdownDelay())
+	must.False(t, DesiredTransition{}.ShouldIgnoreShutdownDelay())
+	must.False(t, DesiredTransition{NoShutdownDelay: pointerOf(false)}.ShouldIgnoreShutdownDelay())
+}
+
+func TestAllocations_ShouldDisableMigrationPlacement(t *testing.T) {
+	testutil.Parallel(t)
+
+	must.True(t, DesiredTransition{MigrateDisablePlacement: pointerOf(true)}.ShouldDisableMigrationPlacement())
+	must.False(t, DesiredTransition{}.ShouldDisableMigrationPlacement())
+	must.False(t, DesiredTransition{MigrateDisablePlacement: pointerOf(false)}.ShouldDisableMigrationPlacement())
+}
+
 func TestAllocations_Services(t *testing.T) {
 	t.Skip("needs to be implemented")
 	// TODO(jrasell) add tests once registration process is in place.
client/allocrunner/alloc_runner.go

Lines changed: 1 addition & 16 deletions

@@ -737,23 +737,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
 		return nil
 	}

-	te := structs.NewTaskEvent(structs.TaskKilling).
+	return structs.NewTaskEvent(structs.TaskKilling).
 		SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
-
-	// if the task is not set failed, the job type is batch, and the
-	// allocation is being migrated then mark the task as failed. this
-	// ensures the task is recreated if no eligible nodes are immediately
-	// available.
-	if !tr.TaskState().Failed &&
-		ar.alloc.Job.Type == structs.JobTypeBatch &&
-		ar.alloc.DesiredTransition.Migrate != nil &&
-		*ar.alloc.DesiredTransition.Migrate {
-
-		ar.logger.Trace("marking migrating batch job task failed on kill", "task_name", tr.Task().Name)
-		te.SetFailsTask()
-	}
-
-	return te
 }

 // Kill leader first, synchronously