From acbc5bde9b7ea0a20200f66533f36ca0eea4cc64 Mon Sep 17 00:00:00 2001 From: Roger Torrentsgeneros Date: Thu, 13 Oct 2022 13:34:49 +0200 Subject: [PATCH 1/3] feat: emit pod events on drain --- cmd/node-termination-handler.go | 2 +- pkg/node/node.go | 33 ++++++++++++++++++++++- pkg/node/node_test.go | 47 ++++++++++++++++++++++++++++++--- pkg/observability/k8s-events.go | 2 +- 4 files changed, 78 insertions(+), 6 deletions(-) diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index 2e7c81da..b9f1c1d8 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -407,7 +407,7 @@ func cordonNode(node node.Node, nodeName string, drainEvent *monitor.Interruptio } func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error { - err := node.CordonAndDrain(nodeName, drainEvent.Description) + err := node.CordonAndDrain(nodeName, drainEvent.Description, recorder) if err != nil { if errors.IsNotFound(err) { log.Err(err).Msgf("node '%s' not found in the cluster", nodeName) diff --git a/pkg/node/node.go b/pkg/node/node.go index 789e4716..0004f371 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -64,6 +65,13 @@ const ( maxTaintValueLength = 63 ) +const ( + // PodEvictReason is the event reason emitted for Pod evictions during node drain + PodEvictReason = "PodEviction" + // PodEvictMsg is the event message emitted for Pod evictions during node drain + PodEvictMsg = "Pod evicted due to node drain" +) + var ( maxRetryDeadline time.Duration = 5 * time.Second conflictRetryInterval time.Duration = 750 * time.Millisecond @@ -95,7 +103,7 @@ func NewWithValues(nthConfig config.Config, drainHelper *drain.Helper, uptime up } // CordonAndDrain will cordon the node and evict pods based on the config -func (n Node) CordonAndDrain(nodeName string, reason string) error { +func (n Node) CordonAndDrain(nodeName string, reason string, recorder recorderInterface) error { if n.nthConfig.DryRun { log.Info().Str("node_name", nodeName).Str("reason", reason).Msg("Node would have been cordoned and drained, but dry-run flag was set.") return nil @@ -114,6 +122,25 @@ func (n Node) CordonAndDrain(nodeName string, reason string) error { if err != nil { return err } + // Emit events for all pods that will be evicted + if recorder != nil { + pods, err := n.fetchAllPods(nodeName) + if err == nil { + for _, pod := range pods.Items { + podRef := &corev1.ObjectReference{ + Kind: "Pod", + Name: pod.Name, + Namespace: pod.Namespace, + } + annotations := make(map[string]string) + annotations["node"] = nodeName + for k, v := range pod.GetLabels() { + annotations[k] = v + } + recorder.AnnotatedEventf(podRef, annotations, "Normal", PodEvictReason, PodEvictMsg) + } + } + } err = drain.RunNodeDrain(n.drainHelper, node.Name) if err != nil { return err @@ -800,3 +827,7 @@ func filterPodForDeletion(podName string) func(pod corev1.Pod) drain.PodDeleteSt return drain.MakePodDeleteStatusOkay() } } + +type recorderInterface interface { + AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) +} diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 3fd723cc..8398e55a 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -16,6 +16,7 @@ package node_test import ( "context" "strconv" + "strings" "testing" "time" @@ -28,6 +29,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" "k8s.io/kubectl/pkg/drain" ) @@ -61,7 +63,10 @@ func TestDryRun(t *testing.T) { tNode, err := node.New(config.Config{DryRun: true}) h.Ok(t, err) - err = tNode.CordonAndDrain(nodeName, "cordonReason") + fakeRecorder := record.NewFakeRecorder(100) + + err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) + h.Ok(t, err) err = tNode.Cordon(nodeName, "cordonReason") @@ -98,6 +103,7 @@ func TestNewFailure(t *testing.T) { } func TestDrainSuccess(t *testing.T) { + controllerBool := true client := fake.NewSimpleClientset() _, err := client.CoreV1().Nodes().Create( context.Background(), @@ -106,14 +112,49 @@ func TestDrainSuccess(t *testing.T) { }, metav1.CreateOptions{}) h.Ok(t, err) + _, err = client.CoreV1().Pods("default").Create( + context.Background(), + &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "cool-app-pod-", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Name: "cool-app", + Kind: "ReplicaSet", + Controller: &controllerBool, + }, + }, + }, + Spec: v1.PodSpec{ + NodeName: nodeName, + }, + }, + metav1.CreateOptions{}) + h.Ok(t, err) + + fakeRecorder := record.NewFakeRecorder(100) + tNode := getNode(t, getDrainHelper(client)) - err = tNode.CordonAndDrain(nodeName, "cordonReason") + err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) + h.Ok(t, err) + close(fakeRecorder.Events) + expectedEventArrived := false + for event := range fakeRecorder.Events { + if strings.Contains(event, "Normal PodEviction Pod evicted due to node drain") { + expectedEventArrived = true + } + } + h.Assert(t, expectedEventArrived, "PodEvicted event was not emitted") } func TestDrainCordonNodeFailure(t *testing.T) { + fakeRecorder := record.NewFakeRecorder(100) + tNode := getNode(t, getDrainHelper(fake.NewSimpleClientset())) - err := tNode.CordonAndDrain(nodeName, "cordonReason") + err := tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) + h.Assert(t, true, "Failed to return error on CordonAndDrain failing to cordon node", err != nil) } diff --git a/pkg/observability/k8s-events.go b/pkg/observability/k8s-events.go index 60f8cba9..4a06eeb4 100644 --- a/pkg/observability/k8s-events.go +++ b/pkg/observability/k8s-events.go @@ -113,7 +113,7 @@ func InitK8sEventRecorder(enabled bool, nodeName string, sqsMode bool, nodeMetad } broadcaster := record.NewBroadcaster() - broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("default")}) + broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")}) return K8sEventRecorder{ annotations: annotations, From f32824558568f3db47618a7bad9dd6c5b09d8fe6 Mon Sep 17 00:00:00 2001 From: Roger Torrentsgeneros Date: Thu, 13 Oct 2022 18:19:26 +0200 Subject: [PATCH 2/3] fix: satisfy PR requested change --- cmd/node-termination-handler.go | 2 +- pkg/node/node.go | 6 +++--- pkg/node/node_internal_test.go | 7 +++---- pkg/node/node_test.go | 18 +++++++++--------- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go index b9f1c1d8..c049072e 100644 --- a/cmd/node-termination-handler.go +++ b/cmd/node-termination-handler.go @@ -407,7 +407,7 @@ func cordonNode(node node.Node, nodeName string, drainEvent *monitor.Interruptio } func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error { - err := node.CordonAndDrain(nodeName, drainEvent.Description, recorder) + err := node.CordonAndDrain(nodeName, drainEvent.Description, recorder.EventRecorder) if err != nil { if errors.IsNotFound(err) { log.Err(err).Msgf("node '%s' not found in the cluster", nodeName) diff --git a/pkg/node/node.go b/pkg/node/node.go index 0004f371..a0a53a3d 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -69,7 +69,7 @@ const ( // PodEvictReason is the event reason emitted for Pod evictions during node drain PodEvictReason = "PodEviction" // PodEvictMsg is the event message emitted for Pod evictions during node drain - PodEvictMsg = "Pod evicted due to node drain" + PodEvictMsgFmt = "Pod evicted due to node drain (node %s)" ) var ( @@ -137,7 +137,7 @@ func (n Node) CordonAndDrain(nodeName string, reason string, recorder recorderIn for k, v := range pod.GetLabels() { annotations[k] = v } - recorder.AnnotatedEventf(podRef, annotations, "Normal", PodEvictReason, PodEvictMsg) + recorder.AnnotatedEventf(podRef, annotations, corev1.EventTypeNormal, PodEvictReason, PodEvictMsgFmt, nodeName) } } } @@ -829,5 +829,5 @@ func filterPodForDeletion(podName string) func(pod corev1.Pod) drain.PodDeleteSt } type recorderInterface interface { - AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) + AnnotatedEventf(object runtime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...interface{}) } diff --git a/pkg/node/node_internal_test.go b/pkg/node/node_internal_test.go index 3238458d..d844a9d4 100644 --- a/pkg/node/node_internal_test.go +++ b/pkg/node/node_internal_test.go @@ -15,7 +15,6 @@ package node import ( "context" - "io/ioutil" "os" "strconv" "testing" @@ -86,7 +85,7 @@ func TestUncordonIfRebootedFileReadError(t *testing.T) { func TestUncordonIfRebootedSystemNotRestarted(t *testing.T) { d1 := []byte("350735.47 234388.90") - err := ioutil.WriteFile(testFile, d1, 0644) + err := os.WriteFile(testFile, d1, 0644) h.Ok(t, err) client := fake.NewSimpleClientset() @@ -110,7 +109,7 @@ func TestUncordonIfRebootedSystemNotRestarted(t *testing.T) { func TestUncordonIfRebootedFailureToRemoveLabel(t *testing.T) { d1 := []byte("0 234388.90") - err := ioutil.WriteFile(testFile, d1, 0644) + err := os.WriteFile(testFile, d1, 0644) h.Ok(t, err) client := fake.NewSimpleClientset() @@ -134,7 +133,7 @@ func TestUncordonIfRebootedFailureToRemoveLabel(t *testing.T) { func TestUncordonIfRebootedFailureSuccess(t *testing.T) { d1 := []byte("0 234388.90") - err := ioutil.WriteFile(testFile, d1, 0644) + err := os.WriteFile(testFile, d1, 0644) h.Ok(t, err) client := fake.NewSimpleClientset() diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 8398e55a..52805459 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -63,7 +63,8 @@ func TestDryRun(t *testing.T) { tNode, err := node.New(config.Config{DryRun: true}) h.Ok(t, err) - fakeRecorder := record.NewFakeRecorder(100) + fakeRecorder := record.NewFakeRecorder(10) + defer close(fakeRecorder.Events) err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) @@ -103,7 +104,7 @@ func TestNewFailure(t *testing.T) { } func TestDrainSuccess(t *testing.T) { - controllerBool := true + isOwnerController := true client := fake.NewSimpleClientset() _, err := client.CoreV1().Nodes().Create( context.Background(), @@ -112,6 +113,7 @@ func TestDrainSuccess(t *testing.T) { }, metav1.CreateOptions{}) h.Ok(t, err) + _, err = client.CoreV1().Pods("default").Create( context.Background(), &v1.Pod{ @@ -122,7 +124,7 @@ func TestDrainSuccess(t *testing.T) { APIVersion: "apps/v1", Name: "cool-app", Kind: "ReplicaSet", - Controller: &controllerBool, + Controller: &isOwnerController, }, }, }, @@ -133,13 +135,12 @@ func TestDrainSuccess(t *testing.T) { metav1.CreateOptions{}) h.Ok(t, err) - fakeRecorder := record.NewFakeRecorder(100) + fakeRecorder := record.NewFakeRecorder(10) tNode := getNode(t, getDrainHelper(client)) err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) - - h.Ok(t, err) close(fakeRecorder.Events) + h.Ok(t, err) expectedEventArrived := false for event := range fakeRecorder.Events { if strings.Contains(event, "Normal PodEviction Pod evicted due to node drain") { @@ -150,11 +151,10 @@ func TestDrainSuccess(t *testing.T) { } func TestDrainCordonNodeFailure(t *testing.T) { - fakeRecorder := record.NewFakeRecorder(100) - + fakeRecorder := record.NewFakeRecorder(10) + defer close(fakeRecorder.Events) tNode := getNode(t, getDrainHelper(fake.NewSimpleClientset())) err := tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) - h.Assert(t, true, "Failed to return error on CordonAndDrain failing to cordon node", err != nil) } From 146f142d83f27e1b52628d3b83ada4b81455e064 Mon Sep 17 00:00:00 2001 From: Roger Torrentsgeneros Date: Thu, 20 Oct 2022 21:38:46 +0200 Subject: [PATCH 3/3] chore: use recorderBufferSize const --- pkg/node/node_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index 52805459..13383a63 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -33,6 +33,9 @@ import ( "k8s.io/kubectl/pkg/drain" ) +// Size of the fakeRecorder buffer +const recorderBufferSize = 10 + var nodeName = "NAME" func getDrainHelper(client *fake.Clientset) *drain.Helper { @@ -63,7 +66,7 @@ func TestDryRun(t *testing.T) { tNode, err := node.New(config.Config{DryRun: true}) h.Ok(t, err) - fakeRecorder := record.NewFakeRecorder(10) + fakeRecorder := record.NewFakeRecorder(recorderBufferSize) defer close(fakeRecorder.Events) err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) @@ -135,7 +138,7 @@ func TestDrainSuccess(t *testing.T) { metav1.CreateOptions{}) h.Ok(t, err) - fakeRecorder := record.NewFakeRecorder(10) + fakeRecorder := record.NewFakeRecorder(recorderBufferSize) tNode := getNode(t, getDrainHelper(client)) err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder) @@ -151,7 +154,7 @@ func TestDrainSuccess(t *testing.T) { } func TestDrainCordonNodeFailure(t *testing.T) { - fakeRecorder := record.NewFakeRecorder(10) + fakeRecorder := record.NewFakeRecorder(recorderBufferSize) defer close(fakeRecorder.Events) tNode := getNode(t, getDrainHelper(fake.NewSimpleClientset())) err := tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)