Skip to content

Commit 1749d5a

Browse files
committed
treat component notification as critical during migration process
1 parent bc96cd8 commit 1749d5a

File tree

4 files changed

+208
-81
lines changed

4 files changed

+208
-81
lines changed

internal/pkg/agent/application/actions/handlers/handler_action_migrate.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ const ()
2727
type migrateCoordinator interface {
2828
actionCoordinator
2929

30-
Migrate(_ context.Context, _ *fleetapi.ActionMigrate, _ func(done <-chan struct{}) backoff.Backoff) error
30+
Migrate(_ context.Context, _ *fleetapi.ActionMigrate, _ func(done <-chan struct{}) backoff.Backoff, _ func(context.Context, *fleetapi.ActionMigrate) error) error
3131
ReExec(callback reexec.ShutdownCallbackFn, argOverrides ...string)
3232
Protection() protection.Config
3333
}
@@ -85,7 +85,7 @@ func (h *Migrate) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
8585
}
8686
}
8787

88-
if err := h.coord.Migrate(ctx, action, fleetgateway.RequestBackoff); err != nil {
88+
if err := h.coord.Migrate(ctx, action, fleetgateway.RequestBackoff, h.notifyComponents); err != nil {
8989
// this should not happen, unmanaged agent should not receive the action
9090
// defensive coding to avoid misbehavior
9191
if errors.Is(err, coordinator.ErrNotManaged) {
@@ -102,12 +102,6 @@ func (h *Migrate) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
102102
return fmt.Errorf("migration of agent to a new cluster failed: %w", err)
103103
}
104104

105-
// action is all rigth we can notify endpoint
106-
if err := h.notifyComponents(ctx, action); err != nil {
107-
// config is cleaned up already we cannot revert
108-
h.log.Warnf("failed to notify components, aborting: %v", err)
109-
}
110-
111105
// reexec and load new config
112106
h.coord.ReExec(nil)
113107
return nil

internal/pkg/agent/application/actions/handlers/handler_action_migrate_test.go

Lines changed: 10 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,12 @@ import (
1717
"github.com/stretchr/testify/mock"
1818
"github.com/stretchr/testify/require"
1919

20-
"github.com/elastic/elastic-agent-client/v7/pkg/client"
21-
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
2220
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
2321
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
2422
"github.com/elastic/elastic-agent/internal/pkg/agent/protection"
2523
"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
2624
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
2725
"github.com/elastic/elastic-agent/pkg/component"
28-
"github.com/elastic/elastic-agent/pkg/component/runtime"
2926
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
3027
mockinfo "github.com/elastic/elastic-agent/testing/mocks/internal_/pkg/agent/application/info"
3128
)
@@ -42,7 +39,7 @@ func TestActionMigratelHandler(t *testing.T) {
4239
ack.On("Commit", t.Context()).Return(nil)
4340

4441
coord := &fakeMigrateCoordinator{}
45-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
42+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
4643
coord.On("ReExec", mock.Anything, mock.Anything)
4744
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil})
4845

@@ -82,7 +79,7 @@ func TestActionMigratelHandler(t *testing.T) {
8279

8380
coord := &fakeMigrateCoordinator{}
8481
coord.On("State").Return(coordinator.State{})
85-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
82+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
8683
coord.On("ReExec", mock.Anything, mock.Anything)
8784
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil, Enabled: tc.protectionEnabled})
8885

@@ -120,7 +117,7 @@ func TestActionMigratelHandler(t *testing.T) {
120117

121118
coord := &fakeMigrateCoordinator{}
122119
coord.On("State").Return(coordinator.State{})
123-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
120+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
124121
coord.On("ReExec", mock.Anything, mock.Anything)
125122
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil})
126123

