diff --git a/cmd/node-termination-handler.go b/cmd/node-termination-handler.go
index 0851088f..ad7f757a 100644
--- a/cmd/node-termination-handler.go
+++ b/cmd/node-termination-handler.go
@@ -22,9 +22,9 @@ import (
 	"time"
 
 	"github.com/aws/aws-node-termination-handler/pkg/config"
-	"github.com/aws/aws-node-termination-handler/pkg/drainevent"
-	"github.com/aws/aws-node-termination-handler/pkg/draineventstore"
 	"github.com/aws/aws-node-termination-handler/pkg/ec2metadata"
+	"github.com/aws/aws-node-termination-handler/pkg/interruptionevent"
+	"github.com/aws/aws-node-termination-handler/pkg/interruptioneventstore"
 	"github.com/aws/aws-node-termination-handler/pkg/node"
 	"github.com/aws/aws-node-termination-handler/pkg/webhook"
 )
@@ -34,7 +34,7 @@ const (
 	spotITN = "Spot ITN"
 )
 
-type monitorFunc func(chan<- drainevent.DrainEvent, chan<- drainevent.DrainEvent, *ec2metadata.Service) error
+type monitorFunc func(chan<- interruptionevent.InterruptionEvent, chan<- interruptionevent.InterruptionEvent, *ec2metadata.Service) error
 
 func main() {
 	signalChan := make(chan os.Signal, 1)
@@ -57,34 +57,34 @@ func main() {
 
 	imds := ec2metadata.New(nthConfig.MetadataURL, nthConfig.MetadataTries)
 
-	drainEventStore := draineventstore.New(nthConfig)
+	interruptionEventStore := interruptioneventstore.New(nthConfig)
 	nodeMetadata := imds.GetNodeMetadata()
 
 	if nthConfig.EnableScheduledEventDraining {
-		err = handleRebootUncordon(drainEventStore, *node)
+		err = handleRebootUncordon(interruptionEventStore, *node)
 		if err != nil {
 			log.Printf("Unable to complete the uncordon after reboot workflow on startup: %v\n", err)
 		}
 	}
 
-	drainChan := make(chan drainevent.DrainEvent)
-	defer close(drainChan)
-	cancelChan := make(chan drainevent.DrainEvent)
+	interruptionChan := make(chan interruptionevent.InterruptionEvent)
+	defer close(interruptionChan)
+	cancelChan := make(chan interruptionevent.InterruptionEvent)
 	defer close(cancelChan)
 
 	monitoringFns := map[string]monitorFunc{}
 	if nthConfig.EnableSpotInterruptionDraining {
-		monitoringFns[spotITN] = drainevent.MonitorForSpotITNEvents
+		monitoringFns[spotITN] = interruptionevent.MonitorForSpotITNEvents
 	}
 	if nthConfig.EnableScheduledEventDraining {
-		monitoringFns[scheduledMaintenance] = drainevent.MonitorForScheduledEvents
+		monitoringFns[scheduledMaintenance] = interruptionevent.MonitorForScheduledEvents
 	}
 
 	for eventType, fn := range monitoringFns {
 		go func(monitorFn monitorFunc, eventType string) {
 			log.Printf("Started monitoring for %s events", eventType)
 			for range time.Tick(time.Second * 2) {
-				err := monitorFn(drainChan, cancelChan, imds)
+				err := monitorFn(interruptionChan, cancelChan, imds)
 				if err != nil {
 					log.Printf("There was a problem monitoring for %s events: %v", eventType, err)
 				}
@@ -92,26 +92,26 @@ func main() {
 		}(fn, eventType)
 	}
 
-	go watchForDrainEvents(drainChan, drainEventStore, nodeMetadata)
-	log.Println("Started watching for drain events")
+	go watchForInterruptionEvents(interruptionChan, interruptionEventStore, nodeMetadata)
+	log.Println("Started watching for interruption events")
 	log.Println("Kubernetes AWS Node Termination Handler has started successfully!")
 
-	go watchForCancellationEvents(cancelChan, drainEventStore, node, nodeMetadata)
+	go watchForCancellationEvents(cancelChan, interruptionEventStore, node, nodeMetadata)
 	log.Println("Started watching for event cancellations")
 
 	for range time.NewTicker(1 * time.Second).C {
 		select {
 		case _ = <-signalChan:
-			// Exit drain loop if a SIGTERM is received or the channel is closed
+			// Exit interruption loop if a SIGTERM is received or the channel is closed
 			break
 		default:
-			drainIfNecessary(drainEventStore, *node, nthConfig, nodeMetadata)
+			drainOrCordonIfNecessary(interruptionEventStore, *node, nthConfig, nodeMetadata)
 		}
 	}
 	log.Println("AWS Node Termination Handler is shutting down")
 }
 
-func handleRebootUncordon(drainEventStore *draineventstore.Store, node node.Node) error {
+func handleRebootUncordon(interruptionEventStore *interruptioneventstore.Store, node node.Node) error {
 	isLabeled, err := node.IsLabeledWithAction()
 	if err != nil {
 		return err
@@ -127,24 +127,24 @@ func handleRebootUncordon(drainEventStore *draineventstore.Store, node node.Node
 	if err != nil {
 		return fmt.Errorf("Unable to complete node label actions: %w", err)
 	}
-	drainEventStore.IgnoreEvent(eventID)
+	interruptionEventStore.IgnoreEvent(eventID)
 	return nil
 }
 
-func watchForDrainEvents(drainChan <-chan drainevent.DrainEvent, drainEventStore *draineventstore.Store, nodeMetadata ec2metadata.NodeMetadata) {
+func watchForInterruptionEvents(interruptionChan <-chan interruptionevent.InterruptionEvent, interruptionEventStore *interruptioneventstore.Store, nodeMetadata ec2metadata.NodeMetadata) {
 	for {
-		drainEvent := <-drainChan
-		log.Printf("Got drain event from channel %+v %+v\n", nodeMetadata, drainEvent)
-		drainEventStore.AddDrainEvent(&drainEvent)
+		interruptionEvent := <-interruptionChan
+		log.Printf("Got interruption event from channel %+v %+v\n", nodeMetadata, interruptionEvent)
+		interruptionEventStore.AddInterruptionEvent(&interruptionEvent)
 	}
 }
 
-func watchForCancellationEvents(cancelChan <-chan drainevent.DrainEvent, drainEventStore *draineventstore.Store, node *node.Node, nodeMetadata ec2metadata.NodeMetadata) {
+func watchForCancellationEvents(cancelChan <-chan interruptionevent.InterruptionEvent, interruptionEventStore *interruptioneventstore.Store, node *node.Node, nodeMetadata ec2metadata.NodeMetadata) {
 	for {
-		drainEvent := <-cancelChan
-		log.Printf("Got cancel event from channel %+v %+v\n", nodeMetadata, drainEvent)
-		drainEventStore.CancelDrainEvent(drainEvent.EventID)
-		if drainEventStore.ShouldUncordonNode() {
+		interruptionEvent := <-cancelChan
+		log.Printf("Got cancel event from channel %+v %+v\n", nodeMetadata, interruptionEvent)
+		interruptionEventStore.CancelInterruptionEvent(interruptionEvent.EventID)
+		if interruptionEventStore.ShouldUncordonNode() {
 			log.Println("Uncordoning the node due to a cancellation event")
 			err := node.Uncordon()
 			if err != nil {
@@ -152,26 +152,35 @@ func watchForCancellationEvents(cancelChan <-chan drainevent.DrainEvent, drainEv
 			}
 			node.RemoveNTHLabels()
 		} else {
-			log.Println("Another drain event is active, not uncordoning the node")
+			log.Println("Another interruption event is active, not uncordoning the node")
 		}
 	}
 }
 
-func drainIfNecessary(drainEventStore *draineventstore.Store, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata) {
-	if drainEvent, ok := drainEventStore.GetActiveEvent(); ok {
+func drainOrCordonIfNecessary(interruptionEventStore *interruptioneventstore.Store, node node.Node, nthConfig config.Config, nodeMetadata ec2metadata.NodeMetadata) {
+	if drainEvent, ok := interruptionEventStore.GetActiveEvent(); ok {
 		if drainEvent.PreDrainTask != nil {
 			err := drainEvent.PreDrainTask(*drainEvent, node)
 			if err != nil {
 				log.Println("There was a problem executing the pre-drain task: ", err)
 			}
 		}
-		err := node.Drain()
-		if err != nil {
-			log.Println("There was a problem while trying to drain the node: ", err)
-			os.Exit(1)
+		if nthConfig.CordonOnly {
+			err := node.Cordon()
+			if err != nil {
+				log.Println("There was a problem while trying to cordon the node: ", err)
+				os.Exit(1)
+			}
+			log.Printf("Node %q successfully cordoned.\n", nthConfig.NodeName)
+		} else {
+			err := node.CordonAndDrain()
+			if err != nil {
+				log.Println("There was a problem while trying to cordon and drain the node: ", err)
+				os.Exit(1)
+			}
+			log.Printf("Node %q successfully cordoned and drained.\n", nthConfig.NodeName)
 		}
-		drainEventStore.MarkAllAsDrained()
-		log.Printf("Node %q successfully drained.\n", nthConfig.NodeName)
+		interruptionEventStore.MarkAllAsDrained()
 		if nthConfig.WebhookURL != "" {
 			webhook.Post(nodeMetadata, drainEvent, nthConfig)
 		}
diff --git a/config/helm/aws-node-termination-handler/Chart.yaml b/config/helm/aws-node-termination-handler/Chart.yaml
index 488d046c..dff553cb 100644
--- a/config/helm/aws-node-termination-handler/Chart.yaml
+++ b/config/helm/aws-node-termination-handler/Chart.yaml
@@ -1,7 +1,7 @@
 apiVersion: v1
 name: aws-node-termination-handler
 description: A Helm chart for the AWS Node Termination Handler
-version: 0.7.4
+version: 0.7.5
 appVersion: 1.3.1
 home: https://github.com/aws/eks-charts
 icon: https://raw.githubusercontent.com/aws/eks-charts/master/docs/logo/aws.png
diff --git a/config/helm/aws-node-termination-handler/README.md b/config/helm/aws-node-termination-handler/README.md
index 8967077a..7c0e4eb3 100644
--- a/config/helm/aws-node-termination-handler/README.md
+++ b/config/helm/aws-node-termination-handler/README.md
@@ -62,6 +62,11 @@ Parameter | Description | Default
 `webhookURL` | Posts event data to URL upon instance interruption action | ``
 `webhookHeaders` | Replaces the default webhook headers. | `{"Content-type":"application/json"}`
 `webhookTemplate` | Replaces the default webhook message template. | `{"text":"[NTH][Instance Interruption] EventID: {{ .EventID }} - Kind: {{ .Kind }} - Description: {{ .Description }} - State: {{ .State }} - Start Time: {{ .StartTime }}"}`
+`dryRun` | If true, only log if a node would be drained | `false`
+`enableScheduledEventDraining` | [EXPERIMENTAL] If true, drain nodes before the maintenance window starts for an EC2 instance scheduled event | `false`
+`enableSpotInterruptionDraining` | If true, drain nodes when the spot interruption termination notice is received | `true`
+`metadataTries` | The number of times to try requesting metadata. If you would like 2 retries, set metadata-tries to 3. | `3`
+`cordonOnly` | If true, nodes will be cordoned but not drained when an interruption event occurs.
| `false` `affinity` | node/pod affinities | None `podAnnotations` | annotations to add to each pod | `{}` `priorityClassName` | Name of the priorityClass | `system-node-critical` diff --git a/config/helm/aws-node-termination-handler/templates/daemonset.yaml b/config/helm/aws-node-termination-handler/templates/daemonset.yaml index 8eaf17d2..7e08b7d7 100644 --- a/config/helm/aws-node-termination-handler/templates/daemonset.yaml +++ b/config/helm/aws-node-termination-handler/templates/daemonset.yaml @@ -105,6 +105,10 @@ spec: value: {{ .Values.enableSpotInterruptionDraining | quote }} - name: ENABLE_SCHEDULED_EVENT_DRAINING value: {{ .Values.enableScheduledEventDraining | quote }} + - name: METADATA_TRIES + value: {{ .Values.metadataTries | quote }} + - name: CORDON_ONLY + value: {{ .Values.cordonOnly | quote }} resources: {{- toYaml .Values.resources | nindent 12 }} {{- with .Values.nodeSelector }} diff --git a/pkg/config/config.go b/pkg/config/config.go index 415ae3e8..88f4261d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -48,6 +48,7 @@ const ( enableSpotInterruptionDrainingDefault = true metadataTriesConfigKey = "METADATA_TRIES" metadataTriesDefault = 3 + cordonOnly = "CORDON_ONLY" ) //Config arguments set via CLI, environment variables, or defaults @@ -67,6 +68,7 @@ type Config struct { EnableScheduledEventDraining bool EnableSpotInterruptionDraining bool MetadataTries int + CordonOnly bool } //ParseCliArgs parses cli arguments and uses environment variables as fallback values @@ -94,8 +96,9 @@ func ParseCliArgs() (config Config, err error) { flag.StringVar(&config.WebhookHeaders, "webhook-headers", getEnv(webhookHeadersConfigKey, webhookHeadersDefault), "If specified, replaces the default webhook headers.") flag.StringVar(&config.WebhookTemplate, "webhook-template", getEnv(webhookTemplateConfigKey, webhookTemplateDefault), "If specified, replaces the default webhook message template.") flag.BoolVar(&config.EnableScheduledEventDraining, "enable-scheduled-event-draining", getBoolEnv(enableScheduledEventDrainingConfigKey, enableScheduledEventDrainingDefault), "[EXPERIMENTAL] If true, drain nodes before the maintenance window starts for an EC2 instance scheduled event") - flag.BoolVar(&config.EnableSpotInterruptionDraining, "enable-spot-interruption-draining", getBoolEnv(enableSpotInterruptionDrainingConfigKey, enableSpotInterruptionDrainingDefault), "If true, drain nodes when the spot interruption termination notice is receieved") + flag.BoolVar(&config.EnableSpotInterruptionDraining, "enable-spot-interruption-draining", getBoolEnv(enableSpotInterruptionDrainingConfigKey, enableSpotInterruptionDrainingDefault), "If true, drain nodes when the spot interruption termination notice is received") flag.IntVar(&config.MetadataTries, "metadata-tries", getIntEnv(metadataTriesConfigKey, metadataTriesDefault), "The number of times to try requesting metadata. 
If you would like 2 retries, set metadata-tries to 3.") + flag.BoolVar(&config.CordonOnly, "cordon-only", getBoolEnv(cordonOnly, false), "If true, nodes will be cordoned but not drained when an interruption event occurs.") flag.Parse() @@ -128,7 +131,8 @@ func ParseCliArgs() (config Config, err error) { "\tnode-termination-grace-period: %d,\n"+ "\tenable-scheduled-event-draining: %t,\n"+ "\tenable-spot-interruption-draining: %t,\n"+ - "\tmetadata-tries: %d,\n", + "\tmetadata-tries: %d,\n"+ + "\tcordon-only: %t,\n", config.DryRun, config.NodeName, @@ -142,6 +146,7 @@ func ParseCliArgs() (config Config, err error) { config.EnableScheduledEventDraining, config.EnableSpotInterruptionDraining, config.MetadataTries, + config.CordonOnly, ) return config, err diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 82cc430c..445f7e92 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -45,6 +45,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) { os.Setenv("WEBHOOK_HEADERS", "WEBHOOK_HEADERS") os.Setenv("WEBHOOK_TEMPLATE", "WEBHOOK_TEMPLATE") os.Setenv("METADATA_TRIES", "100") + os.Setenv("CORDON_ONLY", "false") nthConfig, err := config.ParseCliArgs() h.Ok(t, err) @@ -64,6 +65,7 @@ func TestParseCliArgsEnvSuccess(t *testing.T) { h.Equals(t, "WEBHOOK_HEADERS", nthConfig.WebhookHeaders) h.Equals(t, "WEBHOOK_TEMPLATE", nthConfig.WebhookTemplate) h.Equals(t, 100, nthConfig.MetadataTries) + h.Equals(t, false, nthConfig.CordonOnly) // Check that env vars were set value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") @@ -94,6 +96,7 @@ func TestParseCliArgsSuccess(t *testing.T) { "--webhook-headers=WEBHOOK_HEADERS", "--webhook-template=WEBHOOK_TEMPLATE", "--metadata-tries=100", + "--cordon-only=false", } nthConfig, err := config.ParseCliArgs() h.Ok(t, err) @@ -114,6 +117,7 @@ func TestParseCliArgsSuccess(t *testing.T) { h.Equals(t, "WEBHOOK_HEADERS", nthConfig.WebhookHeaders) h.Equals(t, "WEBHOOK_TEMPLATE", nthConfig.WebhookTemplate) h.Equals(t, 100, nthConfig.MetadataTries) + h.Equals(t, false, nthConfig.CordonOnly) // Check that env vars were set value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") @@ -139,6 +143,7 @@ func TestParseCliArgsOverrides(t *testing.T) { os.Setenv("WEBHOOK_HEADERS", "no") os.Setenv("WEBHOOK_TEMPLATE", "no") os.Setenv("METADATA_TRIES", "100") + os.Setenv("CORDON_ONLY", "true") os.Args = []string{ "cmd", "--delete-local-data=false", @@ -156,6 +161,7 @@ func TestParseCliArgsOverrides(t *testing.T) { "--webhook-headers=WEBHOOK_HEADERS", "--webhook-template=WEBHOOK_TEMPLATE", "--metadata-tries=101", + "--cordon-only=false", } nthConfig, err := config.ParseCliArgs() h.Ok(t, err) @@ -176,6 +182,7 @@ func TestParseCliArgsOverrides(t *testing.T) { h.Equals(t, "WEBHOOK_HEADERS", nthConfig.WebhookHeaders) h.Equals(t, "WEBHOOK_TEMPLATE", nthConfig.WebhookTemplate) h.Equals(t, 101, nthConfig.MetadataTries) + h.Equals(t, false, nthConfig.CordonOnly) // Check that env vars were set value, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST") diff --git a/pkg/draineventstore/drain-event-store.go b/pkg/draineventstore/drain-event-store.go deleted file mode 100644 index 98bc0eec..00000000 --- a/pkg/draineventstore/drain-event-store.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"). You may -// not use this file except in compliance with the License. 
A copy of the -// License is located at -// -// http://aws.amazon.com/apache2.0/ -// -// or in the "license" file accompanying this file. This file is distributed -// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -// express or implied. See the License for the specific language governing -// permissions and limitations under the License - -package draineventstore - -import ( - "sync" - "time" - - "github.com/aws/aws-node-termination-handler/pkg/config" - "github.com/aws/aws-node-termination-handler/pkg/drainevent" -) - -// Store is the drain event store data structure -type Store struct { - sync.RWMutex - NthConfig config.Config - drainEventStore map[string]*drainevent.DrainEvent - ignoredEvents map[string]struct{} - atLeastOneEvent bool -} - -// New Creates a new drain event store -func New(nthConfig config.Config) *Store { - return &Store{ - NthConfig: nthConfig, - drainEventStore: make(map[string]*drainevent.DrainEvent), - ignoredEvents: make(map[string]struct{}), - } -} - -// CancelDrainEvent removes a drain event from the internal store -func (s *Store) CancelDrainEvent(eventID string) { - s.Lock() - defer s.Unlock() - delete(s.drainEventStore, eventID) -} - -// AddDrainEvent adds a drain event to the internal store -func (s *Store) AddDrainEvent(drainEvent *drainevent.DrainEvent) { - s.RLock() - _, ok := s.drainEventStore[drainEvent.EventID] - s.RUnlock() - if ok { - return - } - s.Lock() - defer s.Unlock() - s.drainEventStore[drainEvent.EventID] = drainEvent - if _, ignored := s.ignoredEvents[drainEvent.EventID]; !ignored { - s.atLeastOneEvent = true - } - return -} - -// GetActiveEvent returns true if there are drainable events in the internal store -func (s *Store) GetActiveEvent() (*drainevent.DrainEvent, bool) { - s.RLock() - defer s.RUnlock() - for _, drainEvent := range s.drainEventStore { - if s.shouldEventDrain(drainEvent) { - return drainEvent, true - } - } - return &drainevent.DrainEvent{}, false -} - -// ShouldDrainNode returns true if there are drainable events in the internal store -func (s *Store) ShouldDrainNode() bool { - s.RLock() - defer s.RUnlock() - for _, drainEvent := range s.drainEventStore { - if s.shouldEventDrain(drainEvent) { - return true - } - } - return false -} - -func (s *Store) shouldEventDrain(drainEvent *drainevent.DrainEvent) bool { - _, ignored := s.ignoredEvents[drainEvent.EventID] - if !ignored && !drainEvent.Drained && s.TimeUntilDrain(drainEvent) <= 0 { - return true - } - return false -} - -// TimeUntilDrain returns the duration until a node drain should occur (can return a negative duration) -func (s *Store) TimeUntilDrain(drainEvent *drainevent.DrainEvent) time.Duration { - nodeTerminationGracePeriod := time.Duration(s.NthConfig.NodeTerminationGracePeriod) * time.Second - drainTime := drainEvent.StartTime.Add(-1 * nodeTerminationGracePeriod) - return drainTime.Sub(time.Now()) -} - -// MarkAllAsDrained should be called after the node has been drained to prevent further unnecessary drain calls to the k8s api -func (s *Store) MarkAllAsDrained() { - s.Lock() - defer s.Unlock() - for _, drainEvent := range s.drainEventStore { - drainEvent.Drained = true - } -} - -// IgnoreEvent will store an event ID so that monitor loops cannot write to the store with the same event ID -// Drain actions are ignored on the passed in event ID by setting the Drained flag to true -func (s *Store) IgnoreEvent(eventID string) { - if eventID == "" { - return - } - s.Lock() - defer s.Unlock() - s.ignoredEvents[eventID] = 
struct{}{} -} - -// ShouldUncordonNode returns true if there was a drainable event but it was canceled and the store is now empty or only consists of ignored events -func (s *Store) ShouldUncordonNode() bool { - s.RLock() - defer s.RUnlock() - if !s.atLeastOneEvent { - return false - } - if len(s.drainEventStore) == 0 { - return true - } - for _, drainEvent := range s.drainEventStore { - if _, ignored := s.ignoredEvents[drainEvent.EventID]; !ignored { - return false - } - } - return true -} diff --git a/pkg/drainevent/drain-event.go b/pkg/interruptionevent/interruption-event.go similarity index 79% rename from pkg/drainevent/drain-event.go rename to pkg/interruptionevent/interruption-event.go index 59319872..595698c0 100644 --- a/pkg/drainevent/drain-event.go +++ b/pkg/interruptionevent/interruption-event.go @@ -11,7 +11,7 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. -package drainevent +package interruptionevent import ( "time" @@ -19,10 +19,10 @@ import ( "github.com/aws/aws-node-termination-handler/pkg/node" ) -type preDrainTask func(DrainEvent, node.Node) error +type preDrainTask func(InterruptionEvent, node.Node) error -// DrainEvent gives more context of the drainable event -type DrainEvent struct { +// InterruptionEvent gives more context of the interruption event +type InterruptionEvent struct { EventID string Kind string Description string @@ -34,6 +34,6 @@ type DrainEvent struct { } // TimeUntilEvent returns the duration until the event start time -func (e *DrainEvent) TimeUntilEvent() time.Duration { +func (e *InterruptionEvent) TimeUntilEvent() time.Duration { return e.StartTime.Sub(time.Now()) } diff --git a/pkg/drainevent/drain-event_test.go b/pkg/interruptionevent/interruption-event_test.go similarity index 86% rename from pkg/drainevent/drain-event_test.go rename to pkg/interruptionevent/interruption-event_test.go index 4a65a384..cf41e3e9 100644 --- a/pkg/drainevent/drain-event_test.go +++ b/pkg/interruptionevent/interruption-event_test.go @@ -11,13 +11,13 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. -package drainevent_test +package interruptionevent_test import ( "testing" "time" - "github.com/aws/aws-node-termination-handler/pkg/drainevent" + "github.com/aws/aws-node-termination-handler/pkg/interruptionevent" h "github.com/aws/aws-node-termination-handler/pkg/test" ) @@ -25,7 +25,7 @@ func TestTimeUntilEvent(t *testing.T) { startTime := time.Now().Add(time.Second * 10) expected := startTime.Sub(time.Now()).Round(time.Second) - event := &drainevent.DrainEvent{ + event := &interruptionevent.InterruptionEvent{ StartTime: startTime, } diff --git a/pkg/drainevent/scheduled-event.go b/pkg/interruptionevent/scheduled-event.go similarity index 80% rename from pkg/drainevent/scheduled-event.go rename to pkg/interruptionevent/scheduled-event.go index f8e53e06..fff6903f 100644 --- a/pkg/drainevent/scheduled-event.go +++ b/pkg/interruptionevent/scheduled-event.go @@ -11,7 +11,7 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. 
-package drainevent +package interruptionevent import ( "fmt" @@ -23,7 +23,7 @@ import ( ) const ( - // ScheduledEventKind is a const to define a scheduled event kind of drainable event + // ScheduledEventKind is a const to define a scheduled event kind of interruption event ScheduledEventKind = "SCHEDULED_EVENT" scheduledEventStateCompleted = "completed" scheduledEventStateCanceled = "canceled" @@ -34,31 +34,31 @@ const ( instanceRetirementCode = "instance-retirement" ) -// MonitorForScheduledEvents continuously monitors metadata for scheduled events and sends drain events to the passed in channel -func MonitorForScheduledEvents(drainChan chan<- DrainEvent, cancelChan chan<- DrainEvent, imds *ec2metadata.Service) error { - drainEvents, err := checkForScheduledEvents(imds) +// MonitorForScheduledEvents continuously monitors metadata for scheduled events and sends interruption events to the passed in channel +func MonitorForScheduledEvents(interruptionChan chan<- InterruptionEvent, cancelChan chan<- InterruptionEvent, imds *ec2metadata.Service) error { + interruptionEvents, err := checkForScheduledEvents(imds) if err != nil { return err } - for _, drainEvent := range drainEvents { - if isStateCanceledOrCompleted(drainEvent.State) { + for _, interruptionEvent := range interruptionEvents { + if isStateCanceledOrCompleted(interruptionEvent.State) { log.Println("Sending cancel events to the cancel channel") - cancelChan <- drainEvent + cancelChan <- interruptionEvent } else { - log.Println("Sending drain events to the drain channel") - drainChan <- drainEvent + log.Println("Sending interruption events to the interruption channel") + interruptionChan <- interruptionEvent } } return nil } // checkForScheduledEvents Checks EC2 instance metadata for a scheduled event requiring a node drain -func checkForScheduledEvents(imds *ec2metadata.Service) ([]DrainEvent, error) { +func checkForScheduledEvents(imds *ec2metadata.Service) ([]InterruptionEvent, error) { scheduledEvents, err := imds.GetScheduledMaintenanceEvents() if err != nil { return nil, fmt.Errorf("Unable to parse metadata response: %w", err) } - events := make([]DrainEvent, 0) + events := make([]InterruptionEvent, 0) for _, scheduledEvent := range scheduledEvents { var preDrainFunc preDrainTask if isRestartEvent(scheduledEvent.Code) && !isStateCanceledOrCompleted(scheduledEvent.State) { @@ -72,7 +72,7 @@ func checkForScheduledEvents(imds *ec2metadata.Service) ([]DrainEvent, error) { if err != nil { return nil, fmt.Errorf("Unable to parsed scheduled event end time: %w", err) } - events = append(events, DrainEvent{ + events = append(events, InterruptionEvent{ EventID: scheduledEvent.EventID, Kind: ScheduledEventKind, Description: fmt.Sprintf("%s will occur between %s and %s because %s\n", scheduledEvent.Code, scheduledEvent.NotBefore, scheduledEvent.NotAfter, scheduledEvent.Description), @@ -85,8 +85,8 @@ func checkForScheduledEvents(imds *ec2metadata.Service) ([]DrainEvent, error) { return events, nil } -func uncordonAfterRebootPreDrain(drainEvent DrainEvent, node node.Node) error { - err := node.MarkWithEventID(drainEvent.EventID) +func uncordonAfterRebootPreDrain(interruptionEvent InterruptionEvent, node node.Node) error { + err := node.MarkWithEventID(interruptionEvent.EventID) if err != nil { return fmt.Errorf("Unable to mark node with event ID: %w", err) } diff --git a/pkg/drainevent/scheduled-event_internal_test.go b/pkg/interruptionevent/scheduled-event_internal_test.go similarity index 93% rename from 
pkg/drainevent/scheduled-event_internal_test.go rename to pkg/interruptionevent/scheduled-event_internal_test.go index 4fd4a7df..3e9bbe00 100644 --- a/pkg/drainevent/scheduled-event_internal_test.go +++ b/pkg/interruptionevent/scheduled-event_internal_test.go @@ -11,7 +11,7 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. -package drainevent +package interruptionevent import ( "flag" @@ -66,7 +66,7 @@ func getNode(t *testing.T, drainHelper *drain.Helper) *node.Node { } func TestUncordonAfterRebootPreDrainSuccess(t *testing.T) { - drainEvent := DrainEvent{} + drainEvent := InterruptionEvent{} nthConfig := config.Config{ DryRun: true, } @@ -79,7 +79,7 @@ func TestUncordonAfterRebootPreDrainMarkWithEventIDFailure(t *testing.T) { resetFlagsForTest() tNode := getNode(t, getDrainHelper(fake.NewSimpleClientset())) - err := uncordonAfterRebootPreDrain(DrainEvent{}, *tNode) + err := uncordonAfterRebootPreDrain(InterruptionEvent{}, *tNode) h.Assert(t, err != nil, "Failed to return error on MarkWithEventID failing to fetch node") } @@ -97,6 +97,6 @@ func TestUncordonAfterRebootPreDrainNodeAlreadyMarkedSuccess(t *testing.T) { }) tNode := getNode(t, getDrainHelper(client)) - err := uncordonAfterRebootPreDrain(DrainEvent{}, *tNode) + err := uncordonAfterRebootPreDrain(InterruptionEvent{}, *tNode) h.Ok(t, err) } diff --git a/pkg/drainevent/scheduled-event_test.go b/pkg/interruptionevent/scheduled-event_test.go similarity index 83% rename from pkg/drainevent/scheduled-event_test.go rename to pkg/interruptionevent/scheduled-event_test.go index affdc078..455c7fa8 100644 --- a/pkg/drainevent/scheduled-event_test.go +++ b/pkg/interruptionevent/scheduled-event_test.go @@ -11,7 +11,7 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. 
-package drainevent_test +package interruptionevent_test import ( "net/http" @@ -19,8 +19,8 @@ import ( "strings" "testing" - "github.com/aws/aws-node-termination-handler/pkg/drainevent" "github.com/aws/aws-node-termination-handler/pkg/ec2metadata" + "github.com/aws/aws-node-termination-handler/pkg/interruptionevent" h "github.com/aws/aws-node-termination-handler/pkg/test" ) @@ -58,14 +58,14 @@ func TestMonitorForScheduledEventsSuccess(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) go func() { result := <-drainChan h.Equals(t, scheduledEventId, result.EventID) - h.Equals(t, drainevent.ScheduledEventKind, result.Kind) + h.Equals(t, interruptionevent.ScheduledEventKind, result.Kind) h.Equals(t, scheduledEventState, result.State) h.Equals(t, expScheduledEventStartTimeFmt, result.StartTime.String()) h.Equals(t, expScheduledEventEndTimeFmt, result.EndTime.String()) @@ -85,7 +85,7 @@ func TestMonitorForScheduledEventsSuccess(t *testing.T) { }() - err := drainevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) h.Ok(t, err) } @@ -109,14 +109,14 @@ func TestMonitorForScheduledEventsCanceledEvent(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) go func() { result := <-cancelChan h.Equals(t, scheduledEventId, result.EventID) - h.Equals(t, drainevent.ScheduledEventKind, result.Kind) + h.Equals(t, interruptionevent.ScheduledEventKind, result.Kind) h.Equals(t, state, result.State) h.Equals(t, expScheduledEventStartTimeFmt, result.StartTime.String()) h.Equals(t, expScheduledEventEndTimeFmt, result.EndTime.String()) @@ -136,7 +136,7 @@ func TestMonitorForScheduledEventsCanceledEvent(t *testing.T) { }() - err := drainevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) h.Ok(t, err) } @@ -152,11 +152,11 @@ func TestMonitorForScheduledEventsMetadataParseFailure(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New("bad url", 0) - err := drainevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) h.Assert(t, err != nil, "Failed to return error when metadata parse fails") } @@ -173,11 +173,11 @@ func TestMonitorForScheduledEvents404Response(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) - err := drainevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForScheduledEvents(drainChan, cancelChan, 
imds) h.Assert(t, err != nil, "Failed to return error when 404 response") } @@ -200,11 +200,11 @@ func TestMonitorForScheduledEventsStartTimeParseFail(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) - err := drainevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) h.Assert(t, err != nil, "Failed to return error when failed to parse start time") } @@ -227,10 +227,10 @@ func TestMonitorForScheduledEventsEndTimeParseFail(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) - err := drainevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForScheduledEvents(drainChan, cancelChan, imds) h.Assert(t, err != nil, "Failed to return error when failed to parse end time") } diff --git a/pkg/drainevent/spot-itn-event.go b/pkg/interruptionevent/spot-itn-event.go similarity index 75% rename from pkg/drainevent/spot-itn-event.go rename to pkg/interruptionevent/spot-itn-event.go index 463d3c51..6d0afd98 100644 --- a/pkg/drainevent/spot-itn-event.go +++ b/pkg/interruptionevent/spot-itn-event.go @@ -11,7 +11,7 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. -package drainevent +package interruptionevent import ( "crypto/sha256" @@ -23,25 +23,25 @@ import ( ) const ( - // SpotITNKind is a const to define a Spot ITN kind of drainable event + // SpotITNKind is a const to define a Spot ITN kind of interruption event SpotITNKind = "SPOT_ITN" ) -// MonitorForSpotITNEvents continuously monitors metadata for spot ITNs and sends drain events to the passed in channel -func MonitorForSpotITNEvents(drainChan chan<- DrainEvent, cancelChan chan<- DrainEvent, imds *ec2metadata.Service) error { - drainEvent, err := checkForSpotInterruptionNotice(imds) +// MonitorForSpotITNEvents continuously monitors metadata for spot ITNs and sends interruption events to the passed in channel +func MonitorForSpotITNEvents(interruptionChan chan<- InterruptionEvent, cancelChan chan<- InterruptionEvent, imds *ec2metadata.Service) error { + interruptionEvent, err := checkForSpotInterruptionNotice(imds) if err != nil { return err } - if drainEvent != nil && drainEvent.Kind == SpotITNKind { - log.Println("Sending drain event to the drain channel") - drainChan <- *drainEvent + if interruptionEvent != nil && interruptionEvent.Kind == SpotITNKind { + log.Println("Sending interruption event to the interruption channel") + interruptionChan <- *interruptionEvent } return nil } // checkForSpotInterruptionNotice Checks EC2 instance metadata for a spot interruption termination notice -func checkForSpotInterruptionNotice(imds *ec2metadata.Service) (*DrainEvent, error) { +func checkForSpotInterruptionNotice(imds *ec2metadata.Service) (*InterruptionEvent, error) { instanceAction, err := imds.GetSpotITNEvent() if instanceAction == nil && err == nil { // if there are no spot itns and no errors @@ -59,7 +59,7 @@ func checkForSpotInterruptionNotice(imds 
*ec2metadata.Service) (*DrainEvent, err hash := sha256.New() hash.Write([]byte(fmt.Sprintf("%v", instanceAction))) - return &DrainEvent{ + return &InterruptionEvent{ EventID: fmt.Sprintf("spot-itn-%x", hash.Sum(nil)), Kind: SpotITNKind, StartTime: interruptionTime, diff --git a/pkg/drainevent/spot-itn-event_test.go b/pkg/interruptionevent/spot-itn-event_test.go similarity index 75% rename from pkg/drainevent/spot-itn-event_test.go rename to pkg/interruptionevent/spot-itn-event_test.go index 5abc65c3..e4ae0153 100644 --- a/pkg/drainevent/spot-itn-event_test.go +++ b/pkg/interruptionevent/spot-itn-event_test.go @@ -11,7 +11,7 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License. -package drainevent_test +package interruptionevent_test import ( "net/http" @@ -19,8 +19,8 @@ import ( "strings" "testing" - "github.com/aws/aws-node-termination-handler/pkg/drainevent" "github.com/aws/aws-node-termination-handler/pkg/ec2metadata" + "github.com/aws/aws-node-termination-handler/pkg/interruptionevent" h "github.com/aws/aws-node-termination-handler/pkg/test" ) @@ -48,19 +48,19 @@ func TestMonitorForSpotITNEventsSuccess(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) go func() { result := <-drainChan - h.Equals(t, drainevent.SpotITNKind, result.Kind) + h.Equals(t, interruptionevent.SpotITNKind, result.Kind) h.Equals(t, expFormattedTime, result.StartTime.String()) h.Assert(t, strings.Contains(result.Description, startTime), "Expected description to contain: "+startTime+" but is actually: "+result.Description) }() - err := drainevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) h.Ok(t, err) } @@ -73,11 +73,11 @@ func TestMonitorForSpotITNEventsMetadataParseFailure(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) - err := drainevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) h.Assert(t, err != nil, "Failed to return error metadata parse fails") } @@ -94,11 +94,11 @@ func TestMonitorForSpotITNEvents404Response(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) - err := drainevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) h.Ok(t, err) } @@ -115,11 +115,11 @@ func TestMonitorForSpotITNEvents500Response(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := 
ec2metadata.New(server.URL, 1) - err := drainevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) h.Assert(t, err != nil, "Failed to return error when 500 response") } @@ -136,11 +136,11 @@ func TestMonitorForSpotITNEventsInstanceActionDecodeFailure(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) - err := drainevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) h.Assert(t, err != nil, "Failed to return error when failed to decode instance action") } @@ -157,10 +157,10 @@ func TestMonitorForSpotITNEventsTimeParseFailure(t *testing.T) { })) defer server.Close() - drainChan := make(chan drainevent.DrainEvent) - cancelChan := make(chan drainevent.DrainEvent) + drainChan := make(chan interruptionevent.InterruptionEvent) + cancelChan := make(chan interruptionevent.InterruptionEvent) imds := ec2metadata.New(server.URL, 1) - err := drainevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) + err := interruptionevent.MonitorForSpotITNEvents(drainChan, cancelChan, imds) h.Assert(t, err != nil, "Failed to return error when failed to parse time") } diff --git a/pkg/interruptioneventstore/interruption-event-store.go b/pkg/interruptioneventstore/interruption-event-store.go new file mode 100644 index 00000000..89fe26bf --- /dev/null +++ b/pkg/interruptioneventstore/interruption-event-store.go @@ -0,0 +1,141 @@ +// Copyright 2016-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. 
See the License for the specific language governing +// permissions and limitations under the License + +package interruptioneventstore + +import ( + "sync" + "time" + + "github.com/aws/aws-node-termination-handler/pkg/config" + "github.com/aws/aws-node-termination-handler/pkg/interruptionevent" +) + +// Store is the drain event store data structure +type Store struct { + sync.RWMutex + NthConfig config.Config + interruptionEventStore map[string]*interruptionevent.InterruptionEvent + ignoredEvents map[string]struct{} + atLeastOneEvent bool +} + +// New Creates a new interruption event store +func New(nthConfig config.Config) *Store { + return &Store{ + NthConfig: nthConfig, + interruptionEventStore: make(map[string]*interruptionevent.InterruptionEvent), + ignoredEvents: make(map[string]struct{}), + } +} + +// CancelInterruptionEvent removes an interruption event from the internal store +func (s *Store) CancelInterruptionEvent(eventID string) { + s.Lock() + defer s.Unlock() + delete(s.interruptionEventStore, eventID) +} + +// AddInterruptionEvent adds an interruption event to the internal store +func (s *Store) AddInterruptionEvent(interruptionEvent *interruptionevent.InterruptionEvent) { + s.RLock() + _, ok := s.interruptionEventStore[interruptionEvent.EventID] + s.RUnlock() + if ok { + return + } + s.Lock() + defer s.Unlock() + s.interruptionEventStore[interruptionEvent.EventID] = interruptionEvent + if _, ignored := s.ignoredEvents[interruptionEvent.EventID]; !ignored { + s.atLeastOneEvent = true + } + return +} + +// GetActiveEvent returns true if there are interruption events in the internal store +func (s *Store) GetActiveEvent() (*interruptionevent.InterruptionEvent, bool) { + s.RLock() + defer s.RUnlock() + for _, interruptionEvent := range s.interruptionEventStore { + if s.shouldEventDrain(interruptionEvent) { + return interruptionEvent, true + } + } + return &interruptionevent.InterruptionEvent{}, false +} + +// ShouldDrainNode returns true if there are drainable events in the internal store +func (s *Store) ShouldDrainNode() bool { + s.RLock() + defer s.RUnlock() + for _, interruptionEvent := range s.interruptionEventStore { + if s.shouldEventDrain(interruptionEvent) { + return true + } + } + return false +} + +func (s *Store) shouldEventDrain(interruptionEvent *interruptionevent.InterruptionEvent) bool { + _, ignored := s.ignoredEvents[interruptionEvent.EventID] + if !ignored && !interruptionEvent.Drained && s.TimeUntilDrain(interruptionEvent) <= 0 { + return true + } + return false +} + +// TimeUntilDrain returns the duration until a node drain should occur (can return a negative duration) +func (s *Store) TimeUntilDrain(interruptionEvent *interruptionevent.InterruptionEvent) time.Duration { + nodeTerminationGracePeriod := time.Duration(s.NthConfig.NodeTerminationGracePeriod) * time.Second + drainTime := interruptionEvent.StartTime.Add(-1 * nodeTerminationGracePeriod) + return drainTime.Sub(time.Now()) +} + +// MarkAllAsDrained should be called after the node has been drained to prevent further unnecessary drain calls to the k8s api +func (s *Store) MarkAllAsDrained() { + s.Lock() + defer s.Unlock() + for _, interruptionEvent := range s.interruptionEventStore { + interruptionEvent.Drained = true + } +} + +// IgnoreEvent will store an event ID so that monitor loops cannot write to the store with the same event ID +// Drain actions are ignored on the passed in event ID by setting the Drained flag to true +func (s *Store) IgnoreEvent(eventID string) { + if 
eventID == "" { + return + } + s.Lock() + defer s.Unlock() + s.ignoredEvents[eventID] = struct{}{} +} + +// ShouldUncordonNode returns true if there was a interruption event but it was canceled and the store is now empty or only consists of ignored events +func (s *Store) ShouldUncordonNode() bool { + s.RLock() + defer s.RUnlock() + if !s.atLeastOneEvent { + return false + } + if len(s.interruptionEventStore) == 0 { + return true + } + for _, interruptionEvent := range s.interruptionEventStore { + if _, ignored := s.ignoredEvents[interruptionEvent.EventID]; !ignored { + return false + } + } + return true +} diff --git a/pkg/draineventstore/drain-event-store_test.go b/pkg/interruptioneventstore/interruption-event-store_test.go similarity index 67% rename from pkg/draineventstore/drain-event-store_test.go rename to pkg/interruptioneventstore/interruption-event-store_test.go index 8eb14297..1fbf0119 100644 --- a/pkg/draineventstore/drain-event-store_test.go +++ b/pkg/interruptioneventstore/interruption-event-store_test.go @@ -11,7 +11,7 @@ // express or implied. See the License for the specific language governing // permissions and limitations under the License -package draineventstore_test +package interruptioneventstore_test import ( "fmt" @@ -21,49 +21,49 @@ import ( "time" "github.com/aws/aws-node-termination-handler/pkg/config" - "github.com/aws/aws-node-termination-handler/pkg/drainevent" - "github.com/aws/aws-node-termination-handler/pkg/draineventstore" + "github.com/aws/aws-node-termination-handler/pkg/interruptionevent" + "github.com/aws/aws-node-termination-handler/pkg/interruptioneventstore" h "github.com/aws/aws-node-termination-handler/pkg/test" ) func TestAddDrainEvent(t *testing.T) { - store := draineventstore.New(config.Config{}) + store := interruptioneventstore.New(config.Config{}) - event1 := &drainevent.DrainEvent{ + event1 := &interruptionevent.InterruptionEvent{ EventID: "123", State: "Active", StartTime: time.Now(), } - store.AddDrainEvent(event1) + store.AddInterruptionEvent(event1) storedEvent, isActive := store.GetActiveEvent() h.Equals(t, true, isActive) h.Equals(t, event1.EventID, storedEvent.EventID) // Attempt to add new event with the same eventID - event2 := &drainevent.DrainEvent{ + event2 := &interruptionevent.InterruptionEvent{ EventID: "123", State: "Something Else", StartTime: time.Now(), } - store.AddDrainEvent(event2) + store.AddInterruptionEvent(event2) storedEvent, isActive = store.GetActiveEvent() h.Equals(t, true, isActive) h.Equals(t, event1.EventID, storedEvent.EventID) h.Equals(t, event1.State, storedEvent.State) } -func TestCancelDrainEvent(t *testing.T) { - store := draineventstore.New(config.Config{}) +func TestCancelInterruptionEvent(t *testing.T) { + store := interruptioneventstore.New(config.Config{}) - event := &drainevent.DrainEvent{ + event := &interruptionevent.InterruptionEvent{ EventID: "123", StartTime: time.Now(), } - store.AddDrainEvent(event) + store.AddInterruptionEvent(event) - store.CancelDrainEvent(event.EventID) + store.CancelInterruptionEvent(event.EventID) storedEvent, isActive := store.GetActiveEvent() h.Equals(t, false, isActive) @@ -72,37 +72,37 @@ func TestCancelDrainEvent(t *testing.T) { } func TestShouldDrainNode(t *testing.T) { - store := draineventstore.New(config.Config{}) - futureEvent := &drainevent.DrainEvent{ + store := interruptioneventstore.New(config.Config{}) + futureEvent := &interruptionevent.InterruptionEvent{ EventID: "future", StartTime: 
time.Now().Add(time.Second * 20), } - store.AddDrainEvent(futureEvent) + store.AddInterruptionEvent(futureEvent) h.Equals(t, false, store.ShouldDrainNode()) - currentEvent := &drainevent.DrainEvent{ + currentEvent := &interruptionevent.InterruptionEvent{ EventID: "current", StartTime: time.Now(), } - store.AddDrainEvent(currentEvent) + store.AddInterruptionEvent(currentEvent) h.Equals(t, true, store.ShouldDrainNode()) } func TestMarkAllAsDrained(t *testing.T) { - store := draineventstore.New(config.Config{}) - event1 := &drainevent.DrainEvent{ + store := interruptioneventstore.New(config.Config{}) + event1 := &interruptionevent.InterruptionEvent{ EventID: "1", StartTime: time.Now().Add(time.Second * 20), Drained: false, } - event2 := &drainevent.DrainEvent{ + event2 := &interruptionevent.InterruptionEvent{ EventID: "2", StartTime: time.Now().Add(time.Second * 20), Drained: false, } - store.AddDrainEvent(event1) - store.AddDrainEvent(event2) + store.AddInterruptionEvent(event1) + store.AddInterruptionEvent(event2) store.MarkAllAsDrained() // When events are marked as Drained=true, then they are no longer @@ -113,33 +113,33 @@ func TestMarkAllAsDrained(t *testing.T) { func TestShouldUncordonNode(t *testing.T) { eventID := "123" - store := draineventstore.New(config.Config{}) + store := interruptioneventstore.New(config.Config{}) h.Equals(t, false, store.ShouldUncordonNode()) - event := &drainevent.DrainEvent{ + event := &interruptionevent.InterruptionEvent{ EventID: eventID, } - store.AddDrainEvent(event) + store.AddInterruptionEvent(event) h.Equals(t, false, store.ShouldUncordonNode()) - store.CancelDrainEvent(event.EventID) + store.CancelInterruptionEvent(event.EventID) h.Equals(t, true, store.ShouldUncordonNode()) store.IgnoreEvent(eventID) - store.AddDrainEvent(event) + store.AddInterruptionEvent(event) h.Equals(t, true, store.ShouldUncordonNode()) } func TestIgnoreEvent(t *testing.T) { eventID := "event-id-123" - store := draineventstore.New(config.Config{}) + store := interruptioneventstore.New(config.Config{}) store.IgnoreEvent("") - event := &drainevent.DrainEvent{ + event := &interruptionevent.InterruptionEvent{ EventID: eventID, State: "active", StartTime: time.Now(), } - store.AddDrainEvent(event) + store.AddInterruptionEvent(event) h.Equals(t, true, store.ShouldDrainNode()) store.IgnoreEvent(eventID) @@ -149,15 +149,15 @@ func TestIgnoreEvent(t *testing.T) { // BenchmarkDrainEventStore tests concurrent read/write patterns. 
We don't really care about the timings as long as deadlock doesn't occur func BenchmarkDrainEventStore(b *testing.B) { idBound := 10 - store := draineventstore.New(config.Config{}) + store := interruptioneventstore.New(config.Config{}) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - store.AddDrainEvent(&drainevent.DrainEvent{ + store.AddInterruptionEvent(&interruptionevent.InterruptionEvent{ EventID: strconv.Itoa(rand.Intn(idBound)), StartTime: time.Now(), }) store.IgnoreEvent(strconv.Itoa(rand.Intn(idBound))) - store.CancelDrainEvent(strconv.Itoa(rand.Intn(idBound))) + store.CancelInterruptionEvent(strconv.Itoa(rand.Intn(idBound))) store.GetActiveEvent() store.ShouldDrainNode() store.ShouldUncordonNode() diff --git a/pkg/node/node.go b/pkg/node/node.go index fa828688..ff529ea8 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -71,13 +71,13 @@ func NewWithValues(nthConfig config.Config, drainHelper *drain.Helper) (*Node, e }, nil } -// Drain will cordon the node and evict pods based on the config -func (n Node) Drain() error { +// CordonAndDrain will cordon the node and evict pods based on the config +func (n Node) CordonAndDrain() error { if n.nthConfig.DryRun { log.Printf("Node %s would have been cordoned and drained, but dry-run flag was set\n", n.nthConfig.NodeName) return nil } - err := n.cordonNode() + err := n.Cordon() if err != nil { return err } @@ -90,7 +90,11 @@ func (n Node) Drain() error { } // Cordon will add a NoSchedule on the node -func (n Node) cordonNode() error { +func (n Node) Cordon() error { + if n.nthConfig.DryRun { + log.Printf("Node %s would have been cordoned, but dry-run flag was set\n", n.nthConfig.NodeName) + return nil + } node, err := n.fetchKubernetesNode() if err != nil { return err diff --git a/pkg/node/node_test.go b/pkg/node/node_test.go index f28e9d31..6524ab03 100644 --- a/pkg/node/node_test.go +++ b/pkg/node/node_test.go @@ -70,7 +70,10 @@ func TestDryRun(t *testing.T) { tNode, err := node.New(config.Config{DryRun: true}) h.Ok(t, err) - err = tNode.Drain() + err = tNode.CordonAndDrain() + h.Ok(t, err) + + err = tNode.Cordon() h.Ok(t, err) err = tNode.Uncordon() @@ -110,7 +113,7 @@ func TestDrainSuccess(t *testing.T) { client.CoreV1().Nodes().Create(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}) tNode := getNode(t, getDrainHelper(client)) - err := tNode.Drain() + err := tNode.CordonAndDrain() h.Ok(t, err) } @@ -118,8 +121,8 @@ func TestDrainCordonNodeFailure(t *testing.T) { resetFlagsForTest() tNode := getNode(t, getDrainHelper(fake.NewSimpleClientset())) - err := tNode.Drain() - h.Assert(t, true, "Failed to return error on Drain failing to cordon node", err != nil) + err := tNode.CordonAndDrain() + h.Assert(t, true, "Failed to return error on CordonAndDrain failing to cordon node", err != nil) } func TestUncordonSuccess(t *testing.T) { diff --git a/pkg/webhook/webhook.go b/pkg/webhook/webhook.go index 899ac801..68909b51 100644 --- a/pkg/webhook/webhook.go +++ b/pkg/webhook/webhook.go @@ -23,17 +23,17 @@ import ( "time" "github.com/aws/aws-node-termination-handler/pkg/config" - "github.com/aws/aws-node-termination-handler/pkg/drainevent" "github.com/aws/aws-node-termination-handler/pkg/ec2metadata" + "github.com/aws/aws-node-termination-handler/pkg/interruptionevent" ) type combinedDrainData struct { ec2metadata.NodeMetadata - drainevent.DrainEvent + interruptionevent.InterruptionEvent } // Post makes a http post to send drain event data to webhook url -func Post(additionalInfo 
ec2metadata.NodeMetadata, event *drainevent.DrainEvent, nthconfig config.Config) { +func Post(additionalInfo ec2metadata.NodeMetadata, event *interruptionevent.InterruptionEvent, nthconfig config.Config) { webhookTemplate, err := template.New("message").Parse(nthconfig.WebhookTemplate) if err != nil { diff --git a/pkg/webhook/webhook_test.go b/pkg/webhook/webhook_test.go index e6027237..c551df1f 100644 --- a/pkg/webhook/webhook_test.go +++ b/pkg/webhook/webhook_test.go @@ -26,8 +26,8 @@ import ( "time" "github.com/aws/aws-node-termination-handler/pkg/config" - "github.com/aws/aws-node-termination-handler/pkg/drainevent" "github.com/aws/aws-node-termination-handler/pkg/ec2metadata" + "github.com/aws/aws-node-termination-handler/pkg/interruptionevent" h "github.com/aws/aws-node-termination-handler/pkg/test" "github.com/aws/aws-node-termination-handler/pkg/webhook" ) @@ -43,7 +43,7 @@ func parseScheduledEventTime(inputTime string) time.Time { return scheduledTime } -func getExpectedMessage(event *drainevent.DrainEvent) string { +func getExpectedMessage(event *interruptionevent.InterruptionEvent) string { webhookTemplate, err := template.New("").Parse(testWebhookTemplate) if err != nil { log.Printf("Webhook Error: Template parsing failed - %s\n", err) @@ -64,7 +64,7 @@ func getExpectedMessage(event *drainevent.DrainEvent) string { func TestPostSuccess(t *testing.T) { var requestPath string = "/some/path" - event := &drainevent.DrainEvent{ + event := &interruptionevent.InterruptionEvent{ EventID: "instance-event-0d59937288b749b32", Kind: "SCHEDULED_EVENT", Description: "Scheduled event will occur", @@ -116,7 +116,7 @@ func TestPostTemplateParseError(t *testing.T) { })) defer server.Close() - event := &drainevent.DrainEvent{} + event := &interruptionevent.InterruptionEvent{} nthconfig := config.Config{ WebhookURL: server.URL, WebhookTemplate: "{{ ", @@ -133,7 +133,7 @@ func TestPostTemplateExecutionError(t *testing.T) { })) defer server.Close() - event := &drainevent.DrainEvent{} + event := &interruptionevent.InterruptionEvent{} nthconfig := config.Config{ WebhookURL: server.URL, WebhookTemplate: `{{.cat}}`, @@ -150,7 +150,7 @@ func TestPostNewHttpRequestError(t *testing.T) { })) defer server.Close() - event := &drainevent.DrainEvent{} + event := &interruptionevent.InterruptionEvent{} nthconfig := config.Config{ WebhookURL: "\t", WebhookTemplate: testWebhookTemplate, @@ -166,7 +166,7 @@ func TestPostHeaderParseFail(t *testing.T) { })) defer server.Close() - event := &drainevent.DrainEvent{} + event := &interruptionevent.InterruptionEvent{} nthconfig := config.Config{ WebhookURL: server.URL, WebhookTemplate: testWebhookTemplate, @@ -184,7 +184,7 @@ func TestPostTimeout(t *testing.T) { })) defer server.Close() - event := &drainevent.DrainEvent{} + event := &interruptionevent.InterruptionEvent{} nthconfig := config.Config{ WebhookURL: server.URL, WebhookTemplate: testWebhookTemplate, @@ -204,7 +204,7 @@ func TestPostBadResponseCode(t *testing.T) { })) defer server.Close() - event := &drainevent.DrainEvent{} + event := &interruptionevent.InterruptionEvent{} nthconfig := config.Config{ WebhookURL: server.URL, WebhookTemplate: testWebhookTemplate, diff --git a/test/e2e/cordon-only-test b/test/e2e/cordon-only-test new file mode 100755 index 00000000..b062c91a --- /dev/null +++ b/test/e2e/cordon-only-test @@ -0,0 +1,64 @@ +#!/bin/bash +set -euo pipefail + +# Available env vars: +# $TMP_DIR +# $CLUSTER_NAME +# $KUBECONFIG +# 
$NODE_TERMINATION_HANDLER_DOCKER_REPO
+# $NODE_TERMINATION_HANDLER_DOCKER_TAG
+# $EC2_METADATA_DOCKER_REPO
+# $EC2_METADATA_DOCKER_TAG
+
+echo "Starting Cordon Only Test for Node Termination Handler"
+
+SCRIPTPATH="$( cd "$(dirname "$0")" ; pwd -P )"
+
+helm upgrade --install $CLUSTER_NAME-anth $SCRIPTPATH/../../config/helm/aws-node-termination-handler/ \
+  --wait \
+  --force \
+  --namespace kube-system \
+  --set instanceMetadataURL="http://localhost:$IMDS_PORT" \
+  --set image.repository="$NODE_TERMINATION_HANDLER_DOCKER_REPO" \
+  --set image.tag="$NODE_TERMINATION_HANDLER_DOCKER_TAG" \
+  --set cordonOnly="true"
+
+helm upgrade --install $CLUSTER_NAME-emtp $SCRIPTPATH/../../config/helm/ec2-metadata-test-proxy/ \
+  --wait \
+  --force \
+  --namespace default \
+  --set ec2MetadataTestProxy.image.repository="$EC2_METADATA_DOCKER_REPO" \
+  --set ec2MetadataTestProxy.image.tag="$EC2_METADATA_DOCKER_TAG" \
+  --set ec2MetadataTestProxy.port="$IMDS_PORT"
+
+TAINT_CHECK_CYCLES=15
+TAINT_CHECK_SLEEP=15
+
+DEPLOYED=0
+
+for i in `seq 1 10`; do
+    if [[ $(kubectl get deployments regular-pod-test -o jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then
+        echo "✅ Verified regular-pod-test pod was scheduled and started!"
+        DEPLOYED=1
+        break
+    fi
+    sleep 5
+done
+
+if [[ $DEPLOYED -eq 0 ]]; then
+    exit 2
+fi
+
+for i in `seq 1 $TAINT_CHECK_CYCLES`; do
+    if kubectl get nodes $CLUSTER_NAME-worker | grep SchedulingDisabled; then
+        echo "✅ Verified the worker node was cordoned!"
+        if [[ $(kubectl get deployments regular-pod-test -o=jsonpath='{.status.unavailableReplicas}') -eq 0 ]]; then
+            echo "✅ Verified the regular-pod-test pod was NOT evicted!"
+            echo "✅ Cordon Only Test Passed $CLUSTER_NAME! ✅"
+            exit 0
+        fi
+    fi
+    sleep $TAINT_CHECK_SLEEP
+done
+
+exit 1
diff --git a/test/k8s-local-cluster-test/run-test b/test/k8s-local-cluster-test/run-test
index 3a6005d8..0a16b9e5 100755
--- a/test/k8s-local-cluster-test/run-test
+++ b/test/k8s-local-cluster-test/run-test
@@ -102,7 +102,7 @@ EOM
 while getopts "pdn:e:oc:a:b:v:" opt; do
   case ${opt} in
     p ) # PRESERVE K8s Cluster
-      echo "❄️ This run will preserve the cluster as you requested"
+      echo "❄️ This run will preserve the cluster as you requested"
       PRESERVE=true
       ;;
     n ) # Node Termination Handler Docker Image