diff --git a/Gopkg.lock b/Gopkg.lock index 132e7da30..465549e05 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -839,7 +839,7 @@ "google.golang.org/grpc/connectivity", "google.golang.org/grpc/health/grpc_health_v1", "gopkg.in/yaml.v2", - "k8s.io/api/apps/v1beta2", + "k8s.io/api/apps/v1", "k8s.io/api/core/v1", "k8s.io/apimachinery/pkg/apis/meta/v1", "k8s.io/apimachinery/pkg/fields", diff --git a/cmd/watch.go b/cmd/watch.go index 3ae0f583e..9aa768c85 100644 --- a/cmd/watch.go +++ b/cmd/watch.go @@ -1,24 +1,19 @@ package cmd import ( - "context" "fmt" "net/http" "os" - "time" argus "github.com/logicmonitor/k8s-argus/pkg" "github.com/logicmonitor/k8s-argus/pkg/config" + "github.com/logicmonitor/k8s-argus/pkg/connection" "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/healthz" + lmlog "github.com/logicmonitor/k8s-argus/pkg/log" "github.com/logicmonitor/k8s-argus/pkg/permission" - "github.com/logicmonitor/k8s-collectorset-controller/api" - collectorsetconstants "github.com/logicmonitor/k8s-collectorset-controller/pkg/constants" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" - "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - healthpb "google.golang.org/grpc/health/grpc_health_v1" ) // watchCmd represents the watch command @@ -43,6 +38,10 @@ var watchCmd = &cobra.Command{ log.SetLevel(log.DebugLevel) } + // Add a hook to include the pod id in the log context + hook := &lmlog.DefaultFieldHook{} + log.AddHook(hook) + // Instantiate the base struct. base, err := argus.NewBase(config) if err != nil { @@ -52,19 +51,13 @@ var watchCmd = &cobra.Command{ // Init the permission component permission.Init(base.K8sClient) - // Set up a gRPC connection to the collectorset controller. - conn, err := grpc.Dial(config.Address, grpc.WithInsecure()) - if err != nil { - log.Fatal(err.Error()) - } - defer conn.Close() // nolint: errcheck - client, err := waitForCollectorSetClient(conn) - if err != nil { - log.Fatal(err.Error()) - } + // Set up a gRPC connection and CSC Client. + connection.Initialize(config) + + connection.CreateConnectionHandler() // Instantiate the application and add watchers. - argus, err := argus.NewArgus(base, client) + argus, err := argus.NewArgus(base) if err != nil { log.Fatal(err.Error()) } @@ -79,63 +72,6 @@ var watchCmd = &cobra.Command{ }, } -func waitForCollectorSetClient(conn *grpc.ClientConn) (api.CollectorSetControllerClient, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - - state := conn.GetState() - // Wait for connection to be Ready. 
- for ; state != connectivity.Ready && conn.WaitForStateChange(ctx, state); state = conn.GetState() { - log.Infof("Waiting for gRPC") - } - if state != connectivity.Ready { - log.Fatalf("Failed waiting for gRPC to ready, state is %q", state) - } - - log.Infof("State of gRPC is %q", state) - - client := api.NewCollectorSetControllerClient(conn) - - ready, err := pollCollectorSetStatus(conn) - if err != nil { - log.Fatal(err.Error()) - } - - if !ready { - log.Fatalf("The collectorset controller does not have any ready collectors") - } - log.Infof("The collectorset controller has available collectors") - - return client, nil -} - -func pollCollectorSetStatus(conn *grpc.ClientConn) (bool, error) { - timeout := time.After(10 * time.Minute) - ticker := time.NewTicker(10 * time.Second) - for { - select { - case <-timeout: - return false, fmt.Errorf("timeout waiting for collectors to become available") - case <-ticker.C: - log.Debugf("Checking collectors status") - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) - defer cancel() - req := &healthpb.HealthCheckRequest{ - Service: collectorsetconstants.HealthServerServiceName, - } - hc := healthpb.NewHealthClient(conn) - healthCheckResponse, err := hc.Check(ctx, req) - if err != nil { - log.Errorf("Failed to get health check: %v", err) - } - if healthCheckResponse.GetStatus() == healthpb.HealthCheckResponse_SERVING { - return true, nil - } - log.Debugf("The collectors are not ready: %d", healthCheckResponse.GetStatus()) - } - } -} - func init() { RootCmd.AddCommand(watchCmd) } diff --git a/pkg/argus.go b/pkg/argus.go index fb8a8273c..033f070bf 100644 --- a/pkg/argus.go +++ b/pkg/argus.go @@ -10,6 +10,7 @@ import ( "github.com/logicmonitor/k8s-argus/pkg/config" "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/device" + "github.com/logicmonitor/k8s-argus/pkg/devicecache" "github.com/logicmonitor/k8s-argus/pkg/devicegroup" "github.com/logicmonitor/k8s-argus/pkg/etcd" "github.com/logicmonitor/k8s-argus/pkg/sync" @@ -20,7 +21,6 @@ import ( "github.com/logicmonitor/k8s-argus/pkg/watch/node" "github.com/logicmonitor/k8s-argus/pkg/watch/pod" "github.com/logicmonitor/k8s-argus/pkg/watch/service" - "github.com/logicmonitor/k8s-collectorset-controller/api" "github.com/logicmonitor/lm-sdk-go/client" "github.com/logicmonitor/lm-sdk-go/client/lm" log "github.com/sirupsen/logrus" @@ -93,14 +93,17 @@ func newK8sClient() (*kubernetes.Clientset, error) { } // NewArgus instantiates and returns argus. 
-func NewArgus(base *types.Base, client api.CollectorSetControllerClient) (*Argus, error) { +func NewArgus(base *types.Base) (*Argus, error) { argus := &Argus{ Base: base, } + dcache := devicecache.NewDeviceCache(base, 5) + dcache.Run() + deviceManager := &device.Manager{ - Base: base, - ControllerClient: client, + Base: base, + DC: dcache, } deviceTree := &tree.DeviceTree{ diff --git a/pkg/connection/connection.go b/pkg/connection/connection.go new file mode 100644 index 000000000..749f74096 --- /dev/null +++ b/pkg/connection/connection.go @@ -0,0 +1,145 @@ +package connection + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/logicmonitor/k8s-argus/pkg/config" + "github.com/logicmonitor/k8s-collectorset-controller/api" + collectorsetconstants "github.com/logicmonitor/k8s-collectorset-controller/pkg/constants" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +var ( + grpcConn *grpc.ClientConn + cscClient api.CollectorSetControllerClient + connLock sync.RWMutex + appConfig *config.Config +) + +// Initialize initializes the gRPC connection & the CSC client +func Initialize(config *config.Config) { + log.Info("Initializing gRPC connection & CSC Client.") + appConfig = config + createConnection() +} + +func createConnection() { + conn, grpcErr := createGRPCConnection() + if grpcErr != nil { + log.Errorf("Error while creating gRPC connection. Error: %v", grpcErr.Error()) + return + } + setGRPCConn(conn) + + client, cscErr := createCSCClient() + if cscErr != nil { + log.Errorf("Error while creating CSC client. Error: %v", cscErr.Error()) + return + } + setCSCClient(client) +} + +func setGRPCConn(conn *grpc.ClientConn) { + connLock.Lock() + defer connLock.Unlock() + grpcConn = conn +} + +func getGRPCConn() *grpc.ClientConn { + connLock.RLock() + defer connLock.RUnlock() + return grpcConn +} + +func setCSCClient(csc api.CollectorSetControllerClient) { + connLock.Lock() + defer connLock.Unlock() + cscClient = csc +} + +// GetCSCClient returns the CSC client +func GetCSCClient() api.CollectorSetControllerClient { + connLock.RLock() + defer connLock.RUnlock() + return cscClient +} + +func createGRPCConnection() (*grpc.ClientConn, error) { + timeout := time.After(10 * time.Minute) + ticker := time.NewTicker(10 * time.Second) + for { + select { + case <-timeout: + return nil, fmt.Errorf("timeout waiting for gRPC connection") + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10)*time.Second) + defer cancel() + conn, err := grpc.DialContext(ctx, appConfig.Address, grpc.WithBlock(), grpc.WithInsecure()) + if err != nil { + log.Errorf("Error while creating gRPC connection. 
Error: %v", err.Error()) + } else { + return conn, nil + } + } + } +} + +func createCSCClient() (api.CollectorSetControllerClient, error) { + conn := getGRPCConn() + client := api.NewCollectorSetControllerClient(conn) + + timeout := time.After(10 * time.Minute) + ticker := time.NewTicker(10 * time.Second) + hc := healthpb.NewHealthClient(conn) + for { + select { + case <-timeout: + return client, fmt.Errorf("timeout waiting for collectors to become available") + case <-ticker.C: + healthCheckResponse := getCSCHealth(hc) + if healthCheckResponse.GetStatus() == healthpb.HealthCheckResponse_SERVING { + return client, nil + } + log.Debugf("The collectors are not ready: %v", healthCheckResponse.GetStatus().String()) + } + } +} + +func getCSCHealth(hc healthpb.HealthClient) *healthpb.HealthCheckResponse { + log.Debug("Checking collectors status") + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + req := &healthpb.HealthCheckRequest{ + Service: collectorsetconstants.HealthServerServiceName, + } + healthCheckResponse, err := hc.Check(ctx, req) + if err != nil { + log.Errorf("Failed to get health check: %v", err) + } + return healthCheckResponse +} + +// CreateConnectionHandler starts a goroutine that periodically checks the gRPC connection and recreates it if required +func CreateConnectionHandler() { + go func() { + for { + time.Sleep(time.Duration(10) * time.Second) + checkGRPCState() + } + }() +} + +// checkGRPCState checks the gRPC state & calls createConnection if required +func checkGRPCState() { + state := getGRPCConn().GetState() + if state == connectivity.Shutdown { + log.Infof("gRPC is in \"%v\" state. Creating new gRPC connection & CSC client.", state.String()) + createConnection() + } +} diff --git a/pkg/device/device.go b/pkg/device/device.go index 640e4f49a..ce16494c8 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -1,15 +1,15 @@ package device import ( - "context" "fmt" "strings" "github.com/logicmonitor/k8s-argus/pkg/config" "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/device/builder" + "github.com/logicmonitor/k8s-argus/pkg/devicecache" "github.com/logicmonitor/k8s-argus/pkg/types" - "github.com/logicmonitor/k8s-collectorset-controller/api" + cscutils "github.com/logicmonitor/k8s-argus/pkg/utilities" "github.com/logicmonitor/lm-sdk-go/client/lm" "github.com/logicmonitor/lm-sdk-go/models" log "github.com/sirupsen/logrus" @@ -19,10 +19,10 @@ import ( type Manager struct { *types.Base *builder.Builder - ControllerClient api.CollectorSetControllerClient + DC *devicecache.DeviceCache } -func buildDevice(c *config.Config, client api.CollectorSetControllerClient, d *models.Device, options ...types.DeviceOption) *models.Device { +func buildDevice(c *config.Config, d *models.Device, options ...types.DeviceOption) *models.Device { if d == nil { hostGroupIds := "1" propertyName := constants.K8sClusterNamePropertyKey @@ -44,13 +44,9 @@ func buildDevice(c *config.Config, client api.CollectorSetControllerClient, d *m option(d) } - reply, err := client.CollectorID(context.Background(), &api.CollectorIDRequest{}) - if err != nil { - log.Errorf("Failed to get collector ID: %v", err) - } else { - log.Infof("Using collector ID %d for %q", reply.Id, *d.DisplayName) - d.PreferredCollectorID = &reply.Id - } + collectorID := cscutils.GetCollectorID() + log.Infof("Using collector ID %d for %q", collectorID, *d.DisplayName) + 
d.PreferredCollectorID = &collectorID } else { for _, option := range options { option(d) @@ -216,7 +212,7 @@ func (m *Manager) FindByDisplayNameAndClusterName(displayName string) (*models.D // Add implements types.DeviceManager. func (m *Manager) Add(options ...types.DeviceOption) (*models.Device, error) { - device := buildDevice(m.Config(), m.ControllerClient, nil, options...) + device := buildDevice(m.Config(), nil, options...) log.Debugf("%#v", device) params := lm.NewAddDeviceParams() @@ -236,25 +232,36 @@ func (m *Manager) Add(options ...types.DeviceOption) (*models.Device, error) { if err2 != nil { return nil, err2 } + m.DC.Set(*newDevice.DisplayName) return newDevice, nil } return nil, err } log.Debugf("%#v", restResponse) + m.DC.Set(*restResponse.Payload.DisplayName) return restResponse.Payload, nil } // UpdateAndReplace implements types.DeviceManager. func (m *Manager) UpdateAndReplace(d *models.Device, options ...types.DeviceOption) (*models.Device, error) { - device := buildDevice(m.Config(), m.ControllerClient, d, options...) + device := buildDevice(m.Config(), d, options...) log.Debugf("%#v", device) return m.updateAndReplace(d.ID, device) } // UpdateAndReplaceByDisplayName implements types.DeviceManager. -func (m *Manager) UpdateAndReplaceByDisplayName(name string, options ...types.DeviceOption) (*models.Device, error) { +func (m *Manager) UpdateAndReplaceByDisplayName(name string, filter types.UpdateFilter, options ...types.DeviceOption) (*models.Device, error) { + if !m.DC.Exists(name) { + log.Infof("Missing device %v; adding it now", name) + return m.Add(options...) + } + if filter != nil && !filter() { + log.Debugf("filtered device update %s", name) + return nil, nil + } + d, err := m.FindByDisplayNameAndClusterName(name) if err != nil { return nil, err @@ -264,13 +271,15 @@ func (m *Manager) UpdateAndReplaceByDisplayName(name string, options ...types.De log.Warnf("Could not find device %q", name) return nil, nil } + options = append(options, m.DisplayName(*d.DisplayName)) // Update the device. device, err := m.UpdateAndReplace(d, options...) if err != nil { + return nil, err } - + m.DC.Set(*device.DisplayName) return device, nil } @@ -278,7 +287,7 @@ func (m *Manager) UpdateAndReplaceByDisplayName(name string, options ...types.De // UpdateAndReplaceField implements types.DeviceManager. func (m *Manager) UpdateAndReplaceField(d *models.Device, field string, options ...types.DeviceOption) (*models.Device, error) { - device := buildDevice(m.Config(), m.ControllerClient, d, options...) + device := buildDevice(m.Config(), d, options...) 
log.Debugf("%#v", device) params := lm.NewPatchDeviceParams() @@ -342,6 +351,9 @@ func (m *Manager) DeleteByDisplayName(name string) error { params := lm.NewDeleteDeviceByIDParams() params.SetID(d.ID) _, err = m.LMClient.LM.DeleteDeviceByID(params) + if err == nil { + m.DC.Unset(name) + } return err } diff --git a/pkg/devicecache/devicecache.go b/pkg/devicecache/devicecache.go new file mode 100644 index 000000000..15b018096 --- /dev/null +++ b/pkg/devicecache/devicecache.go @@ -0,0 +1,147 @@ +package devicecache + +import ( + "sync" + "time" + + "github.com/logicmonitor/k8s-argus/pkg/constants" + "github.com/logicmonitor/k8s-argus/pkg/types" + "github.com/logicmonitor/lm-sdk-go/client/lm" + log "github.com/sirupsen/logrus" +) + +// DeviceCache maintains a cache of devices to calculate the delta between the devices present on the server and on the cluster +type DeviceCache struct { + store map[string]interface{} + rwm sync.RWMutex + resyncPeriod int + base *types.Base +} + +// NewDeviceCache creates a new DeviceCache object +func NewDeviceCache(b *types.Base, rp int) *DeviceCache { + dc := &DeviceCache{ + store: make(map[string]interface{}), + base: b, + resyncPeriod: rp, + } + return dc +} + +// Run starts a goroutine to periodically resync the cache with the server +func (dc *DeviceCache) Run() { + go func(dcache *DeviceCache) { + for { + time.Sleep(time.Duration(dcache.resyncPeriod) * time.Minute) + log.Debugf("Device cache fetching devices") + devices := dcache.getAllDevices(dcache.base) + if devices == nil { + log.Errorf("Failed to fetch devices") + } else { + log.Debugf("Resync cache map") + dcache.resyncCache(devices) + log.Debugf("Resync cache done") + } + } + }(dc) +} + +func (dc *DeviceCache) getAllDevices(b *types.Base) map[string]interface{} { + cgrpid := b.Config.ClusterGroupID + + params := lm.NewGetDeviceGroupByIDParams() + params.SetID(cgrpid) + + g, err := b.LMClient.LM.GetDeviceGroupByID(params) + if err != nil { + log.Errorf("Error while fetching cluster device group %v", err) + return nil + } + + clusterGroupName := constants.ClusterDeviceGroupPrefix + b.Config.ClusterName + clusterGroupID := int32(0) + + for _, sg := range g.Payload.SubGroups { + if sg.Name == clusterGroupName { + clusterGroupID = sg.ID + break + } + } + + if clusterGroupID == 0 { + log.Errorf("No cluster device group found") + return nil + } + + grps := dc.getAllGroups(b, clusterGroupID) + grps = append(grps, clusterGroupID) + + log.Debugf("all groups: %#v", grps) + m := make(map[string]interface{}) + for _, gid := range grps { + params := lm.NewGetImmediateDeviceListByDeviceGroupIDParams() + params.SetID(gid) + resp, err := b.LMClient.LM.GetImmediateDeviceListByDeviceGroupID(params) + if err != nil { + continue + } + for _, device := range resp.Payload.Items { + m[*device.DisplayName] = true + } + } + return m +} + +func (dc *DeviceCache) resyncCache(m map[string]interface{}) { + dc.rwm.Lock() + defer dc.rwm.Unlock() + dc.store = m +} + +func (dc *DeviceCache) getAllGroups(b *types.Base, grpid int32) []int32 { + params := lm.NewGetDeviceGroupByIDParams() + params.SetID(grpid) + g, err := b.LMClient.LM.GetDeviceGroupByID(params) + if err != nil { + log.Errorf("Failed to fetch group with id: %v", grpid) + return []int32{} + } + subGroups := []int32{} + for _, sg := range g.Payload.SubGroups { + if sg.Name == "_deleted" { + continue + } + log.Debugf("Taking group: %v", sg.Name) + gps := dc.getAllGroups(b, sg.ID) + subGroups = append(subGroups, gps...) 
+ } + subGroups = append(subGroups, grpid) + return subGroups +} + +// Set adds an entry into the cache map +func (dc *DeviceCache) Set(name string) bool { + log.Debugf("Setting cache entry %s", name) + dc.rwm.Lock() + defer dc.rwm.Unlock() + dc.store[name] = true + return true +} + +// Exists checks whether an entry exists in the cache map +func (dc *DeviceCache) Exists(name string) bool { + log.Debugf("Checking cache entry %s", name) + dc.rwm.RLock() + defer dc.rwm.RUnlock() + _, ok := dc.store[name] + return ok +} + +// Unset removes an entry from the cache map +func (dc *DeviceCache) Unset(name string) bool { + log.Debugf("Deleting cache entry %s", name) + dc.rwm.Lock() + defer dc.rwm.Unlock() + delete(dc.store, name) + return true +} diff --git a/pkg/log/globalhook.go b/pkg/log/globalhook.go new file mode 100644 index 000000000..ae5622d7e --- /dev/null +++ b/pkg/log/globalhook.go @@ -0,0 +1,22 @@ +package lmlog + +import ( + "os" + + log "github.com/sirupsen/logrus" +) + +// DefaultFieldHook is a global hook that adds the pod id to every log statement +type DefaultFieldHook struct { +} + +// Fire injects the pod id into the log entry before it is written +func (hook *DefaultFieldHook) Fire(entry *log.Entry) error { + entry.Data["pod_id"] = os.Getenv("MY_POD_NAME") + return nil +} + +// Levels returns the log levels on which the hook is enabled +func (hook *DefaultFieldHook) Levels() []log.Level { + return log.AllLevels +} diff --git a/pkg/types/types.go b/pkg/types/types.go index 7a7cd2977..cb85a55e1 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -50,7 +50,7 @@ type DeviceMapper interface { // UpdateAndReplace updates a device using the 'replace' OpType. UpdateAndReplace(*models.Device, ...DeviceOption) (*models.Device, error) // UpdateAndReplaceByDisplayName updates a device using the 'replace' OpType, adding the device first if it does not already exist. - UpdateAndReplaceByDisplayName(string, ...DeviceOption) (*models.Device, error) + UpdateAndReplaceByDisplayName(string, UpdateFilter, ...DeviceOption) (*models.Device, error) // UpdateAndReplaceField updates a device using the 'replace' OpType for a // specific field of a device. UpdateAndReplaceField(*models.Device, string, ...DeviceOption) (*models.Device, error) @@ -85,3 +85,6 @@ type DeviceBuilder interface { // Custom adds a custom property to the device. 
Custom(string, string) DeviceOption } + +// UpdateFilter is a predicate evaluated before a device update; the update proceeds only if it returns true +type UpdateFilter func() bool diff --git a/pkg/utilities/cscutils.go b/pkg/utilities/cscutils.go new file mode 100644 index 000000000..8df3d8c76 --- /dev/null +++ b/pkg/utilities/cscutils.go @@ -0,0 +1,20 @@ +package utilities + +import ( + "context" + + "github.com/logicmonitor/k8s-argus/pkg/connection" + "github.com/logicmonitor/k8s-collectorset-controller/api" + log "github.com/sirupsen/logrus" +) + +// GetCollectorID gets the collector ID from the CSC +func GetCollectorID() int32 { + reply, err := connection.GetCSCClient().CollectorID(context.Background(), &api.CollectorIDRequest{}) + if err != nil || reply == nil { + log.Errorf("Failed to get collector ID: %v", err) + return 0 + } + + return reply.Id +} diff --git a/pkg/watch/deployment/deployment.go b/pkg/watch/deployment/deployment.go index cb1671585..7f680be02 100644 --- a/pkg/watch/deployment/deployment.go +++ b/pkg/watch/deployment/deployment.go @@ -97,7 +97,7 @@ func (w *Watcher) add(deployment *appsv1.Deployment) { func (w *Watcher) update(old, new *appsv1.Deployment) { if _, err := w.UpdateAndReplaceByDisplayName( - fmtDeploymentDisplayName(old), + fmtDeploymentDisplayName(old), nil, w.args(new, constants.DeploymentCategory)..., ); err != nil { log.Errorf("Failed to update deployment %q: %v", fmtDeploymentDisplayName(new), err) @@ -106,6 +106,7 @@ log.Infof("Updated deployment %q", fmtDeploymentDisplayName(old)) } +// nolint: dupl func (w *Watcher) move(deployment *appsv1.Deployment) { if _, err := w.UpdateAndReplaceFieldByDisplayName(fmtDeploymentDisplayName(deployment), constants.CustomPropertiesFieldName, w.args(deployment, constants.DeploymentDeletedCategory)...); err != nil { log.Errorf("Failed to move deployment %q: %v", fmtDeploymentDisplayName(deployment), err) @@ -142,8 +143,8 @@ log.Warnf("Failed to get the deployments from k8s") return nil, err } - for _, deploymentInfo := range deploymentList.Items { - deploymentsMap[fmtDeploymentDisplayName(&deploymentInfo)] = deploymentInfo.Name + for i := range deploymentList.Items { + deploymentsMap[fmtDeploymentDisplayName(&deploymentList.Items[i])] = deploymentList.Items[i].Name } return deploymentsMap, nil diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index 0418741c0..817ae3cde 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -82,9 +82,9 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { } // Covers the case when the old node is in the process of terminating // and the new node is coming up to replace it. 
- if oldInternalAddress.Address != newInternalAddress.Address { - w.update(old, new) - } + // if oldInternalAddress.Address != newInternalAddress.Address { + w.update(old, new) + // } } } @@ -122,8 +122,19 @@ func (w *Watcher) add(node *v1.Node) { w.createRoleDeviceGroup(node.Labels) } +func (w *Watcher) nodeUpdateFilter(old, new *v1.Node) types.UpdateFilter { + return func() bool { + oldAddress := getInternalAddress(old.Status.Addresses) + newAddress := getInternalAddress(new.Status.Addresses) + if oldAddress == nil || newAddress == nil { + return oldAddress != newAddress + } + return oldAddress.Address != newAddress.Address + } +} + func (w *Watcher) update(old, new *v1.Node) { - if _, err := w.UpdateAndReplaceByDisplayName(old.Name, w.args(new, constants.NodeCategory)...); err != nil { + if _, err := w.UpdateAndReplaceByDisplayName(old.Name, w.nodeUpdateFilter(old, new), w.args(new, constants.NodeCategory)...); err != nil { log.Errorf("Failed to update node %q: %v", new.Name, err) } else { log.Infof("Updated node %q", old.Name) @@ -163,7 +169,8 @@ func (w *Watcher) args(node *v1.Node, category string) []types.DeviceOption { // getInternalAddress finds the node's internal address. func getInternalAddress(addresses []v1.NodeAddress) *v1.NodeAddress { var hostname *v1.NodeAddress - for _, address := range addresses { + for i := range addresses { + address := addresses[i] if address.Type == v1.NodeInternalIP { return &address } diff --git a/pkg/watch/pod/pod.go b/pkg/watch/pod/pod.go index 30f6ab95c..f111b850d 100644 --- a/pkg/watch/pod/pod.go +++ b/pkg/watch/pod/pod.go @@ -82,9 +82,9 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { return } - if old.Status.PodIP != new.Status.PodIP { - w.update(old, new) - } + // if old.Status.PodIP != new.Status.PodIP { + w.update(old, new) + // } } } @@ -122,9 +122,15 @@ func (w *Watcher) add(pod *v1.Pod) { log.Infof("Added pod %q", pod.Name) } +func (w *Watcher) podUpdateFilter(old, new *v1.Pod) types.UpdateFilter { + return func() bool { + return old.Status.PodIP != new.Status.PodIP + } +} + func (w *Watcher) update(old, new *v1.Pod) { if _, err := w.UpdateAndReplaceByDisplayName( - old.Name, + old.Name, w.podUpdateFilter(old, new), w.args(new, constants.PodCategory)..., ); err != nil { log.Errorf("Failed to update pod %q: %v", new.Name, err) @@ -178,9 +184,9 @@ func GetPodsMap(k8sClient *kubernetes.Clientset, namespace string) (map[string]s if err != nil || podList == nil { return nil, err } - for _, podInfo := range podList.Items { + for i := range podList.Items { // TODO: we should improve the value of the map to the ip of the pod when changing the name of the device to the ip - podsMap[podInfo.Name] = getPodDNSName(&podInfo) + podsMap[podList.Items[i].Name] = getPodDNSName(&podList.Items[i]) } return podsMap, nil diff --git a/pkg/watch/service/service.go b/pkg/watch/service/service.go index 5bdbbfcfb..39dce6217 100644 --- a/pkg/watch/service/service.go +++ b/pkg/watch/service/service.go @@ -75,9 +75,9 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { // Covers the case when the old service is in the process of terminating // and the new service is coming up to replace it. 
- if old.Spec.ClusterIP != new.Spec.ClusterIP { - w.update(old, new) - } + // if old.Spec.ClusterIP != new.Spec.ClusterIP { + w.update(old, new) + // } } } @@ -111,9 +111,15 @@ func (w *Watcher) add(service *v1.Service) { log.Infof("Added service %q", fmtServiceDisplayName(service)) } +func (w *Watcher) serviceUpdateFilter(old, new *v1.Service) types.UpdateFilter { + return func() bool { + return old.Spec.ClusterIP != new.Spec.ClusterIP + } +} + func (w *Watcher) update(old, new *v1.Service) { if _, err := w.UpdateAndReplaceByDisplayName( - fmtServiceDisplayName(old), + fmtServiceDisplayName(old), w.serviceUpdateFilter(old, new), w.args(new, constants.ServiceCategory)..., ); err != nil { log.Errorf("Failed to update service %q: %v", fmtServiceDisplayName(new), err) @@ -122,6 +128,7 @@ func (w *Watcher) update(old, new *v1.Service) { log.Infof("Updated service %q", fmtServiceDisplayName(old)) } +// nolint: dupl func (w *Watcher) move(service *v1.Service) { if _, err := w.UpdateAndReplaceFieldByDisplayName(fmtServiceDisplayName(service), constants.CustomPropertiesFieldName, w.args(service, constants.ServiceDeletedCategory)...); err != nil { log.Errorf("Failed to move service %q: %v", fmtServiceDisplayName(service), err) @@ -158,8 +165,8 @@ func GetServicesMap(k8sClient *kubernetes.Clientset, namespace string) (map[stri log.Warnf("Failed to get the services from k8s") return nil, err } - for _, serviceInfo := range serviceList.Items { - servicesMap[fmtServiceDisplayName(&serviceInfo)] = serviceInfo.Spec.ClusterIP + for i := range serviceList.Items { + servicesMap[fmtServiceDisplayName(&serviceList.Items[i])] = serviceList.Items[i].Spec.ClusterIP } return servicesMap, nil
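A few usage notes on the new APIs introduced above. First, the `types.UpdateFilter` hook: `UpdateAndReplaceByDisplayName` evaluates the filter lazily, after the device-cache existence check but before any LogicMonitor API call, so watchers can veto no-op updates cheaply. A minimal sketch of the pattern outside a real watcher (the `ipChanged` helper and the IP values are made up for illustration):

```go
package main

import (
	"fmt"

	"github.com/logicmonitor/k8s-argus/pkg/types"
)

// ipChanged builds an UpdateFilter that reports whether an observed IP
// changed; it mirrors the podUpdateFilter/serviceUpdateFilter closures above.
func ipChanged(oldIP, newIP string) types.UpdateFilter {
	return func() bool {
		return oldIP != newIP
	}
}

func main() {
	filter := ipChanged("10.0.0.1", "10.0.0.1")
	// Same guard as device.Manager.UpdateAndReplaceByDisplayName: a nil
	// filter (as the deployment watcher passes) means "always update".
	if filter != nil && !filter() {
		fmt.Println("filtered device update")
	}
}
```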
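Next, the `devicecache` package: `NewArgus` constructs the cache with a five-minute resync period and starts the background resync via `Run`, and `device.Manager` keeps it current on every add/delete so that an update for an unknown device becomes an add. A rough lifecycle sketch, assuming a fully initialized `*types.Base` as returned by `argus.NewBase` (elided here):

```go
package main

import (
	"fmt"

	"github.com/logicmonitor/k8s-argus/pkg/devicecache"
	"github.com/logicmonitor/k8s-argus/pkg/types"
)

func main() {
	var base *types.Base // assumption: obtained from argus.NewBase(config)

	// Resync against the LM portal every 5 minutes, as NewArgus now does.
	dc := devicecache.NewDeviceCache(base, 5)
	dc.Run()

	dc.Set("prod-node-1")                 // Manager.Add does this on success
	fmt.Println(dc.Exists("prod-node-1")) // guards UpdateAndReplaceByDisplayName
	dc.Unset("prod-node-1")               // DeleteByDisplayName does this on success
}
```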
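Finally, the `lmlog` hook: once registered on logrus's global logger (as `cmd/watch.go` now does), every entry at every level carries a `pod_id` field read from the `MY_POD_NAME` environment variable, which is expected to be injected through the Kubernetes downward API. A quick sketch of the wiring and the resulting output (the pod name is made up):

```go
package main

import (
	"os"

	lmlog "github.com/logicmonitor/k8s-argus/pkg/log"
	log "github.com/sirupsen/logrus"
)

func main() {
	// In the cluster this comes from the downward API; set it for the demo.
	os.Setenv("MY_POD_NAME", "argus-6d5f9c-xkp2q")
	log.AddHook(&lmlog.DefaultFieldHook{})

	// Prints something like:
	// time="..." level=info msg="watching cluster" pod_id=argus-6d5f9c-xkp2q
	log.Info("watching cluster")
}
```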