diff --git a/.changelog/26961.txt b/.changelog/26961.txt new file mode 100644 index 00000000000..119944f8a2a --- /dev/null +++ b/.changelog/26961.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler: Fixed scheduling behavior of batch job allocations +``` diff --git a/api/allocations.go b/api/allocations.go index e4c95d6d904..2549cfb0c2d 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -185,24 +185,9 @@ func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error // Note: for cluster topologies where API consumers don't have network access to // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid // long pauses on this API call. -// -// BREAKING: This method will have the following signature in 1.6.0 -// func (a *Allocations) Stop(allocID string, w *WriteOptions) (*AllocStopResponse, error) { func (a *Allocations) Stop(alloc *Allocation, q *QueryOptions) (*AllocStopResponse, error) { - // COMPAT: Remove in 1.6.0 - var w *WriteOptions - if q != nil { - w = &WriteOptions{ - Region: q.Region, - Namespace: q.Namespace, - AuthToken: q.AuthToken, - Headers: q.Headers, - ctx: q.ctx, - } - } - var resp AllocStopResponse - wm, err := a.client.put("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, w) + wm, err := a.client.putQuery("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, q) if wm != nil { resp.LastIndex = wm.LastIndex resp.RequestTime = wm.RequestTime @@ -590,6 +575,18 @@ type DesiredTransition struct { // Reschedule is used to indicate that this allocation is eligible to be // rescheduled. Reschedule *bool + + // ForceReschedule is used to indicate that this allocation must be + // rescheduled. + ForceReschedule *bool + + // NoShutdownDelay is used to indicate any configured shutdown delay + // should be ignored. + NoShutdownDelay *bool + + // MigrateDisablePlacement is used to indicate that this allocation + // should not be placed during migration. + MigrateDisablePlacement *bool } // ShouldMigrate returns whether the transition object dictates a migration. @@ -597,6 +594,27 @@ func (d DesiredTransition) ShouldMigrate() bool { return d.Migrate != nil && *d.Migrate } +// ShouldReschedule returns whether the transition object dictates a reschedule. +func (d DesiredTransition) ShouldReschedule() bool { + return d.Reschedule != nil && *d.Reschedule +} + +// ShouldIgnoreShutdownDelay returns whether the transition object dictates +// ignoring the configured shutdown delay. +func (d DesiredTransition) ShouldIgnoreShutdownDelay() bool { + return d.NoShutdownDelay != nil && *d.NoShutdownDelay +} + +// ShouldForceReschedule returns whether the transition object dictates a forced reschedule. 
+func (d DesiredTransition) ShouldForceReschedule() bool { + return d.ForceReschedule != nil && *d.ForceReschedule +} + +// ShouldDisableMigrationPlacement returns whether the transition object dictates disabling placement during migration +func (d DesiredTransition) ShouldDisableMigrationPlacement() bool { + return d.MigrateDisablePlacement != nil && *d.MigrateDisablePlacement +} + // ExecStreamingIOOperation represents a stream write operation: either appending data or close (exclusively) type ExecStreamingIOOperation struct { Data []byte `json:"data,omitempty"` diff --git a/api/allocations_test.go b/api/allocations_test.go index e3c3a136e61..534711d4916 100644 --- a/api/allocations_test.go +++ b/api/allocations_test.go @@ -335,34 +335,98 @@ func TestAllocations_Stop(t *testing.T) { testutil.RequireRoot(t) testutil.Parallel(t) - c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { - c.DevMode = true + t.Run("default", func(t *testing.T) { + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + c.DevMode = true + }) + defer s.Stop() + a := c.Allocations() + + // wait for node + _ = oneNodeFromNodeList(t, c.Nodes()) + + // Create a job and register it + job := testJob() + _, wm, err := c.Jobs().Register(job, nil) + must.NoError(t, err) + + // List allocations. + stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex}) + must.NoError(t, err) + must.SliceLen(t, 1, stubs) + + // Stop the first allocation. + resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{WaitIndex: qm.LastIndex}) + must.NoError(t, err) + test.UUIDv4(t, resp.EvalID) + test.NonZero(t, resp.LastIndex) + + // Stop allocation that doesn't exist. + resp, err = a.Stop(&Allocation{ID: "invalid"}, &QueryOptions{WaitIndex: qm.LastIndex}) + must.Error(t, err) }) - defer s.Stop() - a := c.Allocations() - // wait for node - _ = oneNodeFromNodeList(t, c.Nodes()) - - // Create a job and register it - job := testJob() - _, wm, err := c.Jobs().Register(job, nil) - must.NoError(t, err) - - // List allocations. - stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex}) - must.NoError(t, err) - must.SliceLen(t, 1, stubs) - - // Stop the first allocation. - resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{WaitIndex: qm.LastIndex}) - must.NoError(t, err) - test.UUIDv4(t, resp.EvalID) - test.NonZero(t, resp.LastIndex) + t.Run("rescheduled", func(t *testing.T) { + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + c.DevMode = true + }) + defer s.Stop() + a := c.Allocations() + + // wait for node + _ = oneNodeFromNodeList(t, c.Nodes()) + + // Create a job and register it + job := testJob() + _, wm, err := c.Jobs().Register(job, nil) + must.NoError(t, err) + + // List allocations. + stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex}) + must.NoError(t, err) + must.SliceLen(t, 1, stubs) + + // Stop the first allocation. + resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{ + Params: map[string]string{"reschedule": "true"}, + WaitIndex: qm.LastIndex, + }) + must.NoError(t, err) + alloc, _, err := a.Info(stubs[0].ID, &QueryOptions{WaitIndex: resp.LastIndex}) + must.NoError(t, err) + must.True(t, alloc.DesiredTransition.ShouldReschedule(), must.Sprint("allocation should be marked for rescheduling")) + }) - // Stop allocation that doesn't exist.
- resp, err = a.Stop(&Allocation{ID: "invalid"}, &QueryOptions{WaitIndex: qm.LastIndex}) - must.Error(t, err) + t.Run("no shutdown delay", func(t *testing.T) { + c, s := makeClient(t, nil, func(c *testutil.TestServerConfig) { + c.DevMode = true + }) + defer s.Stop() + a := c.Allocations() + + // wait for node + _ = oneNodeFromNodeList(t, c.Nodes()) + + // Create a job and register it + job := testJob() + _, wm, err := c.Jobs().Register(job, nil) + must.NoError(t, err) + + // List allocations. + stubs, qm, err := a.List(&QueryOptions{WaitIndex: wm.LastIndex}) + must.NoError(t, err) + must.SliceLen(t, 1, stubs) + + // Stop the first allocation. + resp, err := a.Stop(&Allocation{ID: stubs[0].ID}, &QueryOptions{ + Params: map[string]string{"no_shutdown_delay": "true"}, + WaitIndex: qm.LastIndex, + }) + must.NoError(t, err) + alloc, _, err := a.Info(stubs[0].ID, &QueryOptions{WaitIndex: resp.LastIndex}) + must.NoError(t, err) + must.True(t, alloc.DesiredTransition.ShouldIgnoreShutdownDelay(), must.Sprint("allocation should be marked for no shutdown delay")) + }) } // TestAllocations_ExecErrors ensures errors are properly formatted @@ -496,6 +560,38 @@ func TestAllocations_ShouldMigrate(t *testing.T) { must.False(t, DesiredTransition{Migrate: pointerOf(false)}.ShouldMigrate()) } +func TestAllocations_ShouldReschedule(t *testing.T) { + testutil.Parallel(t) + + must.True(t, DesiredTransition{Reschedule: pointerOf(true)}.ShouldReschedule()) + must.False(t, DesiredTransition{}.ShouldReschedule()) + must.False(t, DesiredTransition{Reschedule: pointerOf(false)}.ShouldReschedule()) +} + +func TestAllocations_ShouldForceReschedule(t *testing.T) { + testutil.Parallel(t) + + must.True(t, DesiredTransition{ForceReschedule: pointerOf(true)}.ShouldForceReschedule()) + must.False(t, DesiredTransition{}.ShouldForceReschedule()) + must.False(t, DesiredTransition{ForceReschedule: pointerOf(false)}.ShouldForceReschedule()) +} + +func TestAllocations_ShouldIgnoreShutdownDelay(t *testing.T) { + testutil.Parallel(t) + + must.True(t, DesiredTransition{NoShutdownDelay: pointerOf(true)}.ShouldIgnoreShutdownDelay()) + must.False(t, DesiredTransition{}.ShouldIgnoreShutdownDelay()) + must.False(t, DesiredTransition{NoShutdownDelay: pointerOf(false)}.ShouldIgnoreShutdownDelay()) +} + +func TestAllocations_ShouldDisableMigrationPlacement(t *testing.T) { + testutil.Parallel(t) + + must.True(t, DesiredTransition{MigrateDisablePlacement: pointerOf(true)}.ShouldDisableMigrationPlacement()) + must.False(t, DesiredTransition{}.ShouldDisableMigrationPlacement()) + must.False(t, DesiredTransition{MigrateDisablePlacement: pointerOf(false)}.ShouldDisableMigrationPlacement()) +} + func TestAllocations_Services(t *testing.T) { t.Skip("needs to be implemented") // TODO(jrasell) add tests once registration process is in place. diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index a53a889f5d3..6aa0d296177 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -737,23 +737,8 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState { return nil } - te := structs.NewTaskEvent(structs.TaskKilling). + return structs.NewTaskEvent(structs.TaskKilling). SetKillTimeout(tr.Task().KillTimeout, ar.clientConfig.MaxKillTimeout) - - // if the task is not set failed, the job type is batch, and the - // allocation is being migrated then mark the task as failed. this - // ensures the task is recreated if no eligible nodes are immediately - // available. 
- if !tr.TaskState().Failed && - ar.alloc.Job.Type == structs.JobTypeBatch && - ar.alloc.DesiredTransition.Migrate != nil && - *ar.alloc.DesiredTransition.Migrate { - - ar.logger.Trace("marking migrating batch job task failed on kill", "task_name", tr.Task().Name) - te.SetFailsTask() - } - - return te } // Kill leader first, synchronously diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 87b381ddb69..73e890fe0b7 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -1843,160 +1843,6 @@ func TestAllocRunner_HandlesArtifactFailure(t *testing.T) { require.True(t, state.TaskStates["bad"].Failed) } -// Test that alloc runner kills tasks in task group when stopping and -// fails tasks when job is batch job type and migrating -func TestAllocRunner_Migrate_Batch_KillTG(t *testing.T) { - ci.Parallel(t) - - alloc := mock.BatchAlloc() - tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name] - alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0 - alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0 - - task := alloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "mock_driver" - task.Config["run_for"] = "10s" - alloc.AllocatedResources.Tasks[task.Name] = tr - - task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy() - task2.Name = "task 2" - task2.Driver = "mock_driver" - task2.Config["run_for"] = "1ms" - alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2) - alloc.AllocatedResources.Tasks[task2.Name] = tr - - conf, cleanup := testAllocRunnerConfig(t, alloc) - defer cleanup() - ar, err := NewAllocRunner(conf) - must.NoError(t, err) - - defer destroy(ar) - go ar.Run() - upd := conf.StateUpdater.(*MockStateUpdater) - - // Wait for running - testutil.WaitForResult(func() (bool, error) { - last := upd.Last() - if last == nil { - return false, fmt.Errorf("No updates") - } - if last.ClientStatus != structs.AllocClientStatusRunning { - return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) - } - return true, nil - }, func(err error) { - must.NoError(t, err) - }) - - // Wait for completed task - testutil.WaitForResult(func() (bool, error) { - last := upd.Last() - if last == nil { - return false, fmt.Errorf("No updates") - } - if last.ClientStatus != structs.AllocClientStatusRunning { - return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) - } - - // task should not have finished yet, task2 should be finished - if !last.TaskStates[task.Name].FinishedAt.IsZero() { - return false, fmt.Errorf("task should not be finished") - } - if last.TaskStates[task2.Name].FinishedAt.IsZero() { - return false, fmt.Errorf("task should be finished") - } - return true, nil - }, func(err error) { - must.NoError(t, err) - }) - - update := ar.Alloc().Copy() - migrate := true - update.DesiredTransition.Migrate = &migrate - update.DesiredStatus = structs.AllocDesiredStatusStop - ar.Update(update) - - testutil.WaitForResult(func() (bool, error) { - last := upd.Last() - if last == nil { - return false, fmt.Errorf("No updates") - } - - if last.ClientStatus != structs.AllocClientStatusFailed { - return false, fmt.Errorf("got client status %q; want %q", last.ClientStatus, structs.AllocClientStatusFailed) - } - - // task should be failed since it was killed, task2 should not - // be failed since it was already completed - if !last.TaskStates[task.Name].Failed { - return false, fmt.Errorf("task should be 
failed") - } - if last.TaskStates[task2.Name].Failed { - return false, fmt.Errorf("task should not be failed") - } - return true, nil - }, func(err error) { - must.NoError(t, err) - }) -} - -// Test that alloc runner kills tasks in task group when stopping and -// does not fail tasks when job is batch job type and not migrating -func TestAllocRunner_Batch_KillTG(t *testing.T) { - ci.Parallel(t) - - alloc := mock.BatchAlloc() - tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name] - alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0 - alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0 - - task := alloc.Job.TaskGroups[0].Tasks[0] - task.Driver = "mock_driver" - task.Config["run_for"] = "10s" - alloc.AllocatedResources.Tasks[task.Name] = tr - - conf, cleanup := testAllocRunnerConfig(t, alloc) - defer cleanup() - ar, err := NewAllocRunner(conf) - must.NoError(t, err) - - defer destroy(ar) - go ar.Run() - upd := conf.StateUpdater.(*MockStateUpdater) - - testutil.WaitForResult(func() (bool, error) { - last := upd.Last() - if last == nil { - return false, fmt.Errorf("No updates") - } - if last.ClientStatus != structs.AllocClientStatusRunning { - return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) - } - return true, nil - }, func(err error) { - must.NoError(t, err) - }) - - update := ar.Alloc().Copy() - update.DesiredStatus = structs.AllocDesiredStatusStop - ar.Update(update) - - testutil.WaitForResult(func() (bool, error) { - last := upd.Last() - if last == nil { - return false, fmt.Errorf("No updates") - } - - if last.ClientStatus != structs.AllocClientStatusComplete { - return false, fmt.Errorf("got client status %q; want %q", last.ClientStatus, structs.AllocClientStatusComplete) - } - - return true, nil - }, func(err error) { - must.NoError(t, err) - }) -} - // Test that alloc runner kills tasks in task group when stopping and // fails tasks when job is batch job type and migrating func TestAllocRunner_KillTG_DeadTasks(t *testing.T) { @@ -2076,19 +1922,6 @@ func TestAllocRunner_KillTG_DeadTasks(t *testing.T) { return false, fmt.Errorf("No updates") } - if last.ClientStatus != structs.AllocClientStatusFailed { - return false, fmt.Errorf("got client status %q; want %q", last.ClientStatus, structs.AllocClientStatusFailed) - } - - // task should be failed since it was killed, task2 should not - // be failed since it was already completed - if !last.TaskStates[task.Name].Failed { - return false, fmt.Errorf("task should be failed") - } - if last.TaskStates[task2.Name].Failed { - return false, fmt.Errorf("task should not be failed") - } - taskEvtSize := len(last.TaskStates[task.Name].Events) task2EvtSize := len(last.TaskStates[task2.Name].Events) diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index b7fc9ad8900..0176172c73d 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -19,6 +19,7 @@ import ( "github.com/gorilla/websocket" "github.com/hashicorp/go-msgpack/v2/codec" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/drivers" ) @@ -147,18 +148,24 @@ func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *ht return nil, CodedError(405, ErrInvalidMethod) } - noShutdownDelay := false - if noShutdownDelayQS := req.URL.Query().Get("no_shutdown_delay"); 
noShutdownDelayQS != "" { - var err error - noShutdownDelay, err = strconv.ParseBool(noShutdownDelayQS) - if err != nil { - return nil, fmt.Errorf("no_shutdown_delay value is not a boolean: %v", err) - } + noShutdownDelay, err := parseBool(req, "no_shutdown_delay") + if err != nil { + return nil, err + } else if noShutdownDelay == nil { + noShutdownDelay = pointer.Of(false) + } + + reschedule, err := parseBool(req, "reschedule") + if err != nil { + return nil, err + } else if reschedule == nil { + reschedule = pointer.Of(false) } sr := &structs.AllocStopRequest{ AllocID: allocID, - NoShutdownDelay: noShutdownDelay, + NoShutdownDelay: *noShutdownDelay, + Reschedule: *reschedule, } s.parseWriteRequest(req, &sr.WriteRequest) diff --git a/command/alloc_stop.go b/command/alloc_stop.go index 98c3d9a0c27..44b61c5621c 100644 --- a/command/alloc_stop.go +++ b/command/alloc_stop.go @@ -125,9 +125,14 @@ func (c *AllocStopCommand) Run(args []string) int { return 1 } - var opts *api.QueryOptions + opts := &api.QueryOptions{Params: map[string]string{}} + + if alloc.Stub().JobType == "batch" { + opts.Params["reschedule"] = "true" + } + if noShutdownDelay { - opts = &api.QueryOptions{Params: map[string]string{"no_shutdown_delay": "true"}} + opts.Params["no_shutdown_delay"] = "true" } resp, err := client.Allocations().Stop(alloc, opts) diff --git a/command/alloc_stop_test.go b/command/alloc_stop_test.go index 9923227b18c..adeaf886927 100644 --- a/command/alloc_stop_test.go +++ b/command/alloc_stop_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/hashicorp/cli" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/ci" "github.com/shoenig/test/must" ) @@ -65,35 +66,111 @@ func TestAllocStop_Fails(t *testing.T) { func TestAllocStop_Run(t *testing.T) { ci.Parallel(t) - srv, client, url := testServer(t, true, nil) - defer srv.Shutdown() + t.Run("default", func(t *testing.T) { + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() - // Wait for a node to be ready - waitForNodes(t, client) + // Wait for a node to be ready + waitForNodes(t, client) - ui := cli.NewMockUi() - cmd := &AllocStopCommand{Meta: Meta{Ui: ui}} + ui := cli.NewMockUi() + cmd := &AllocStopCommand{Meta: Meta{Ui: ui}} - jobID := "job1_sfx" - job1 := testJob(jobID) - resp, _, err := client.Jobs().Register(job1, nil) - must.NoError(t, err) + jobID := "job1_sfx" + job1 := testJob(jobID) + resp, _, err := client.Jobs().Register(job1, nil) + must.NoError(t, err) - code := waitForSuccess(ui, client, fullId, t, resp.EvalID) - must.Zero(t, code) + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) - // get an alloc id - allocID := "" - if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { - if len(allocs) > 0 { - allocID = allocs[0].ID + // get an alloc id + allocID := "" + if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { + if len(allocs) > 0 { + allocID = allocs[0].ID + } } - } - must.NotEq(t, "", allocID) + must.NotEq(t, "", allocID) + + // Wait for alloc to be running + waitForAllocRunning(t, client, allocID) + + code = cmd.Run([]string{"-address=" + url, allocID}) + must.Zero(t, code) + }) + + t.Run("batch job", func(t *testing.T) { + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + // Wait for a node to be ready + waitForNodes(t, client) + ui := cli.NewMockUi() + cmd := &AllocStopCommand{Meta: Meta{Ui: ui}} + jobID := "job1_sfx" + job1 := testJob(jobID) + resp, _, err := 
client.Jobs().Register(job1, nil) + must.NoError(t, err) + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + + // get an alloc id + allocID := "" + if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { + if len(allocs) > 0 { + allocID = allocs[0].ID + } + } + must.NotEq(t, "", allocID) + + // Wait for alloc to be running + waitForAllocRunning(t, client, allocID) + + code = cmd.Run([]string{"-address=" + url, allocID}) + must.Zero(t, code) - // Wait for alloc to be running - waitForAllocRunning(t, client, allocID) + chkAlloc, _, err := client.Allocations().Info(allocID, &api.QueryOptions{}) + must.NoError(t, err) + must.True(t, chkAlloc.DesiredTransition.ShouldMigrate(), must.Sprint("alloc should be flagged to migrate")) + // this is a batch job so alloc should be rescheduled + must.True(t, chkAlloc.DesiredTransition.ShouldReschedule(), must.Sprint("alloc should be flagged to reschedule")) + }) - code = cmd.Run([]string{"-address=" + url, allocID}) - must.Zero(t, code) + t.Run("no shutdown delay", func(t *testing.T) { + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + // Wait for a node to be ready + waitForNodes(t, client) + + ui := cli.NewMockUi() + cmd := &AllocStopCommand{Meta: Meta{Ui: ui}} + + jobID := "job1_sfx" + job1 := testJob(jobID) + resp, _, err := client.Jobs().Register(job1, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + + // get an alloc id + allocID := "" + if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { + if len(allocs) > 0 { + allocID = allocs[0].ID + } + } + must.NotEq(t, "", allocID) + + // Wait for alloc to be running + waitForAllocRunning(t, client, allocID) + code = cmd.Run([]string{"-address=" + url, "-no-shutdown-delay", allocID}) + must.Zero(t, code) + chkAlloc, _, err := client.Allocations().Info(allocID, &api.QueryOptions{}) + must.NoError(t, err) + must.True(t, chkAlloc.DesiredTransition.ShouldMigrate(), must.Sprint("alloc should be flagged to migrate")) + must.True(t, chkAlloc.DesiredTransition.ShouldIgnoreShutdownDelay(), must.Sprint("alloc should be flagged to ignore shutdown delay")) + }) } diff --git a/command/job_restart_test.go b/command/job_restart_test.go index d035d315e19..ef2245c471a 100644 --- a/command/job_restart_test.go +++ b/command/job_restart_test.go @@ -1265,7 +1265,6 @@ namespace "default" { } } -// TODO(luiz): update once alloc restart supports -no-shutdown-delay. func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) { ci.Parallel(t) @@ -1371,9 +1370,7 @@ func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) { if tc.shutdownDelay { must.GreaterEq(t, shutdownDelay, time.Duration(diff)) } else { - // Add a bit of slack to account for the actual - // shutdown time of the task. 
- must.Between(t, shutdownDelay, time.Duration(diff), shutdownDelay+time.Second) + must.Less(t, shutdownDelay, time.Duration(diff)) } } } diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index c182268ae65..575f51e84f9 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -307,6 +307,7 @@ func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopRes args.AllocID: { Migrate: pointer.Of(true), NoShutdownDelay: pointer.Of(args.NoShutdownDelay), + Reschedule: pointer.Of(args.Reschedule), }, }, } diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index 20686aad3bc..6397f345c23 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -1125,6 +1125,92 @@ func TestAllocEndpoint_UpdateDesiredTransition(t *testing.T) { require.True(*out2.DesiredTransition.Migrate) } +func TestAllocEndpoint_Stop(t *testing.T) { + ci.Parallel(t) + + t.Run("default", func(t *testing.T) { + srv, cleanup := TestServer(t, nil) + defer cleanup() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + + alloc := mock.Alloc() + state := srv.fsm.State() + must.NoError(t, state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})) + req := &structs.AllocStopRequest{ + AllocID: alloc.ID, + WriteRequest: structs.WriteRequest{ + Namespace: structs.DefaultNamespace, + Region: alloc.Job.Region, + }, + } + var resp structs.AllocStopResponse + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp)) + + chkAlloc, err := state.AllocByID(nil, alloc.ID) + must.NoError(t, err) + + must.True(t, chkAlloc.DesiredTransition.ShouldMigrate()) + }) + + t.Run("with reschedule", func(t *testing.T) { + srv, cleanup := TestServer(t, nil) + defer cleanup() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + + alloc := mock.Alloc() + state := srv.fsm.State() + must.NoError(t, state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})) + req := &structs.AllocStopRequest{ + AllocID: alloc.ID, + Reschedule: true, + WriteRequest: structs.WriteRequest{ + Namespace: structs.DefaultNamespace, + Region: alloc.Job.Region, + }, + } + var resp structs.AllocStopResponse + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp)) + + chkAlloc, err := state.AllocByID(nil, alloc.ID) + must.NoError(t, err) + + must.True(t, chkAlloc.DesiredTransition.ShouldMigrate()) + must.True(t, chkAlloc.DesiredTransition.ShouldReschedule()) + }) + + t.Run("with no shutdown delay", func(t *testing.T) { + srv, cleanup := TestServer(t, nil) + defer cleanup() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + + alloc := mock.Alloc() + state := srv.fsm.State() + must.NoError(t, state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))) + must.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc})) + req := &structs.AllocStopRequest{ + AllocID: alloc.ID, + NoShutdownDelay: true, + WriteRequest: structs.WriteRequest{ + Namespace: structs.DefaultNamespace, + Region: alloc.Job.Region, + }, + } + var resp structs.AllocStopResponse + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp)) + + chkAlloc, err := state.AllocByID(nil, alloc.ID) + must.NoError(t, err) + + must.True(t, chkAlloc.DesiredTransition.ShouldMigrate()) + must.True(t, 
chkAlloc.DesiredTransition.ShouldIgnoreShutdownDelay()) + }) +} + func TestAllocEndpoint_Stop_ACL(t *testing.T) { ci.Parallel(t) require := require.New(t) diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go index 3b93ae14f01..e91319a4b3b 100644 --- a/nomad/drainer/drainer.go +++ b/nomad/drainer/drainer.go @@ -401,6 +401,13 @@ func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs transitions[alloc.ID] = &structs.DesiredTransition{ Migrate: pointer.Of(true), } + + // When draining batch job allocations, the allocation should + // be stopped. Setting this ensures the allocation is stopped in + // the migration process, but that a new allocation is not placed. + if alloc.Job.Type == structs.JobTypeBatch { + transitions[alloc.ID].MigrateDisablePlacement = pointer.Of(true) + } jobs[alloc.JobNamespacedID()] = alloc.Job } diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index 02f4e314231..aedb60b3c22 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -395,7 +395,7 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { // Wait for allocs to be replaced finalAllocs := waitForAllocsStop(t, store, n1.ID, nil) - waitForPlacedAllocs(t, store, n2.ID, 5) + waitForPlacedAllocs(t, store, n2.ID, 3) // Assert that the service finished before the batch and system var serviceMax, batchMax uint64 = 0, 0 @@ -653,7 +653,7 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { // Wait for the allocs to be replaced waitForAllocsStop(t, store, n1.ID, errCh) - waitForPlacedAllocs(t, store, n2.ID, 5) + waitForPlacedAllocs(t, store, n2.ID, 3) // Wait for the node drain to be marked complete with the events we expect waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, drainer.NodeDrainEventDetailDeadlined) diff --git a/nomad/structs/alloc_test.go b/nomad/structs/alloc_test.go index 0a7d6b5050e..d648f7e3c3f 100644 --- a/nomad/structs/alloc_test.go +++ b/nomad/structs/alloc_test.go @@ -4,10 +4,12 @@ package structs import ( + "fmt" "testing" "time" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper/uuid" "github.com/shoenig/test/must" ) @@ -391,3 +393,245 @@ func TestAllocation_Expired_Disconnected(t *testing.T) { }) } } + +func TestAllocation_NextRescheduleTime(t *testing.T) { + now := time.Now() + makeTestAlloc := func(batch bool) *Allocation { + j := &Job{ + Region: "global", + Name: "mock-job", + Namespace: DefaultNamespace, + Priority: 50, + Status: JobStatusPending, + TaskGroups: []*TaskGroup{ + { + Name: "MockTaskGroup", + ReschedulePolicy: &ReschedulePolicy{ + Attempts: 2, + Interval: time.Hour, + Delay: 2 * time.Minute, + DelayFunction: "constant", + MaxDelay: -1, + Unlimited: false, + }, + }, + }, + } + if batch { + j.Type = JobTypeBatch + j.ID = fmt.Sprintf("mock-batch-%s", uuid.Generate()) + } else { + j.Type = JobTypeService + j.ID = fmt.Sprintf("mock-service-%s", uuid.Generate()) + } + j.Canonicalize() + + alloc := &Allocation{ + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: "12345678-abcd-efab-cdef-123456789abc", + Namespace: DefaultNamespace, + TaskGroup: "MockTaskGroup", + Job: j, + DesiredStatus: AllocDesiredStatusRun, + ClientStatus: AllocClientStatusFailed, + TaskStates: map[string]*TaskState{ + "task": {State: TaskStateDead, Failed: true, FinishedAt: now}, + }, + } + alloc.Canonicalize() + + return alloc + } + + testCases := []struct { + name string + allocFn func(*Allocation) + isBatch bool + isEligible bool + }{ + { + name: "client status is
failed", + isEligible: true, + }, + { + name: "client status is lost", + allocFn: func(a *Allocation) { a.ClientStatus = AllocClientStatusLost }, + isEligible: true, + }, + { + name: "client status is pending", + allocFn: func(a *Allocation) { a.ClientStatus = AllocClientStatusPending }, + isEligible: false, + }, + { + name: "client status is running", + allocFn: func(a *Allocation) { a.ClientStatus = AllocClientStatusRunning }, + isEligible: false, + }, + { + name: "client status is complete", + allocFn: func(a *Allocation) { a.ClientStatus = AllocClientStatusComplete }, + isEligible: false, + }, + { + name: "client status is unknown", + allocFn: func(a *Allocation) { a.ClientStatus = AllocClientStatusUnknown }, + isEligible: false, + }, + { + name: "failed service without reschedule policy", + allocFn: func(a *Allocation) { + a.Job.TaskGroups[0].ReschedulePolicy = nil + }, + isEligible: false, + }, + { + name: "failed service with policy not unlimited and no attempts", + allocFn: func(a *Allocation) { + a.Job.TaskGroups[0].ReschedulePolicy.Attempts = 0 + }, + isEligible: false, + }, + { + name: "failed service with policy unlimited and no attempts", + allocFn: func(a *Allocation) { + a.Job.TaskGroups[0].ReschedulePolicy.Attempts = 0 + a.Job.TaskGroups[0].ReschedulePolicy.Unlimited = true + }, + isEligible: true, + }, + { + name: "service with desired stop and last reschedule did not fail", + allocFn: func(a *Allocation) { + a.DesiredStatus = AllocDesiredStatusStop + a.RescheduleTracker = &RescheduleTracker{LastReschedule: LastRescheduleSuccess} + }, + isEligible: false, + }, + { + name: "service with desired stop and last reschedule is failed without events", + allocFn: func(a *Allocation) { + a.DesiredStatus = AllocDesiredStatusStop + a.RescheduleTracker = &RescheduleTracker{LastReschedule: LastRescheduleFailedToPlace} + }, + isEligible: false, + }, + { + name: "service with desired stop and last reschedule is failed with events", + allocFn: func(a *Allocation) { + a.DesiredStatus = AllocDesiredStatusStop + a.RescheduleTracker = &RescheduleTracker{ + LastReschedule: LastRescheduleFailedToPlace, + Events: []*RescheduleEvent{}, + } + }, + isEligible: true, + }, + { + name: "service has not exceeded reschedule attempts", + allocFn: func(a *Allocation) { + a.RescheduleTracker = &RescheduleTracker{ + LastReschedule: LastRescheduleFailedToPlace, + Events: []*RescheduleEvent{ + {RescheduleTime: now.Add(-10 * time.Minute).UnixNano()}, + }, + } + }, + isEligible: true, + }, + { + name: "service has exceeded reschedule attempts", + allocFn: func(a *Allocation) { + a.RescheduleTracker = &RescheduleTracker{ + LastReschedule: LastRescheduleFailedToPlace, + Events: []*RescheduleEvent{ + {RescheduleTime: now.Add(-10 * time.Minute).UnixNano()}, + {RescheduleTime: now.Add(-5 * time.Minute).UnixNano()}, + }, + } + }, + isEligible: false, + }, + { + name: "batch without reschedule", + isBatch: true, + allocFn: func(a *Allocation) { a.Job.TaskGroups[0].ReschedulePolicy = nil }, + isEligible: false, + }, + { + name: "batch with desired stop and last reschedule did not fail", + isBatch: true, + allocFn: func(a *Allocation) { + a.DesiredStatus = AllocDesiredStatusStop + a.RescheduleTracker = &RescheduleTracker{LastReschedule: LastRescheduleSuccess} + }, + isEligible: false, + }, + { + name: "batch with desired stop and last reschedule is failed without events", + isBatch: true, + allocFn: func(a *Allocation) { + a.DesiredStatus = AllocDesiredStatusStop + a.RescheduleTracker = 
&RescheduleTracker{LastReschedule: LastRescheduleFailedToPlace} + }, + isEligible: false, + }, + { + name: "batch with desired stop and last reschedule is failed with events", + isBatch: true, + allocFn: func(a *Allocation) { + a.DesiredStatus = AllocDesiredStatusStop + a.RescheduleTracker = &RescheduleTracker{ + LastReschedule: LastRescheduleFailedToPlace, + Events: []*RescheduleEvent{}, + } + }, + isEligible: true, + }, + { + name: "batch has not exceeded reschedule attempts", + isBatch: true, + allocFn: func(a *Allocation) { + a.RescheduleTracker = &RescheduleTracker{ + LastReschedule: LastRescheduleFailedToPlace, + Events: []*RescheduleEvent{ + {RescheduleTime: now.Add(-10 * time.Minute).UnixNano()}, + }, + } + }, + isEligible: true, + }, + { + name: "batch has exceeded reschedule attempts", + isBatch: true, + allocFn: func(a *Allocation) { + a.RescheduleTracker = &RescheduleTracker{ + LastReschedule: LastRescheduleFailedToPlace, + Events: []*RescheduleEvent{ + {RescheduleTime: now.Add(-10 * time.Minute).UnixNano()}, + {RescheduleTime: now.Add(-5 * time.Minute).UnixNano()}, + }, + } + }, + isEligible: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + alloc := makeTestAlloc(tc.isBatch) + if tc.allocFn != nil { + tc.allocFn(alloc) + } + + nextTime, eligible := alloc.NextRescheduleTime() + if tc.isEligible { + must.True(t, eligible) + must.Eq(t, now.Add(2*time.Minute), nextTime) + } else { + must.False(t, eligible) + } + }) + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 3d30a55fde6..55eb56814b0 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1160,6 +1160,7 @@ type AllocUpdateDesiredTransitionRequest struct { type AllocStopRequest struct { AllocID string NoShutdownDelay bool + Reschedule bool WriteRequest } @@ -10848,6 +10849,11 @@ type DesiredTransition struct { // task shutdown_delay configuration and ignore the delay for any // allocations stopped as a result of this Deregister call. NoShutdownDelay *bool + + // MigrateDisablePlacement is used to disable the placement of the allocation + // when Migrate is set. This field is used to prevent batch job allocations + // from being placed after being stopped. + MigrateDisablePlacement *bool } // Merge merges the two desired transitions, preferring the values from the @@ -10857,6 +10863,10 @@ func (d *DesiredTransition) Merge(o *DesiredTransition) { d.Migrate = o.Migrate } + if o.MigrateDisablePlacement != nil { + d.MigrateDisablePlacement = o.MigrateDisablePlacement + } + if o.Reschedule != nil { d.Reschedule = o.Reschedule } @@ -10872,12 +10882,18 @@ func (d *DesiredTransition) Merge(o *DesiredTransition) { // ShouldMigrate returns whether the transition object dictates a migration. func (d *DesiredTransition) ShouldMigrate() bool { + if d == nil { + return false + } return d.Migrate != nil && *d.Migrate } // ShouldReschedule returns whether the transition object dictates a // rescheduling. func (d *DesiredTransition) ShouldReschedule() bool { + if d == nil { + return false + } return d.Reschedule != nil && *d.Reschedule } @@ -10899,6 +10915,15 @@ func (d *DesiredTransition) ShouldIgnoreShutdownDelay() bool { return d.NoShutdownDelay != nil && *d.NoShutdownDelay } +// ShouldDisableMigrationPlacement returns whether the transition object dictates +// that placement during migration should be disabled.
+func (d *DesiredTransition) ShouldDisableMigrationPlacement() bool { + if d == nil { + return false + } + return d.MigrateDisablePlacement != nil && *d.MigrateDisablePlacement +} + const ( AllocDesiredStatusRun = "run" // Allocation should run AllocDesiredStatusStop = "stop" // Allocation should stop @@ -11328,6 +11353,7 @@ func (a *Allocation) MigrateStrategy() *MigrateStrategy { func (a *Allocation) NextRescheduleTime() (time.Time, bool) { failTime := a.LastEventTime() reschedulePolicy := a.ReschedulePolicy() + isRescheduledBatch := a.Job.Type == JobTypeBatch && a.DesiredTransition.ShouldReschedule() // If reschedule is disabled, return early if reschedulePolicy == nil || (reschedulePolicy.Attempts == 0 && !reschedulePolicy.Unlimited) { @@ -11335,7 +11361,7 @@ } if (a.DesiredStatus == AllocDesiredStatusStop && !a.LastRescheduleFailed()) || - (a.ClientStatus != AllocClientStatusFailed && a.ClientStatus != AllocClientStatusLost) || + (!isRescheduledBatch && a.ClientStatus != AllocClientStatusFailed && a.ClientStatus != AllocClientStatusLost) || failTime.IsZero() || reschedulePolicy == nil { return time.Time{}, false } @@ -11352,6 +11378,7 @@ func (a *Allocation) nextRescheduleTime(failTime time.Time, reschedulePolicy *Re attempted, attempts := a.RescheduleTracker.rescheduleInfo(reschedulePolicy, failTime) rescheduleEligible = attempted < attempts && nextDelay < reschedulePolicy.Interval } + return nextRescheduleTime, rescheduleEligible } @@ -12288,6 +12315,7 @@ const ( EvalTriggerScaling = "job-scaling" EvalTriggerMaxDisconnectTimeout = "max-disconnect-timeout" EvalTriggerReconnect = "reconnect" + EvalTriggerAllocReschedule = "alloc-reschedule" ) const ( diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 99a0962bbb8..2d5be83e9f5 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -166,7 +166,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) (err error) { switch eval.TriggeredBy { case structs.EvalTriggerJobRegister, structs.EvalTriggerJobDeregister, structs.EvalTriggerNodeDrain, structs.EvalTriggerNodeUpdate, - structs.EvalTriggerAllocStop, + structs.EvalTriggerAllocStop, structs.EvalTriggerAllocReschedule, structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs, structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 7e210a5be5b..4750eab5033 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -449,6 +449,8 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { return true } + all = all.filterRescheduledBatchAllocs() + dstate, existingDeployment := a.initializeDeploymentState(groupName, tg) // Filter allocations that do not need to be considered because they are @@ -465,6 +467,13 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { // Determine what set of terminal allocations need to be rescheduled untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment) + // Determine what set of migrating allocations need to be rescheduled. These + // will be batch job allocations that were stopped using the `alloc stop` command.
+ _, migrateRescheduleNow, migrateRescheduleLater := migrate.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment) + + rescheduleNow = rescheduleNow.union(migrateRescheduleNow) + rescheduleLater = append(rescheduleLater, migrateRescheduleLater...) + // If there are allocations reconnecting we need to reconcile them and // their replacements first because there is specific logic when deciding // which ones to keep that can only be applied when the client reconnects. @@ -526,7 +535,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { if len(lost) > 0 { lostLater = lost.delayByStopAfter() - lostLaterEvals = a.createLostLaterEvals(lostLater, tg.Name) + lostLaterEvals = a.createLaterEvals(lostLater, tg.Name, structs.EvalTriggerRetryFailedAlloc) } // Merge disconnecting with the stop_after_client_disconnect set into the @@ -536,7 +545,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { if len(rescheduleLater) > 0 { // Create batched follow-up evaluations for allocations that are // reschedulable later and mark the allocations for in place updating - a.createRescheduleLaterEvals(rescheduleLater, all, tg.Name) + a.createRescheduleLaterEvals(rescheduleLater, all, migrate, tg.Name) } // Create a structure for choosing names. Seed with the taken names // which is the union of untainted, rescheduled, allocs on migrating @@ -595,7 +604,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { // placements can be made without any other consideration. deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !isCanarying - underProvisionedBy = a.computeReplacements(deploymentPlaceReady, desiredChanges, place, rescheduleNow, lost, underProvisionedBy) + underProvisionedBy = a.computeReplacements(deploymentPlaceReady, desiredChanges, place, migrate, rescheduleNow, lost, underProvisionedBy) if deploymentPlaceReady { a.computeDestructiveUpdates(destructive, underProvisionedBy, desiredChanges, tg) @@ -854,15 +863,17 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, // The input deploymentPlaceReady is calculated as the deployment is not paused, failed, or canarying. // It returns the number of allocs still needed. func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desiredChanges *structs.DesiredUpdates, - place []allocPlaceResult, rescheduleNow, lost allocSet, underProvisionedBy int) int { + place []allocPlaceResult, migrate, rescheduleNow, lost allocSet, underProvisionedBy int) int { - // Disconnecting allocs are not failing, but are included in rescheduleNow. - // Create a new set that only includes the actual failures and compute - // replacements based off that. + // Disconnecting and migrating allocs are not failing, but may be included + // in rescheduleNow. Create a new set that only includes the actual failures + // and compute replacements based off that. 
failed := make(allocSet) for id, alloc := range rescheduleNow { - _, ok := a.result.disconnectUpdates[id] - if !ok && alloc.ClientStatus != structs.AllocClientStatusUnknown { + _, isDisconnecting := a.result.disconnectUpdates[id] + _, isMigrating := migrate[id] + + if !isDisconnecting && !isMigrating && alloc.ClientStatus != structs.AllocClientStatusUnknown { failed[id] = alloc } } @@ -950,6 +961,17 @@ func (a *allocReconciler) computeMigrations(desiredChanges *structs.DesiredUpdat alloc: alloc, statusDescription: allocMigrating, }) + + // If this is a batch job allocation, check if the allocation should + // be placed. If the allocation should be rescheduled, the reschedule + // logic will handle placement and it should not be done here (used + // by the `alloc stop` command). If the allocation should disable + // migration placement, then placement should not be done here (used + // when draining batch allocations). + if alloc.Job.Type == structs.JobTypeBatch && (alloc.DesiredTransition.ShouldReschedule() || alloc.DesiredTransition.ShouldDisableMigrationPlacement()) { + continue + } + a.result.place = append(a.result.place, allocPlaceResult{ name: alloc.Name, canary: alloc.DeploymentStatus.IsCanary(), @@ -1283,21 +1305,23 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all // createRescheduleLaterEvals creates batched followup evaluations with the WaitUntil field // set for allocations that are eligible to be rescheduled later, and marks the alloc with // the followupEvalID -func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) { +func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedRescheduleInfo, all, migrate allocSet, tgName string) { // followupEvals are created in the same way as for delayed lost allocs - allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, tgName) + allocIDToFollowupEvalID := a.createLaterEvals(rescheduleLater, tgName, structs.EvalTriggerAllocReschedule) // Create updates that will be applied to the allocs to mark the FollowupEvalID for _, laterAlloc := range rescheduleLater { - existingAlloc := all[laterAlloc.alloc.ID] - updatedAlloc := existingAlloc.Copy() - updatedAlloc.FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID] - - // Can't updated an allocation that is disconnected - if _, ok := a.result.disconnectUpdates[laterAlloc.allocID]; !ok { - a.result.attributeUpdates[laterAlloc.allocID] = updatedAlloc - } else { + // Update the allocation if possible + if _, ok := a.result.disconnectUpdates[laterAlloc.allocID]; ok { a.result.disconnectUpdates[laterAlloc.allocID].FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID] + } else if m, ok := migrate[laterAlloc.allocID]; ok { + m.FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.allocID] + } else { + // Neither disconnected nor migrating, so the allocation can be updated in place + existingAlloc := all[laterAlloc.alloc.ID] + updatedAlloc := existingAlloc.Copy() + updatedAlloc.FollowupEvalID = allocIDToFollowupEvalID[laterAlloc.alloc.ID] + a.result.attributeUpdates[laterAlloc.allocID] = updatedAlloc } } } @@ -1339,10 +1363,10 @@ func (a *allocReconciler) computeReconnecting(reconnecting allocSet) { } } -// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for +// createLaterEvals creates batched followup evaluations with the WaitUntil field set for // lost allocations.
followupEvals are appended to a.result as a side effect, we return a // map of alloc IDs to their followupEval IDs. -func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, tgName string) map[string]string { +func (a *allocReconciler) createLaterEvals(rescheduleLater []*delayedRescheduleInfo, tgName, triggeredBy string) map[string]string { if len(rescheduleLater) == 0 { return map[string]string{} } @@ -1362,7 +1386,7 @@ func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedResched Namespace: a.job.Namespace, Priority: a.evalPriority, Type: a.job.Type, - TriggeredBy: structs.EvalTriggerRetryFailedAlloc, + TriggeredBy: triggeredBy, JobID: a.job.ID, JobModifyIndex: a.job.ModifyIndex, Status: structs.EvalStatusPending, @@ -1383,7 +1407,7 @@ func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedResched Namespace: a.job.Namespace, Priority: a.evalPriority, Type: a.job.Type, - TriggeredBy: structs.EvalTriggerRetryFailedAlloc, + TriggeredBy: triggeredBy, JobID: a.job.ID, JobModifyIndex: a.job.ModifyIndex, Status: structs.EvalStatusPending, diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 12f5937cff8..2e5b339263d 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -1092,6 +1092,292 @@ func TestReconciler_DrainNode(t *testing.T) { assertPlacementsAreRescheduled(t, 0, r.place) } +// Tests that the reconciler properly handles batch job allocations that +// are flagged as should migrate. This behavior is used by the `job restart` +// command when the `-reschedule` flag is provided. +func TestReconciler_MigrateBatchAllocs(t *testing.T) { + ci.Parallel(t) + + job := mock.BatchJob() + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.BatchAlloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Flag two allocations to migrate + for i := 0; i < 2; i++ { + allocs[i].DesiredTransition.Migrate = pointer.Of(true) + } + + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, "", 50, true) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 2, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Migrate: 2, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 1), placeResultsToNames(r.place)) + // These should not have the reschedule field set + assertPlacementsAreRescheduled(t, 0, r.place) +} + +// Tests that the reconciler properly handles batch job allocations that +// are flagged as should migrate. This behavior is used when draining +// a node. Batch allocations should be stopped, but the allocations should +// not be placed/rescheduled elsewhere. 
+func TestReconciler_MigrateDisablePlacementBatchAllocs(t *testing.T) { + ci.Parallel(t) + + job := mock.BatchJob() + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.BatchAlloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Flag two allocations to migrate + for i := 0; i < 2; i++ { + allocs[i].DesiredTransition.Migrate = pointer.Of(true) + allocs[i].DesiredTransition.MigrateDisablePlacement = pointer.Of(true) + } + + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, "", 50, true) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Migrate: 2, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + // These should not have the reschedule field set + assertPlacementsAreRescheduled(t, 0, r.place) +} + +// Tests that the reconciler properly handles batch job allocations that +// are flagged as should migrate and should reschedule. This behavior is +// used when stopping a batch allocation using the `alloc stop` command. +// Batch allocations should be stopped and rescheduled based on the +// reschedule block. +func TestReconciler_MigrateRescheduleBatchAllocs(t *testing.T) { + ci.Parallel(t) + + t.Run("unset reschedule", func(t *testing.T) { + job := mock.BatchJob() + + // Disable rescheduling + job.TaskGroups[0].ReschedulePolicy = nil + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.BatchAlloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Flag two allocations to migrate and reschedule + for i := 0; i < 2; i++ { + allocs[i].DesiredTransition.Migrate = pointer.Of(true) + allocs[i].DesiredTransition.Reschedule = pointer.Of(true) + } + + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, "", 50, true) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Migrate: 2, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + }) + + t.Run("disabled reschedule", func(t *testing.T) { + job := mock.BatchJob() + + // Disable rescheduling + job.TaskGroups[0].ReschedulePolicy.Attempts = 0 + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.BatchAlloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Flag two allocations to migrate and reschedule + for i := 0; i < 2; i++ { + allocs[i].DesiredTransition.Migrate = pointer.Of(true) + allocs[i].DesiredTransition.Reschedule = pointer.Of(true) + } + + reconciler := 
NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, "", 50, true) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Migrate: 2, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + }) + + t.Run("reschedules now", func(t *testing.T) { + job := mock.BatchJob() + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.BatchAlloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Flag two allocations to migrate and reschedule + for i := 0; i < 2; i++ { + allocs[i].DesiredTransition.Migrate = pointer.Of(true) + allocs[i].DesiredTransition.Reschedule = pointer.Of(true) + } + + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, "", 50, true) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 2, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Migrate: 2, + Ignore: 8, + Place: 2, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + // These should have the reschedule field set + assertPlacementsAreRescheduled(t, 2, r.place) + }) + + t.Run("reschedules later", func(t *testing.T) { + job := mock.BatchJob() + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.BatchAlloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + alloc.ModifyTime = time.Now().UnixNano() + allocs = append(allocs, alloc) + } + + // Flag two allocations to migrate and reschedule + for i := 0; i < 2; i++ { + allocs[i].DesiredTransition.Migrate = pointer.Of(true) + allocs[i].DesiredTransition.Reschedule = pointer.Of(true) + } + + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, true, job.ID, job, + nil, allocs, nil, uuid.Generate(), 50, true) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Migrate: 2, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + }) +} + // Tests the reconciler properly handles draining nodes with allocations while // scaling up func TestReconciler_DrainNode_ScaleUp(t *testing.T) { diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index c6e8398e2ab..c432f995f08 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -472,6 +472,13 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bo // status is failed so that they will be replaced. If they are // complete but not failed, they shouldn't be replaced. 
if isBatch { + // if the batch job allocation is flagged for being rescheduled, + // which happens when stopped with the `alloc stop` command, the + // allocation should not be untainted nor ignored. + if alloc.DesiredTransition.ShouldReschedule() { + return false, false + } + switch alloc.DesiredStatus { case structs.AllocDesiredStatusStop: if alloc.RanSuccessfully() { @@ -551,6 +558,21 @@ func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID stri return } +// filterRescheduledBatchAllocs returns a new allocSet that removes batch job allocations +// that are server terminal and have been rescheduled. This is used to ensure that the +// batch job allocation counts are correct and allocations are placed as expected when +// using the `alloc stop` command. +func (a allocSet) filterRescheduledBatchAllocs() (remaining allocSet) { + remaining = make(allocSet) + for id, alloc := range a { + if alloc.ServerTerminalStatus() && alloc.Job.Type == structs.JobTypeBatch && alloc.DesiredTransition.ShouldReschedule() { + continue + } + remaining[id] = alloc + } + return +} + // filterByTerminal filters out terminal allocs func filterByTerminal(untainted allocSet) (nonTerminal allocSet) { nonTerminal = make(map[string]*structs.Allocation) diff --git a/scheduler/reconcile_util_test.go b/scheduler/reconcile_util_test.go index f3f2a7a7ea1..7043a84948a 100644 --- a/scheduler/reconcile_util_test.go +++ b/scheduler/reconcile_util_test.go @@ -1954,6 +1954,41 @@ func TestAllocSet_filterByRescheduleable(t *testing.T) { resNow: allocSet{}, resLater: []*delayedRescheduleInfo{}, }, + { + name: "batch successfully complete with desired transition reschedule", + isDisconnecting: false, + isBatch: true, + all: allocSet{ + "batchCompleteReschedule": { + ID: "batchCompleteReschedule", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + TaskGroup: "rescheduleTG", + DesiredTransition: structs.DesiredTransition{ + Reschedule: pointer.Of(true), + }, + ModifyTime: now.Unix(), + TaskStates: map[string]*structs.TaskState{ + "task": {State: structs.TaskStateDead, Failed: false, FinishedAt: now}}, + }, + }, + untainted: allocSet{}, + resNow: allocSet{ + "batchCompleteReschedule": { + ID: "batchCompleteReschedule", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + TaskGroup: "rescheduleTG", + DesiredTransition: structs.DesiredTransition{ + Reschedule: pointer.Of(true), + }, + ModifyTime: now.Unix(), + TaskStates: map[string]*structs.TaskState{ + "task": {State: structs.TaskStateDead, Failed: false, FinishedAt: now}}, + }, + }, + resLater: []*delayedRescheduleInfo{}, + }, { name: "service disconnecting allocation no reschedule", isDisconnecting: true, @@ -2078,6 +2113,11 @@ func TestAllocSet_filterByRescheduleable(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + if tc.isBatch { + testJob.Type = structs.JobTypeBatch + } else { + testJob.Type = structs.JobTypeService + } untainted, resNow, resLater := tc.all.filterByRescheduleable(tc.isBatch, tc.isDisconnecting, now, "evailID", tc.deployment) must.Eq(t, tc.untainted, untainted, must.Sprintf("with-nodes: untainted"))
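For reviewers who want to try the new behavior against a running agent, here is a minimal sketch of exercising the `reschedule` query parameter added by this change through the Go API client; the agent address and allocation ID are placeholders, and only error handling strictly needed to run is included:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Assumes a locally reachable Nomad agent; the address is a placeholder.
	cfg := api.DefaultConfig()
	cfg.Address = "http://127.0.0.1:4646"
	client, err := api.NewClient(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Stop the allocation and ask for it to be rescheduled, mirroring what
	// `nomad alloc stop` now requests for batch job allocations in this change.
	// The allocation ID below is a placeholder.
	resp, err := client.Allocations().Stop(&api.Allocation{ID: "REPLACE-WITH-ALLOC-ID"}, &api.QueryOptions{
		Params: map[string]string{"reschedule": "true"},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("follow-up evaluation:", resp.EvalID)
}
```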