@@ -170,7 +167,7 @@ func TestActionMigratelHandler(t *testing.T) {
170167

171168
coord := &fakeMigrateCoordinator{}
172169
coord.On("State").Return(coordinator.State{})
173-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
170+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
174171
coord.On("ReExec", mock.Anything, mock.Anything)
175172
coord.On("Protection").Return(protection.Config{SignatureValidationKey: signatureValidationKey})
176173

@@ -206,7 +203,7 @@ func TestActionMigratelHandler(t *testing.T) {
206203
ack.On("Commit", t.Context()).Return(nil)
207204

208205
coord := &fakeMigrateCoordinator{}
209-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
206+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
210207
coord.On("ReExec", mock.Anything, mock.Anything)
211208
coord.On("Protection").Return(protection.Config{SignatureValidationKey: signatureValidationKey})
212209

@@ -256,7 +253,7 @@ func TestActionMigratelHandler(t *testing.T) {
256253

257254
coord := &fakeMigrateCoordinator{}
258255
coord.On("State").Return(coordinator.State{})
259-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
256+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
260257
coord.On("ReExec", mock.Anything, mock.Anything)
261258
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil})
262259

@@ -308,7 +305,7 @@ func TestActionMigratelHandler(t *testing.T) {
308305
ack.On("Commit", t.Context()).Return(nil)
309306

310307
coord := &fakeMigrateCoordinator{}
311-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
308+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
312309
coord.On("ReExec", mock.Anything, mock.Anything)
313310
coord.On("Protection").Return(protection.Config{SignatureValidationKey: signatureValidationKey})
314311

@@ -331,7 +328,7 @@ func TestActionMigratelHandler(t *testing.T) {
331328

332329
coord := &fakeMigrateCoordinator{}
333330
coord.On("State").Return(coordinator.State{})
334-
coord.On("Migrate", mock.Anything, mock.Anything).Return(coordinator.ErrFleetServer)
331+
coord.On("Migrate", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(coordinator.ErrFleetServer)
335332
coord.On("ReExec", mock.Anything, mock.Anything)
336333
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil})
337334

@@ -346,65 +343,14 @@ func TestActionMigratelHandler(t *testing.T) {
346343
ack.AssertNumberOfCalls(t, "Commit", 1)
347344
coord.AssertNotCalled(t, "ReExec", mock.Anything, mock.Anything)
348345
})
349-
350-
t.Run("endpoint notified", func(t *testing.T) {
351-
mockAgentInfo := mockinfo.NewAgent(t)
352-
mockAgentInfo.On("AgentID").Return("agent-id")
353-
action := &fleetapi.ActionMigrate{
354-
ActionType: fleetapi.ActionTypeMigrate,
355-
}
356-
357-
ack := &fakeAcker{}
358-
ack.On("Ack", t.Context(), action).Return(nil)
359-
ack.On("Commit", t.Context()).Return(nil)
360-
361-
coord := &fakeMigrateCoordinator{}
362-
coord.On("State").Return(coordinator.State{
363-
Components: []runtime.ComponentComponentState{
364-
runtime.ComponentComponentState{
365-
Component: component.Component{
366-
InputSpec: &component.InputRuntimeSpec{
367-
Spec: component.InputSpec{
368-
ProxiedActions: []string{fleetapi.ActionTypeMigrate},
369-
},
370-
},
371-
Units: []component.Unit{
372-
component.Unit{
373-
Type: client.UnitTypeInput,
374-
Config: &proto.UnitExpectedConfig{
375-
Type: "migrate-sensitive-input",
376-
},
377-
},
378-
},
379-
InputType: "migrate-sensitive-input",
380-
},
381-
},
382-
},
383-
})
384-
coord.On("PerformAction", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(map[string]interface{}{}, nil)
385-
coord.On("Migrate", mock.Anything, mock.Anything).Return(nil)
386-
coord.On("ReExec", mock.Anything, mock.Anything)
387-
coord.On("Protection").Return(protection.Config{SignatureValidationKey: nil})
388-
389-
h := NewMigrate(log, mockAgentInfo, coord)
390-
h.tamperProtectionFn = func() bool { return false }
391-
392-
require.NoError(t, h.Handle(t.Context(), action, ack))
393-
coord.AssertNumberOfCalls(t, "Migrate", 1)
394-
395-
// ack not delegated to migrate coordinator, failure is reported
396-
ack.AssertNumberOfCalls(t, "Ack", 0)
397-
ack.AssertNumberOfCalls(t, "Commit", 0)
398-
coord.AssertNumberOfCalls(t, "ReExec", 1)
399-
})
400346
}
401347

402348
type fakeMigrateCoordinator struct {
403349
mock.Mock
404350
}
405351

406-
func (f *fakeMigrateCoordinator) Migrate(ctx context.Context, a *fleetapi.ActionMigrate, _ func(done <-chan struct{}) backoff.Backoff) error {
407-
args := f.Called(ctx, a)
352+
func (f *fakeMigrateCoordinator) Migrate(ctx context.Context, a *fleetapi.ActionMigrate, b func(done <-chan struct{}) backoff.Backoff, n func(context.Context, *fleetapi.ActionMigrate) error) error {
353+
args := f.Called(ctx, a, b, n)
408354
return args.Error(0)
409355
}
410356

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"reflect"
1212
"strings"
13+
"sync"
1314
"sync/atomic"
1415
"time"
1516

@@ -364,7 +365,7 @@ type Coordinator struct {
364365
// migrationProgressWg is used to block processing of incoming policies after enroll is done
365366
// incoming policies are blocked until we reboot so components receiving proxied MIGRATE action
366367
// are not confused
367-
isMigrationProgress bool
368+
migrationProgressWg sync.WaitGroup
368369
}
369370

370371
// The channels Coordinator reads to receive updates from the various managers.
@@ -600,7 +601,12 @@ func (c *Coordinator) ReExec(callback reexec.ShutdownCallbackFn, argOverrides ..
600601

601602
// Migrate migrates agent to a new cluster and ACKs success to the old one.
602603
// In case of failure no ack is performed and error is returned.
603-
func (c *Coordinator) Migrate(ctx context.Context, action *fleetapi.ActionMigrate, backoffFactory func(done <-chan struct{}) backoff.Backoff) error {
604+
func (c *Coordinator) Migrate(
605+
ctx context.Context,
606+
action *fleetapi.ActionMigrate,
607+
backoffFactory func(done <-chan struct{}) backoff.Backoff,
608+
notifyFn func(context.Context, *fleetapi.ActionMigrate) error,
609+
) error {
604610
if !c.isManaged {
605611
return ErrNotManaged
606612
}
@@ -670,6 +676,24 @@ func (c *Coordinator) Migrate(ctx context.Context, action *fleetapi.ActionMigrat
670676
return errors.Join(fmt.Errorf("failed to enroll: %w", err), restoreErr)
671677
}
672678

679+
// lock processing of new config before notifying components
680+
// hold lock until notification failure or reexec
681+
c.migrationProgressWg.Add(1)
682+
if notifyFn != nil {
683+
// notify before completing migration
684+
// components such as endpoint are crucial and must keep working even while the agent is still on the stale cluster
685+
// an error on the component side is returned as part of the action response
686+
if err := notifyFn(ctx, action); err != nil {
687+
restoreErr := RestoreConfig()
688+
689+
// in case of failure no need to lock processing
690+
// safe to forward policy from source cluster
691+
c.migrationProgressWg.Done()
692+
693+
return errors.Join(fmt.Errorf("failed to notify components: %w", err), restoreErr)
694+
}
695+
}
696+
673697
// ACK success to source fleet server
674698
if err := c.ackMigration(ctx, action, c.fleetAcker); err != nil {
675699
c.logger.Warnf("failed to ACK success: %v", err)
@@ -682,7 +706,6 @@ func (c *Coordinator) Migrate(ctx context.Context, action *fleetapi.ActionMigrat
682706
}
683707

684708
c.bestEffortUnenroll(ctx, originalOptions)
685-
c.isMigrationProgress = true
686709

687710
return nil
688711
}
@@ -1535,9 +1558,7 @@ func (c *Coordinator) runLoopIteration(ctx context.Context) {
15351558

15361559
// Always called on the main Coordinator goroutine.
15371560
func (c *Coordinator) processConfig(ctx context.Context, cfg *config.Config) (err error) {
1538-
if c.isMigrationProgress {
1539-
return nil
1540-
}
1561+
c.migrationProgressWg.Wait()
15411562

15421563
if c.otelMgr != nil {
15431564
c.otelCfg = cfg.OTel

0 commit comments

Comments
 (0)