
Commit 384a984

[scheduler] fix scheduling behavior of batch job allocs
Allocations of batch jobs have several documented behaviors that do not work as expected:

1. On node drain, the allocation is allowed to complete unless the drain deadline is reached, at which point the allocation is killed. The allocation is not replaced.
2. When using the `alloc stop` command, the allocation is stopped and then rescheduled according to its reschedule policy.
3. On job restart, if the `-reschedule` flag is used, the allocation is migrated and its reschedule policy is ignored.

This update removes the change introduced in dfa07e1 (#26025), which forced batch job allocations into a failed state when migrating; the reported issue it attempted to resolve was itself incorrect behavior. The reconciler has been adjusted to handle batch job allocations as documented.
1 parent c2e6ef8 commit 384a984

22 files changed: 1,199 additions and 290 deletions
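To illustrate the stop-and-reschedule behavior described in the commit message, here is a minimal sketch of how a consumer of the Go `api` package could request it, mirroring the "rescheduled" test case added in this commit. It assumes the `DesiredTransition` helpers introduced below are available; the client address comes from the environment and the allocation ID is a placeholder.

```go
package main

import (
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// DefaultConfig reads NOMAD_ADDR and related settings from the environment.
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder allocation ID; in practice this comes from a List or job status call.
	allocID := "REPLACE-WITH-ALLOC-ID"

	// Stop the allocation and ask for it to be rescheduled, mirroring the
	// "rescheduled" test case added in this commit.
	resp, err := client.Allocations().Stop(&api.Allocation{ID: allocID}, &api.QueryOptions{
		Params: map[string]string{"reschedule": "true"},
	})
	if err != nil {
		log.Fatal(err)
	}

	// The allocation's desired transition should now report the reschedule request.
	alloc, _, err := client.Allocations().Info(allocID, &api.QueryOptions{WaitIndex: resp.LastIndex})
	if err != nil {
		log.Fatal(err)
	}
	log.Println("reschedule requested:", alloc.DesiredTransition.ShouldReschedule())
}
```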

.changelog/26961.txt

Lines changed: 3 additions & 0 deletions

````diff
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fixed scheduling behavior of batch job allocations
+```
````

api/allocations.go

Lines changed: 35 additions & 16 deletions

```diff
@@ -185,24 +185,10 @@ func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error
 // Note: for cluster topologies where API consumers don't have network access to
 // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid
 // long pauses on this API call.
-//
-// BREAKING: This method will have the following signature in 1.6.0
-// func (a *Allocations) Stop(allocID string, w *WriteOptions) (*AllocStopResponse, error) {
 func (a *Allocations) Stop(alloc *Allocation, q *QueryOptions) (*AllocStopResponse, error) {
-	// COMPAT: Remove in 1.6.0
-	var w *WriteOptions
-	if q != nil {
-		w = &WriteOptions{
-			Region:    q.Region,
-			Namespace: q.Namespace,
-			AuthToken: q.AuthToken,
-			Headers:   q.Headers,
-			ctx:       q.ctx,
-		}
-	}
-
 	var resp AllocStopResponse
-	wm, err := a.client.put("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, w)
+
+	wm, err := a.client.putQuery("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, q)
 	if wm != nil {
 		resp.LastIndex = wm.LastIndex
 		resp.RequestTime = wm.RequestTime
@@ -590,13 +576,46 @@ type DesiredTransition struct {
 	// Reschedule is used to indicate that this allocation is eligible to be
 	// rescheduled.
 	Reschedule *bool
+
+	// ForceReschedule is used to indicate that this allocation must be
+	// rescheduled.
+	ForceReschedule *bool
+
+	// NoShutdownDelay is used to indicate any configured shutdown delay
+	// should be ignored.
+	NoShutdownDelay *bool
+
+	// MigrateDisablePlacement is used to indicate that this allocation
+	// should not be placed during migration.
+	MigrateDisablePlacement *bool
 }
 
 // ShouldMigrate returns whether the transition object dictates a migration.
 func (d DesiredTransition) ShouldMigrate() bool {
 	return d.Migrate != nil && *d.Migrate
 }
 
+// ShouldReschedule returns whether the transition object dictates a reschedule.
+func (d DesiredTransition) ShouldReschedule() bool {
+	return d.Reschedule != nil && *d.Reschedule
+}
+
+// ShouldIgnoreShutdownDelay returns whether the transition object dictates
+// ignoring the configured shutdown delay.
+func (d DesiredTransition) ShouldIgnoreShutdownDelay() bool {
+	return d.NoShutdownDelay != nil && *d.NoShutdownDelay
+}
+
+// ShouldForceReschedule returns whether the transition object dictates a forced reschedule.
+func (d DesiredTransition) ShouldForceReschedule() bool {
+	return d.ForceReschedule != nil && *d.ForceReschedule
+}
+
+// ShouldDisableMigrationPlacement returns whether the transition object dictates placement during migration
+func (d DesiredTransition) ShouldDisableMigrationPlacement() bool {
+	return d.MigrateDisablePlacement != nil && *d.MigrateDisablePlacement
+}
+
 // ExecStreamingIOOperation represents a stream write operation: either appending data or close (exclusively)
 type ExecStreamingIOOperation struct {
 	Data []byte `json:"data,omitempty"`
```
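The new `DesiredTransition` predicates follow the same nil-guarded pattern as the existing `ShouldMigrate`: an unset pointer simply reads as false. A small illustrative sketch, assuming the upstream `github.com/hashicorp/nomad/api` import path; the `boolPtr` helper is defined locally because the `pointerOf` helper used in the package's tests is not exported.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

// boolPtr returns a pointer to v.
func boolPtr(v bool) *bool { return &v }

func main() {
	// A zero-value transition has all pointers unset, so every predicate is false.
	var none api.DesiredTransition
	fmt.Println(none.ShouldReschedule())          // false
	fmt.Println(none.ShouldIgnoreShutdownDelay()) // false

	// Only fields explicitly set to true flip the corresponding predicate.
	dt := api.DesiredTransition{
		ForceReschedule: boolPtr(true),
		NoShutdownDelay: boolPtr(false),
	}
	fmt.Println(dt.ShouldForceReschedule())           // true
	fmt.Println(dt.ShouldIgnoreShutdownDelay())       // false
	fmt.Println(dt.ShouldDisableMigrationPlacement()) // false: pointer unset
}
```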

api/allocations_test.go

Lines changed: 121 additions & 23 deletions

```diff
@@ -319,34 +319,100 @@ func TestAllocations_Stop(t *testing.T) {
 	testutil.RequireRoot(t)
 	testutil.Parallel(t)
 
-	c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
-		c.DevMode = true
+	t.Run("default", func(t *testing.T) {
+		c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
+			c.DevMode = true
+		})
+		defer s.Stop()
+		a := c.Allocations()
+
+		// wait for node
+		_ = oneNodeFromNodeList(t, c.Nodes())
+
+		// Create a job and register it
+		job := testJob()
+		_, wm, err := c.Jobs().Register(job, nil)
+		must.NoError(t, err)
+
+		// List allocations.
+		stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
+		must.NoError(t, err)
+		must.SliceLen(t, 1, stubs)
+
+		// Stop the first allocation.
+		resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{WaitIndex: qm.LastIndex})
+		must.NoError(t, err)
+		test.UUIDv4(t, resp.EvalID)
+		test.NonZero(t, resp.LastIndex)
+
+		// Stop allocation that doesn't exist.
+		resp, err = a.Stop(&Allocation{ID: "invalid"}, &QueryOptions{WaitIndex: qm.LastIndex})
+		must.Error(t, err)
 	})
-	defer s.Stop()
-	a := c.Allocations()
 
-	// wait for node
-	_ = oneNodeFromNodeList(t, c.Nodes())
-
-	// Create a job and register it
-	job := testJob()
-	_, wm, err := c.Jobs().Register(job, nil)
-	must.NoError(t, err)
+	t.Run("rescheduled", func(t *testing.T) {
+		c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
+			c.DevMode = true
+		})
+		defer s.Stop()
+		a := c.Allocations()
+
+		// wait for node
+		_ = oneNodeFromNodeList(t, c.Nodes())
+
+		// Create a job and register it
+		job := testJob()
+		_, wm, err := c.Jobs().Register(job, nil)
+		must.NoError(t, err)
+
+		// List allocations.
+		stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
+		must.NoError(t, err)
+		must.SliceLen(t, 1, stubs)
+
+		// Stop the first allocation.
+		resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{
+			Params:    map[string]string{"reschedule": "true"},
+			WaitIndex: qm.LastIndex,
+		})
+		must.NoError(t, err)
 
-	// List allocations.
-	stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
-	must.NoError(t, err)
-	must.SliceLen(t, 1, stubs)
+		alloc, _, err := a.Info(stubs[0].ID, &QueryOptions{WaitIndex: resp.LastIndex})
+		must.NoError(t, err)
+		must.True(t, alloc.DesiredTransition.ShouldReschedule(), must.Sprint("allocation should be marked for rescheduling"))
+	})
 
-	// Stop the first allocation.
-	resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{WaitIndex: qm.LastIndex})
-	must.NoError(t, err)
-	test.UUIDv4(t, resp.EvalID)
-	test.NonZero(t, resp.LastIndex)
+	t.Run("no shutdown delay", func(t *testing.T) {
+		c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) {
+			c.DevMode = true
+		})
+		defer s.Stop()
+		a := c.Allocations()
+
+		// wait for node
+		_ = oneNodeFromNodeList(t, c.Nodes())
+
+		// Create a job and register it
+		job := testJob()
+		_, wm, err := c.Jobs().Register(job, nil)
+		must.NoError(t, err)
+
+		// List allocations.
+		stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex})
+		must.NoError(t, err)
+		must.SliceLen(t, 1, stubs)
+
+		// Stop the first allocation.
+		resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{
+			Params:    map[string]string{"no_shutdown_delay": "true"},
+			WaitIndex: qm.LastIndex,
+		})
+		must.NoError(t, err)
 
-	// Stop allocation that doesn't exist.
-	resp, err = a.Stop(&Allocation{ID: "invalid"}, &QueryOptions{WaitIndex: qm.LastIndex})
-	must.Error(t, err)
+		alloc, _, err := a.Info(stubs[0].ID, &QueryOptions{WaitIndex: resp.LastIndex})
+		must.NoError(t, err)
+		must.True(t, alloc.DesiredTransition.ShouldIgnoreShutdownDelay(), must.Sprint("allocation should be marked for no shutdown delay"))
+	})
 }
 
 // TestAllocations_ExecErrors ensures errors are properly formatted
@@ -480,6 +546,38 @@ func TestAllocations_ShouldMigrate(t *testing.T) {
 	must.False(t, DesiredTransition{Migrate: pointerOf(false)}.ShouldMigrate())
 }
 
+func TestAllocations_ShouldReschedule(t *testing.T) {
+	testutil.Parallel(t)
+
+	must.True(t, DesiredTransition{Reschedule: pointerOf(true)}.ShouldReschedule())
+	must.False(t, DesiredTransition{}.ShouldReschedule())
+	must.False(t, DesiredTransition{Reschedule: pointerOf(false)}.ShouldReschedule())
+}
+
+func TestAllocations_ShouldForceReschedule(t *testing.T) {
+	testutil.Parallel(t)
+
+	must.True(t, DesiredTransition{ForceReschedule: pointerOf(true)}.ShouldForceReschedule())
+	must.False(t, DesiredTransition{}.ShouldForceReschedule())
+	must.False(t, DesiredTransition{ForceReschedule: pointerOf(false)}.ShouldForceReschedule())
+}
+
+func TestAllocations_ShouldIgnoreShutdownDelay(t *testing.T) {
+	testutil.Parallel(t)
+
+	must.True(t, DesiredTransition{NoShutdownDelay: pointerOf(true)}.ShouldIgnoreShutdownDelay())
+	must.False(t, DesiredTransition{}.ShouldIgnoreShutdownDelay())
+	must.False(t, DesiredTransition{NoShutdownDelay: pointerOf(false)}.ShouldIgnoreShutdownDelay())
+}
+
+func TestAllocations_ShouldDisableMigrationPlacement(t *testing.T) {
+	testutil.Parallel(t)
+
+	must.True(t, DesiredTransition{MigrateDisablePlacement: pointerOf(true)}.ShouldDisableMigrationPlacement())
+	must.False(t, DesiredTransition{}.ShouldDisableMigrationPlacement())
+	must.False(t, DesiredTransition{MigrateDisablePlacement: pointerOf(false)}.ShouldDisableMigrationPlacement())
+}
+
 func TestAllocations_Services(t *testing.T) {
 	t.Skip("needs to be implemented")
 	// TODO(jrasell) add tests once registration process is in place.
```
client/allocrunner/alloc_runner.go

Lines changed: 1 addition & 16 deletions

```diff
@@ -737,23 +737,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
 			return nil
 		}
 
-		te := structs.NewTaskEvent(structs.TaskKilling).
+		return structs.NewTaskEvent(structs.TaskKilling).
 			SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout)
-
-		// if the task is not set failed, the job type is batch, and the
-		// allocation is being migrated then mark the task as failed. this
-		// ensures the task is recreated if no eligible nodes are immediately
-		// available.
-		if !tr.TaskState().Failed &&
-			ar.alloc.Job.Type == structs.JobTypeBatch &&
-			ar.alloc.DesiredTransition.Migrate != nil &&
-			*ar.alloc.DesiredTransition.Migrate {
-
-			ar.logger.Trace("marking migrating batch job task failed on kill", "task_name", tr.Task().Name)
-			te.SetFailsTask()
-		}
-
-		return te
 	}
 
 	// Kill leader first, synchronously
```