@@ -20,13 +20,15 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
@@ -303,6 +305,93 @@ func (r *Reconciler) reconcile(ctx context.Context, s *scope) error {
return errors.Errorf("unexpected deployment strategy type: %s", md.Spec.Rollout.Strategy.Type)
}

// createOrUpdateMachineSetsAndSyncMachineDeploymentRevision applies the changes identified by the rolloutPlanner to both newMS and oldMSs.
// Note: Both newMS and oldMSs include the full intent for the SSA apply call: mandatory labels,
// in-place propagated fields, the annotations derived from the MachineDeployment, revision annotations,
// and also the annotations influencing how scale up/down operations are performed.
// scaleIntents instead are tracked separately by the rolloutPlanner and are applied to the MachineSets here,
// before changes are persisted.
// Note: When the newMS has been created by the rollout planner, also wait for the cache to be up to date.
func (r *Reconciler) createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(ctx context.Context, p *rolloutPlanner) error {
log := ctrl.LoggerFrom(ctx)
allMSs := append(p.oldMSs, p.newMS)

for _, ms := range allMSs {
log := log.WithValues("MachineSet", klog.KObj(ms))
ctx := ctrl.LoggerInto(ctx, log)

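// Remember the replica count before applying the scale intent, so the log messages and events
// emitted further below can report the scale up/down delta.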
originalReplicas := ptr.Deref(ms.Spec.Replicas, 0)
if scaleIntent, ok := p.scaleIntents[ms.Name]; ok {
ms.Spec.Replicas = &scaleIntent
}

if ms.GetUID() == "" {
// Create the MachineSet.
if err := ssa.Patch(ctx, r.Client, machineDeploymentManagerName, ms); err != nil {
r.recorder.Eventf(p.md, corev1.EventTypeWarning, "FailedCreate", "Failed to create MachineSet %s: %v", klog.KObj(ms), err)
return errors.Wrapf(err, "failed to create new MachineSet %s", klog.KObj(ms))
}
log.Info(fmt.Sprintf("MachineSet created (%s)", p.createReason))
r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulCreate", "Created MachineSet %s with %d replicas", klog.KObj(ms), ptr.Deref(ms.Spec.Replicas, 0))

// Keep trying to get the MachineSet. This will force the cache to update and prevent any future
// reconciliation of the MachineDeployment from running with an outdated list of MachineSets, which
// could lead to the unwanted creation of a duplicate MachineSet.
var pollErrors []error
tmpMS := &clusterv1.MachineSet{}
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ms), tmpMS); err != nil {
// Do not return error here. Continue to poll even if we hit an error
// so that we avoid exiting because of transient errors like network flakes.
// Capture all the errors and return the aggregate error if the poll fails eventually.
pollErrors = append(pollErrors, err)
return false, nil
}
return true, nil
}); err != nil {
return errors.Wrapf(kerrors.NewAggregate(pollErrors), "failed to get the MachineSet %s after creation", klog.KObj(ms))
}

// Report back creation timestamp, because legacy scale func uses it to sort machines.
// TODO(in-place): drop this as soon as handling of MD with paused rollouts is moved into rollout planner (see scale in machinedeployment_sync.go).
ms.CreationTimestamp = tmpMS.CreationTimestamp
continue
}

// Update the MachineSet to propagate in-place mutable fields from the MachineDeployment and/or changes applied by the rollout planner.
originalMS, ok := p.originalMS[ms.Name]
if !ok {
return errors.Errorf("failed to update MachineSet %s, original MS is missing", klog.KObj(ms))
}

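// Note: WithCachingProxy uses originalMS and the SSA cache to skip the apply call when an identical
// request has already been applied, avoiding no-op patches on every reconcile.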
err := ssa.Patch(ctx, r.Client, machineDeploymentManagerName, ms, ssa.WithCachingProxy{Cache: r.ssaCache, Original: originalMS})
if err != nil {
r.recorder.Eventf(p.md, corev1.EventTypeWarning, "FailedUpdate", "Failed to update MachineSet %s: %v", klog.KObj(ms), err)
return errors.Wrapf(err, "failed to update MachineSet %s", klog.KObj(ms))
}

newReplicas := ptr.Deref(ms.Spec.Replicas, 0)
if newReplicas < originalReplicas {
log.Info(fmt.Sprintf("Scaled down MachineSet %s to %d replicas (-%d)", ms.Name, newReplicas, originalReplicas-newReplicas))
r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled down MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
}
if newReplicas > originalReplicas {
log.Info(fmt.Sprintf("Scaled up MachineSet %s to %d replicas (+%d)", ms.Name, newReplicas, newReplicas-originalReplicas))
r.recorder.Eventf(p.md, corev1.EventTypeNormal, "SuccessfulScale", "Scaled up MachineSet %v: %d -> %d", ms.Name, originalReplicas, newReplicas)
}
}

// Surface the revision annotation at the MachineDeployment level.
if p.md.Annotations == nil {
p.md.Annotations = make(map[string]string)
}
if p.md.Annotations[clusterv1.RevisionAnnotation] != p.revision {
p.md.Annotations[clusterv1.RevisionAnnotation] = p.revision
}

return nil
}
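// Illustrative sketch (not part of this change): based on the rolloutOnDelete changes further down
// in this PR, a rollout strategy is expected to wire the rolloutPlanner together with the helper
// above roughly as follows:
//
//	planner := newRolloutPlanner()
//	if err := planner.init(ctx, md, msList, nil, true, templateExists); err != nil {
//		return err
//	}
//	if err := planner.planOnDelete(ctx); err != nil {
//		return err
//	}
//	// Persist MachineSet creations/updates, scale intents, and the revision annotation in one place.
//	if err := r.createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(ctx, planner); err != nil {
//		return err
//	}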

func (r *Reconciler) reconcileDelete(ctx context.Context, s *scope) error {
log := ctrl.LoggerFrom(ctx)
if err := r.getAndAdoptMachineSetsForDeployment(ctx, s); err != nil {
@@ -348,10 +348,9 @@ func TestMachineDeploymentReconciler(t *testing.T) {
g.Expect(env.List(ctx, machineSets, msListOpts...)).To(Succeed())
// Verify we still only have 2 MachineSets.
g.Expect(machineSets.Items).To(HaveLen(2))
// Verify that the new MachineSet gets the updated labels.
// Verify that both the new and the old MachineSet get the updated labels.
g.Expect(machineSets.Items[0].Spec.Template.Labels).To(HaveKeyWithValue("updated", "true"))
// Verify that the old MachineSet does not get the updated labels.
g.Expect(machineSets.Items[1].Spec.Template.Labels).ShouldNot(HaveKeyWithValue("updated", "true"))
g.Expect(machineSets.Items[1].Spec.Template.Labels).To(HaveKeyWithValue("updated", "true"))
}, timeout).Should(Succeed())
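// Note: since the rollout planner now applies the full SSA intent to old MachineSets as well,
// in-place mutable fields (labels, annotations, timeouts, deletion order) are expected to propagate
// to the old MachineSet too; the assertions above and below are updated accordingly.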

// Update the NodeDrainTimeoutSeconds, NodeDeletionTimeoutSeconds, NodeVolumeDetachTimeoutSeconds of the MachineDeployment,
@@ -384,10 +383,19 @@ func TestMachineDeploymentReconciler(t *testing.T) {
HaveValue(Equal(duration10s)),
), "NodeVolumeDetachTimeoutSeconds value does not match expected")

// Verify that the old machine set keeps the old values.
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeDrainTimeoutSeconds).Should(BeNil())
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeDeletionTimeoutSeconds).Should(BeNil())
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeVolumeDetachTimeoutSeconds).Should(BeNil())
// Verify that the old machine set has the new values.
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeDrainTimeoutSeconds).Should(And(
Not(BeNil()),
HaveValue(Equal(duration10s)),
), "NodeDrainTimeoutSeconds value does not match expected")
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeDeletionTimeoutSeconds).Should(And(
Not(BeNil()),
HaveValue(Equal(duration10s)),
), "NodeDeletionTimeoutSeconds value does not match expected")
g.Expect(machineSets.Items[1].Spec.Template.Spec.Deletion.NodeVolumeDetachTimeoutSeconds).Should(And(
Not(BeNil()),
HaveValue(Equal(duration10s)),
), "NodeVolumeDetachTimeoutSeconds value does not match expected")
}).Should(Succeed())

// Update the deletion.order of the MachineDeployment,
@@ -404,8 +412,8 @@ func TestMachineDeploymentReconciler(t *testing.T) {
// Verify the deletion.order value is updated
g.Expect(machineSets.Items[0].Spec.Deletion.Order).Should(Equal(clusterv1.NewestMachineSetDeletionOrder))

// Verify that the old machine set retains its delete policy
g.Expect(machineSets.Items[1].Spec.Deletion.Order).To(Equal(clusterv1.OldestMachineSetDeletionOrder))
// Verify that the old machine set has the new value.
g.Expect(machineSets.Items[1].Spec.Deletion.Order).Should(Equal(clusterv1.NewestMachineSetDeletionOrder))
}).Should(Succeed())

// Verify that all the MachineSets have the expected OwnerRef.
@@ -27,46 +27,26 @@ import (

clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
"sigs.k8s.io/cluster-api/internal/controllers/machinedeployment/mdutil"
"sigs.k8s.io/cluster-api/util/patch"
)

// rolloutOnDelete reconciles machine sets controlled by a MachineDeployment that is using the OnDelete strategy.
func (r *Reconciler) rolloutOnDelete(ctx context.Context, md *clusterv1.MachineDeployment, msList []*clusterv1.MachineSet, templateExists bool) error {
// TODO(in-place): move create newMS into rolloutPlanner
newMS, oldMSs, err := r.getAllMachineSetsAndSyncRevision(ctx, md, msList, true, templateExists)
if err != nil {
planner := newRolloutPlanner()
if err := planner.init(ctx, md, msList, nil, true, templateExists); err != nil {
return err
}

planner := newRolloutPlanner()
planner.md = md
planner.newMS = newMS
planner.oldMSs = oldMSs

if err := planner.planOnDelete(ctx); err != nil {
return err
}

allMSs := append(oldMSs, newMS)

// TODO(in-place): also apply/remove labels to MS should go into rolloutPlanner
if err := r.cleanupDisableMachineCreateAnnotation(ctx, newMS); err != nil {
return err
}
if err := r.addDisableMachineCreateAnnotation(ctx, oldMSs); err != nil {
if err := r.createOrUpdateMachineSetsAndSyncMachineDeploymentRevision(ctx, planner); err != nil {
return err
}

// TODO(in-place): this should be changed as soon as rolloutPlanner support MS creation and adding/removing labels from MS
for _, ms := range allMSs {
scaleIntent := ptr.Deref(ms.Spec.Replicas, 0)
if v, ok := planner.scaleIntents[ms.Name]; ok {
scaleIntent = v
}
if err := r.scaleMachineSet(ctx, ms, scaleIntent, md); err != nil {
return err
}
}
newMS := planner.newMS
oldMSs := planner.oldMSs
allMSs := append(oldMSs, newMS)

if err := r.syncDeploymentStatus(allMSs, newMS, md); err != nil {
return err
@@ -164,26 +144,3 @@ func (p *rolloutPlanner) reconcileOldMachineSetsOnDelete(ctx context.Context) {
}
}
}

// addDisableMachineCreateAnnotation will add the disable machine create annotation to old MachineSets.
func (r *Reconciler) addDisableMachineCreateAnnotation(ctx context.Context, oldMSs []*clusterv1.MachineSet) error {
for _, oldMS := range oldMSs {
log := ctrl.LoggerFrom(ctx, "MachineSet", klog.KObj(oldMS))
if _, ok := oldMS.Annotations[clusterv1.DisableMachineCreateAnnotation]; !ok {
log.V(4).Info("adding annotation on old MachineSet to disable machine creation")
patchHelper, err := patch.NewHelper(oldMS, r.Client)
if err != nil {
return err
}
if oldMS.Annotations == nil {
oldMS.Annotations = map[string]string{}
}
oldMS.Annotations[clusterv1.DisableMachineCreateAnnotation] = "true"
err = patchHelper.Patch(ctx, oldMS)
if err != nil {
return err
}
}
}
return nil
}
@@ -20,15 +20,15 @@ import (
"context"
"fmt"
"math/rand"
"os"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/klog/v2/textlogger"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"

@@ -390,8 +390,7 @@ type onDeleteSequenceTestCase struct {

func Test_OnDeleteSequences(t *testing.T) {
ctx := context.Background()
ctx = ctrl.LoggerInto(ctx, klog.Background())
klog.SetOutput(ginkgo.GinkgoWriter)
ctx = ctrl.LoggerInto(ctx, textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(5), textlogger.Output(os.Stdout))))

tests := []onDeleteSequenceTestCase{
{ // delete 1
@@ -581,22 +580,25 @@ func runOnDeleteTestCase(ctx context.Context, t *testing.T, tt onDeleteSequenceT

// Running a small subset of MD reconcile (the rollout logic and a bit of setReplicas)
p := newRolloutPlanner()
p.md = current.machineDeployment
p.newMS = current.newMS()
p.oldMSs = current.oldMSs()
p.computeDesiredMS = func(_ context.Context, deployment *clusterv1.MachineDeployment, currentNewMS *clusterv1.MachineSet) (*clusterv1.MachineSet, error) {
desiredNewMS := currentNewMS
if currentNewMS == nil {
// uses a predictable MS name when creating newMS, also add the newMS to current.machineSets
totMS := len(current.machineSets)
desiredNewMS = createMS(fmt.Sprintf("ms%d", totMS+1), deployment.Spec.Template.Spec.FailureDomain, 0)
current.machineSets = append(current.machineSets, desiredNewMS)
}
return desiredNewMS, nil
}

err := p.planOnDelete(ctx)
// Init the rollout planner and plan the next step of the rollout.
err := p.init(ctx, current.machineDeployment, current.machineSets, current.machines(), true, true)
g.Expect(err).ToNot(HaveOccurred())

// Apply changes.
delete(p.newMS.Annotations, clusterv1.DisableMachineCreateAnnotation)
for _, oldMS := range current.oldMSs() {
if oldMS.Annotations == nil {
oldMS.Annotations = map[string]string{}
}
oldMS.Annotations[clusterv1.DisableMachineCreateAnnotation] = "true"
}
err = p.planOnDelete(ctx)
g.Expect(err).ToNot(HaveOccurred())

// Apply changes.
for _, ms := range current.machineSets {
if scaleIntent, ok := p.scaleIntents[ms.Name]; ok {
ms.Spec.Replicas = ptr.To(scaleIntent)