Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/node-termination-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func cordonNode(node node.Node, nodeName string, drainEvent *monitor.Interruptio
}

func cordonAndDrainNode(node node.Node, nodeName string, drainEvent *monitor.InterruptionEvent, metrics observability.Metrics, recorder observability.K8sEventRecorder, sqsTerminationDraining bool) error {
err := node.CordonAndDrain(nodeName, drainEvent.Description)
err := node.CordonAndDrain(nodeName, drainEvent.Description, recorder.EventRecorder)
if err != nil {
if errors.IsNotFound(err) {
log.Err(err).Msgf("node '%s' not found in the cluster", nodeName)
Expand Down
33 changes: 32 additions & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -64,6 +65,13 @@ const (
maxTaintValueLength = 63
)

const (
// PodEvictReason is the event reason emitted for Pod evictions during node drain
PodEvictReason = "PodEviction"
// PodEvictMsg is the event message emitted for Pod evictions during node drain
PodEvictMsgFmt = "Pod evicted due to node drain (node %s)"
)

var (
maxRetryDeadline time.Duration = 5 * time.Second
conflictRetryInterval time.Duration = 750 * time.Millisecond
Expand Down Expand Up @@ -95,7 +103,7 @@ func NewWithValues(nthConfig config.Config, drainHelper *drain.Helper, uptime up
}

// CordonAndDrain will cordon the node and evict pods based on the config
func (n Node) CordonAndDrain(nodeName string, reason string) error {
func (n Node) CordonAndDrain(nodeName string, reason string, recorder recorderInterface) error {
if n.nthConfig.DryRun {
log.Info().Str("node_name", nodeName).Str("reason", reason).Msg("Node would have been cordoned and drained, but dry-run flag was set.")
return nil
Expand All @@ -114,6 +122,25 @@ func (n Node) CordonAndDrain(nodeName string, reason string) error {
if err != nil {
return err
}
// Emit events for all pods that will be evicted
if recorder != nil {
pods, err := n.fetchAllPods(nodeName)
if err == nil {
for _, pod := range pods.Items {
podRef := &corev1.ObjectReference{
Kind: "Pod",
Name: pod.Name,
Namespace: pod.Namespace,
}
annotations := make(map[string]string)
annotations["node"] = nodeName
for k, v := range pod.GetLabels() {
annotations[k] = v
}
recorder.AnnotatedEventf(podRef, annotations, corev1.EventTypeNormal, PodEvictReason, PodEvictMsgFmt, nodeName)
}
}
}
err = drain.RunNodeDrain(n.drainHelper, node.Name)
if err != nil {
return err
Expand Down Expand Up @@ -800,3 +827,7 @@ func filterPodForDeletion(podName string) func(pod corev1.Pod) drain.PodDeleteSt
return drain.MakePodDeleteStatusOkay()
}
}

type recorderInterface interface {
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...interface{})
}
7 changes: 3 additions & 4 deletions pkg/node/node_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package node

import (
"context"
"io/ioutil"
"os"
"strconv"
"testing"
Expand Down Expand Up @@ -86,7 +85,7 @@ func TestUncordonIfRebootedFileReadError(t *testing.T) {

func TestUncordonIfRebootedSystemNotRestarted(t *testing.T) {
d1 := []byte("350735.47 234388.90")
err := ioutil.WriteFile(testFile, d1, 0644)
err := os.WriteFile(testFile, d1, 0644)
h.Ok(t, err)

client := fake.NewSimpleClientset()
Expand All @@ -110,7 +109,7 @@ func TestUncordonIfRebootedSystemNotRestarted(t *testing.T) {

func TestUncordonIfRebootedFailureToRemoveLabel(t *testing.T) {
d1 := []byte("0 234388.90")
err := ioutil.WriteFile(testFile, d1, 0644)
err := os.WriteFile(testFile, d1, 0644)
h.Ok(t, err)

client := fake.NewSimpleClientset()
Expand All @@ -134,7 +133,7 @@ func TestUncordonIfRebootedFailureToRemoveLabel(t *testing.T) {

func TestUncordonIfRebootedFailureSuccess(t *testing.T) {
d1 := []byte("0 234388.90")
err := ioutil.WriteFile(testFile, d1, 0644)
err := os.WriteFile(testFile, d1, 0644)
h.Ok(t, err)

client := fake.NewSimpleClientset()
Expand Down
47 changes: 44 additions & 3 deletions pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package node_test
import (
"context"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -28,6 +29,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/kubectl/pkg/drain"
)

Expand Down Expand Up @@ -61,7 +63,11 @@ func TestDryRun(t *testing.T) {
tNode, err := node.New(config.Config{DryRun: true})
h.Ok(t, err)

err = tNode.CordonAndDrain(nodeName, "cordonReason")
fakeRecorder := record.NewFakeRecorder(10)
defer close(fakeRecorder.Events)

err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)

h.Ok(t, err)

err = tNode.Cordon(nodeName, "cordonReason")
Expand Down Expand Up @@ -98,6 +104,7 @@ func TestNewFailure(t *testing.T) {
}

func TestDrainSuccess(t *testing.T) {
isOwnerController := true
client := fake.NewSimpleClientset()
_, err := client.CoreV1().Nodes().Create(
context.Background(),
Expand All @@ -106,14 +113,48 @@ func TestDrainSuccess(t *testing.T) {
},
metav1.CreateOptions{})
h.Ok(t, err)

_, err = client.CoreV1().Pods("default").Create(
context.Background(),
&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "cool-app-pod-",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Name: "cool-app",
Kind: "ReplicaSet",
Controller: &isOwnerController,
},
},
},
Spec: v1.PodSpec{
NodeName: nodeName,
},
},
metav1.CreateOptions{})
h.Ok(t, err)

fakeRecorder := record.NewFakeRecorder(10)

tNode := getNode(t, getDrainHelper(client))
err = tNode.CordonAndDrain(nodeName, "cordonReason")
err = tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)
close(fakeRecorder.Events)
h.Ok(t, err)
expectedEventArrived := false
for event := range fakeRecorder.Events {
if strings.Contains(event, "Normal PodEviction Pod evicted due to node drain") {
expectedEventArrived = true
}
}
h.Assert(t, expectedEventArrived, "PodEvicted event was not emitted")
}

func TestDrainCordonNodeFailure(t *testing.T) {
fakeRecorder := record.NewFakeRecorder(10)
defer close(fakeRecorder.Events)
tNode := getNode(t, getDrainHelper(fake.NewSimpleClientset()))
err := tNode.CordonAndDrain(nodeName, "cordonReason")
err := tNode.CordonAndDrain(nodeName, "cordonReason", fakeRecorder)
h.Assert(t, true, "Failed to return error on CordonAndDrain failing to cordon node", err != nil)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/observability/k8s-events.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func InitK8sEventRecorder(enabled bool, nodeName string, sqsMode bool, nodeMetad
}

broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("default")})
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})

return K8sEventRecorder{
annotations: annotations,
Expand Down