From bb2c1f58e698be79c3098c103052bc3efa89d1bf Mon Sep 17 00:00:00 2001 From: jeremy Date: Fri, 12 Oct 2018 10:02:53 +0800 Subject: [PATCH 01/22] DEV-39965: Allow the user to specify a parent group during installation --- docs/docs/configuration/index.html | 3 ++- docs/getting-started/index.html | 1 + docs/source/content/docs/configuration.md | 2 ++ docs/source/content/getting-started/index.md | 1 + pkg/config/config.go | 6 +++--- pkg/devicegroup/devicegroup.go | 16 ++++++++++++++++ pkg/tree/tree.go | 20 +++++++++++++++++++- 7 files changed, 44 insertions(+), 5 deletions(-) diff --git a/docs/docs/configuration/index.html b/docs/docs/configuration/index.html index e0f842ee2..714c62927 100644 --- a/docs/docs/configuration/index.html +++ b/docs/docs/configuration/index.html @@ -399,7 +399,8 @@

Configuring Argus Manually

To configure the non-sensitive information, create a YAML file located at /etc/argus/config.yaml. Here is an example file you can modify to your needs:

-
cluster_name:
+
cluster_group_id:
+cluster_name:
 debug: false
 delete_devices: true
 disable_alerting: false
diff --git a/docs/getting-started/index.html b/docs/getting-started/index.html
index 32ec9a612..e506fb733 100644
--- a/docs/getting-started/index.html
+++ b/docs/getting-started/index.html
@@ -385,6 +385,7 @@ 

Getting Started

--set accessID="$ACCESS_ID" \ --set accessKey="$ACCESS_KEY" \ --set account="$ACCOUNT" \ + --set clusterGroupID="$CLUSTER_GROUP_ID" \ --set clusterName="$CLUSTER_NAME" \ --set etcdDiscoveryToken="$ETCD_DISCOVERY_TOKEN" \ --set imageTag="$IMAGE_TAG" \ diff --git a/docs/source/content/docs/configuration.md b/docs/source/content/docs/configuration.md index 0cf409c3f..dff323425 100644 --- a/docs/source/content/docs/configuration.md +++ b/docs/source/content/docs/configuration.md @@ -37,6 +37,7 @@ Required Values: - **accessID:** The LogicMonitor API key ID. - **accessKey:** The LogicMonitor API key. - **account:** The LogicMonitor account name. +- **clusterGroupID:** A parent group id of the cluster's device group. - **clusterName:** A unique name given to the cluster's device group. - **collector.replicas:** The number of collectors to create and use with Argus. - **collector.size:** The collector size to install. Can be nano, small, medium, or large. @@ -61,6 +62,7 @@ In most applications there are generally two types of configuration options avai To configure the non-sensitive information, create a YAML file located at `/etc/argus/config.yaml`. Here is an example file you can modify to your needs: ```yaml +cluster_group_id: cluster_name: debug: false delete_devices: true diff --git a/docs/source/content/getting-started/index.md b/docs/source/content/getting-started/index.md index 758e59a11..cff66878d 100644 --- a/docs/source/content/getting-started/index.md +++ b/docs/source/content/getting-started/index.md @@ -48,6 +48,7 @@ Next, install Argus: --set accessID="$ACCESS_ID" \ --set accessKey="$ACCESS_KEY" \ --set account="$ACCOUNT" \ + --set clusterGroupID="$CLUSTER_GROUP_ID" \ --set clusterName="$CLUSTER_NAME" \ --set etcdDiscoveryToken="$ETCD_DISCOVERY_TOKEN" \ --set imageTag="$IMAGE_TAG" \ diff --git a/pkg/config/config.go b/pkg/config/config.go index 4324d6a8f..3fe931ccb 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,11 +1,10 @@ package config import ( - "io/ioutil" - "github.com/kelseyhightower/envconfig" "github.com/logicmonitor/k8s-argus/pkg/constants" - yaml "gopkg.in/yaml.v2" + "gopkg.in/yaml.v2" + "io/ioutil" ) // Config represents the application's configuration file. @@ -13,6 +12,7 @@ type Config struct { *Secrets Address string `yaml:"address"` ClusterCategory string `yaml:"cluster_category"` + ClusterGroupID int32 `yaml:"cluster_group_id"` ClusterName string `yaml:"cluster_name"` Debug bool `yaml:"debug"` DeleteDevices bool `yaml:"delete_devices"` diff --git a/pkg/devicegroup/devicegroup.go b/pkg/devicegroup/devicegroup.go index c42f4bb13..f1fae7038 100644 --- a/pkg/devicegroup/devicegroup.go +++ b/pkg/devicegroup/devicegroup.go @@ -141,6 +141,22 @@ func Exists(parentID int32, name string, client *lm.DefaultApi) bool { return false } +// ExistsByID returns true if we could get the group by id +func ExistsByID(groupID int32, client *lm.DefaultApi) bool { + restResponse, apiResponse, err := client.GetDeviceGroupById(groupID, "name,id") + if _err := utilities.CheckAllErrors(restResponse, apiResponse, err); _err != nil { + log.Warnf("Failed to get device group (id=%v): %v", groupID, _err) + } + + log.Debugf("%#v", restResponse) + + if &restResponse.Data != nil && restResponse.Data.Id == groupID { + return true + } + + return false +} + // DeleteSubGroup deletes a subgroup from a device group with the specified // name. func DeleteSubGroup(deviceGroup *lm.RestDeviceGroup, name string, client *lm.DefaultApi) error { diff --git a/pkg/tree/tree.go b/pkg/tree/tree.go index e27f05181..bf07cc341 100644 --- a/pkg/tree/tree.go +++ b/pkg/tree/tree.go @@ -4,6 +4,7 @@ import ( "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/devicegroup" "github.com/logicmonitor/k8s-argus/pkg/types" + log "github.com/sirupsen/logrus" ) // DeviceTree manages the device tree representation of a Kubernetes cluster in LogicMonitor. @@ -17,7 +18,7 @@ func (d *DeviceTree) buildOptsSlice() []*devicegroup.Options { return []*devicegroup.Options{ { Name: constants.ClusterDeviceGroupPrefix + d.Config.ClusterName, - ParentID: constants.RootDeviceGroupID, + ParentID: d.Config.ClusterGroupID, DisableAlerting: d.Config.DisableAlerting, AppliesTo: devicegroup.NewAppliesToBuilder().HasCategory(constants.ClusterCategory).And().Auto("clustername").Equals(d.Config.ClusterName), Client: d.LMClient, @@ -70,6 +71,9 @@ func (d *DeviceTree) buildOptsSlice() []*devicegroup.Options { // CreateDeviceTree creates the Device tree that will represent the cluster in LogicMonitor. func (d *DeviceTree) CreateDeviceTree() (map[string]int32, error) { + // create the parent cluster group first + d.checkAndUpdateClusterGroup() + deviceGroups := make(map[string]int32) for _, opts := range d.buildOptsSlice() { switch opts.Name { @@ -91,3 +95,17 @@ func (d *DeviceTree) CreateDeviceTree() (map[string]int32, error) { return deviceGroups, nil } + +func (d *DeviceTree) checkAndUpdateClusterGroup() { + // do not need to check the root group + if d.Config.ClusterGroupID == constants.RootDeviceGroupID { + return + } + + // if the group does not exist anymore, we will add the cluster to the root group + if !devicegroup.ExistsByID(d.Config.ClusterGroupID, d.LMClient) { + log.Warnf("The device group (id=%v) does not exist, the cluster will be added to the root group", d.Config.ClusterGroupID) + d.Config.ClusterGroupID = constants.RootDeviceGroupID + } + return +} From 2d53e262cb2536224e12a33a68e1ec18544018bf Mon Sep 17 00:00:00 2001 From: jeremy Date: Tue, 16 Oct 2018 15:28:41 +0800 Subject: [PATCH 02/22] DEV-39965: Allow the user to specify a parent group during installation --- pkg/config/config.go | 5 +++-- pkg/tree/tree.go | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 3fe931ccb..c64aa542a 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,10 +1,11 @@ package config import ( + "io/ioutil" + "github.com/kelseyhightower/envconfig" "github.com/logicmonitor/k8s-argus/pkg/constants" "gopkg.in/yaml.v2" - "io/ioutil" ) // Config represents the application's configuration file. @@ -12,11 +13,11 @@ type Config struct { *Secrets Address string `yaml:"address"` ClusterCategory string `yaml:"cluster_category"` - ClusterGroupID int32 `yaml:"cluster_group_id"` ClusterName string `yaml:"cluster_name"` Debug bool `yaml:"debug"` DeleteDevices bool `yaml:"delete_devices"` DisableAlerting bool `yaml:"disable_alerting"` + ClusterGroupID int32 `yaml:"cluster_group_id"` } // Secrets represents the application's sensitive configuration file. diff --git a/pkg/tree/tree.go b/pkg/tree/tree.go index bf07cc341..13144192c 100644 --- a/pkg/tree/tree.go +++ b/pkg/tree/tree.go @@ -107,5 +107,4 @@ func (d *DeviceTree) checkAndUpdateClusterGroup() { log.Warnf("The device group (id=%v) does not exist, the cluster will be added to the root group", d.Config.ClusterGroupID) d.Config.ClusterGroupID = constants.RootDeviceGroupID } - return } From 5dfab5be0c7a04cc11b107cf41d2948f878abf62 Mon Sep 17 00:00:00 2001 From: jeremy Date: Thu, 1 Nov 2018 15:02:29 +0800 Subject: [PATCH 03/22] DEV-40505: Sync the k8s resource to santaba when argus launches --- pkg/argus.go | 27 ++++++ pkg/device/device.go | 10 ++ pkg/devicegroup/devicegroup.go | 3 +- pkg/sync/initsyncer.go | 172 +++++++++++++++++++++++++++++++++ pkg/tree/tree.go | 16 --- pkg/watch/node/node.go | 14 +-- pkg/watch/service/service.go | 26 ++--- 7 files changed, 231 insertions(+), 37 deletions(-) create mode 100644 pkg/sync/initsyncer.go diff --git a/pkg/argus.go b/pkg/argus.go index caa4785f0..c6205129b 100644 --- a/pkg/argus.go +++ b/pkg/argus.go @@ -6,7 +6,9 @@ import ( "github.com/logicmonitor/k8s-argus/pkg/config" "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/device" + "github.com/logicmonitor/k8s-argus/pkg/devicegroup" "github.com/logicmonitor/k8s-argus/pkg/etcd" + "github.com/logicmonitor/k8s-argus/pkg/sync" "github.com/logicmonitor/k8s-argus/pkg/tree" "github.com/logicmonitor/k8s-argus/pkg/types" "github.com/logicmonitor/k8s-argus/pkg/watch/namespace" @@ -15,6 +17,7 @@ import ( "github.com/logicmonitor/k8s-argus/pkg/watch/service" "github.com/logicmonitor/k8s-collectorset-controller/api" lm "github.com/logicmonitor/lm-sdk-go" + log "github.com/sirupsen/logrus" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" @@ -75,6 +78,13 @@ func NewArgus(base *types.Base, client api.CollectorSetControllerClient) (*Argus deviceTree := &tree.DeviceTree{ Base: base, } + + // init sync with santaba to delete the non-exist resource devices + initSyncer := sync.InitSyncer{ + DeviceManager: deviceManager, + } + initSyncer.InitSync() + deviceGroups, err := deviceTree.CreateDeviceTree() if err != nil { return nil, err @@ -116,6 +126,9 @@ func NewBase(config *config.Config) (*types.Base, error) { // LogicMonitor API client. lmClient := newLMClient(config.ID, config.Key, config.Account) + // check and update the params + checkAndUpdateClusterGroup(config, lmClient) + // Kubernetes API client. k8sClient, err := newK8sClient() if err != nil { @@ -150,3 +163,17 @@ func (a *Argus) Watch() { go controller.Run(stop) } } + +// check the cluster group ID, if the group does not exist, just use the root group +func checkAndUpdateClusterGroup(config *config.Config, lmClient *lm.DefaultApi) { + // do not need to check the root group + if config.ClusterGroupID == constants.RootDeviceGroupID { + return + } + + // if the group does not exist anymore, we will add the cluster to the root group + if !devicegroup.ExistsByID(config.ClusterGroupID, lmClient) { + log.Warnf("The device group (id=%v) does not exist, the cluster will be added to the root group", config.ClusterGroupID) + config.ClusterGroupID = constants.RootDeviceGroupID + } +} diff --git a/pkg/device/device.go b/pkg/device/device.go index f5e4570c6..61c8caf24 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -183,3 +183,13 @@ func find(field, name string, client *lm.DefaultApi) (*lm.RestDevice, error) { return nil, nil } + +// GetListByGroupID implements getting all the devices belongs to the group directly +func (m *Manager) GetListByGroupID(groupID int32) ([]lm.RestDevice, error) { + restResponse, apiResponse, err := m.LMClient.GetImmediateDeviceListByDeviceGroupId(groupID, "id,name,displayName", -1, 0, "") + if _err := utilities.CheckAllErrors(restResponse, apiResponse, err); _err != nil { + return nil, _err + } + log.Debugf("%#v", restResponse) + return restResponse.Data.Items, nil +} diff --git a/pkg/devicegroup/devicegroup.go b/pkg/devicegroup/devicegroup.go index f1fae7038..110074d75 100644 --- a/pkg/devicegroup/devicegroup.go +++ b/pkg/devicegroup/devicegroup.go @@ -5,7 +5,6 @@ import ( "net/url" "github.com/logicmonitor/k8s-argus/pkg/constants" - "github.com/logicmonitor/k8s-argus/pkg/utilities" lm "github.com/logicmonitor/lm-sdk-go" log "github.com/sirupsen/logrus" @@ -109,7 +108,7 @@ func Create(opts *Options) (int32, error) { // Find searches for a device group identified by the parent ID and name. func Find(parentID int32, name string, client *lm.DefaultApi) (*lm.RestDeviceGroup, error) { filter := fmt.Sprintf("name:%s", url.QueryEscape(name)) - restResponse, apiResponse, err := client.GetDeviceGroupList("name,id,parentId", -1, 0, filter) + restResponse, apiResponse, err := client.GetDeviceGroupList("name,id,parentId,subGroups", -1, 0, filter) if _err := utilities.CheckAllErrors(restResponse, apiResponse, err); _err != nil { return nil, fmt.Errorf("Failed to get device group list when searching for %q: %v", name, _err) } diff --git a/pkg/sync/initsyncer.go b/pkg/sync/initsyncer.go new file mode 100644 index 000000000..950ca420f --- /dev/null +++ b/pkg/sync/initsyncer.go @@ -0,0 +1,172 @@ +package sync + +import ( + "github.com/logicmonitor/k8s-argus/pkg/constants" + "github.com/logicmonitor/k8s-argus/pkg/device" + "github.com/logicmonitor/k8s-argus/pkg/devicegroup" + "github.com/logicmonitor/k8s-argus/pkg/watch/node" + "github.com/logicmonitor/k8s-argus/pkg/watch/service" + "github.com/logicmonitor/lm-sdk-go" + log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// InitSyncer implements the initial sync with Santaba +type InitSyncer struct { + DeviceManager *device.Manager +} + +// InitSync implements the initial sync with Santaba +func (i *InitSyncer) InitSync() { + log.Infof("Start to sync the resource devices") + clusterName := i.DeviceManager.Base.Config.ClusterName + // get the cluster info + parentGroupID := i.DeviceManager.Config().ClusterGroupID + groupName := constants.ClusterDeviceGroupPrefix + clusterName + rest, err := devicegroup.Find(parentGroupID, groupName, i.DeviceManager.LMClient) + if err != nil || rest == nil { + log.Infof("Failed to get the cluster group: %v, parentID: %v", groupName, parentGroupID) + return + } + + // get the node, pod, service info + if rest.SubGroups != nil { + c := make(chan string, 3) + syncNum := 0 + for _, subgroup := range rest.SubGroups { + switch subgroup.Name { + case constants.NodeDeviceGroupName: + go i.intSyncNodes(rest.Id, c) + syncNum++ + case constants.PodDeviceGroupName: + go i.initSyncPods(rest.Id, c) + syncNum++ + case constants.ServiceDeviceGroupName: + go i.initSyncServices(rest.Id, c) + syncNum++ + default: + log.Infof("Unsupported group to sync, ignore it: %v", subgroup.Name) + } + } + + for i := 0; i < syncNum; i++ { + log.Infof("Finish syncing %v", <-c) + } + } +} + +func (i *InitSyncer) intSyncNodes(parentGroupID int32, c chan string) { + defer i.sendInfoToChan(constants.NodeDeviceGroupName, c) + + rest, err := devicegroup.Find(parentGroupID, constants.NodeDeviceGroupName, i.DeviceManager.LMClient) + if err != nil || rest == nil { + log.Warnf("Failed to get the node group") + return + } + if rest.SubGroups == nil { + return + } + + //get node info from k8s + nodesMap := make(map[string]string) + nodeList, err := i.DeviceManager.K8sClient.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil || nodeList == nil { + log.Warnf("Failed to get the nodes from k8s") + return + } + for _, nodeInfo := range nodeList.Items { + nodesMap[nodeInfo.Name] = node.GetInternalAddress(nodeInfo.Status.Addresses).Address + } + + for _, subGroup := range rest.SubGroups { + // all the node device will be added to the group "ALL", so we only need to check it + if subGroup.Name != constants.AllNodeDeviceGroupName { + continue + } + i.syncDevices(constants.NodeDeviceGroupName, nodesMap, subGroup) + } + +} + +func (i *InitSyncer) initSyncPods(parentGroupID int32, c chan string) { + defer i.sendInfoToChan(constants.PodDeviceGroupName, c) + + rest, err := devicegroup.Find(parentGroupID, constants.PodDeviceGroupName, i.DeviceManager.LMClient) + if err != nil || rest == nil { + log.Warnf("Failed to get the pod group") + return + } + if rest.SubGroups == nil { + return + } + + // loop every namesplace + for _, subGroup := range rest.SubGroups { + //get pod info from k8s + podsMap := make(map[string]string) + podList, err := i.DeviceManager.K8sClient.CoreV1().Pods(subGroup.Name).List(metav1.ListOptions{}) + if err != nil || podList == nil { + log.Warnf("Failed to get the pods from k8s") + return + } + for _, podInfo := range podList.Items { + // TODO: we should improve the value of the map to the ip of the pod when changing the name of the device to the ip + podsMap[podInfo.Name] = podInfo.Name + } + + // get and check all the devices in the group + i.syncDevices(constants.PodDeviceGroupName, podsMap, subGroup) + } +} + +func (i *InitSyncer) initSyncServices(parentGroupID int32, c chan string) { + defer i.sendInfoToChan(constants.ServiceDeviceGroupName, c) + + rest, err := devicegroup.Find(parentGroupID, constants.ServiceDeviceGroupName, i.DeviceManager.LMClient) + if err != nil || rest == nil { + log.Warnf("Failed to get the pod group") + return + } + if rest.SubGroups == nil { + return + } + + // loop every namesplace + for _, subGroup := range rest.SubGroups { + //get service info from k8s + servicesMap := make(map[string]string) + serviceList, err := i.DeviceManager.K8sClient.CoreV1().Services(subGroup.Name).List(metav1.ListOptions{}) + if err != nil || serviceList == nil { + log.Warnf("Failed to get the services from k8s") + return + } + for _, serviceInfo := range serviceList.Items { + servicesMap[service.FmtServiceDisplayName(&serviceInfo)] = service.FmtServiceName(&serviceInfo) + } + + // get and check all the devices in the group + i.syncDevices(constants.ServiceDeviceGroupName, servicesMap, subGroup) + } +} + +func (i *InitSyncer) sendInfoToChan(info string, c chan string) { + c <- info +} + +func (i *InitSyncer) syncDevices(resourceType string, resourcesMap map[string]string, subGroup logicmonitor.GroupData) { + devices, err := i.DeviceManager.GetListByGroupID(subGroup.Id) + if err != nil || devices == nil { + log.Warnf("Failed to get the devices in the group: %v", subGroup.FullPath) + return + } + for _, device := range devices { + name, exist := resourcesMap[device.DisplayName] + if !exist || name != device.Name { + log.Infof("Delete the non-exist %v device: %v", resourceType, device.DisplayName) + err := i.DeviceManager.DeleteByID(device.Id) + if err != nil { + log.Warnf("Failed to delete the device: %v", device.DisplayName) + } + } + } +} diff --git a/pkg/tree/tree.go b/pkg/tree/tree.go index 13144192c..d4c61a6e7 100644 --- a/pkg/tree/tree.go +++ b/pkg/tree/tree.go @@ -4,7 +4,6 @@ import ( "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/devicegroup" "github.com/logicmonitor/k8s-argus/pkg/types" - log "github.com/sirupsen/logrus" ) // DeviceTree manages the device tree representation of a Kubernetes cluster in LogicMonitor. @@ -71,8 +70,6 @@ func (d *DeviceTree) buildOptsSlice() []*devicegroup.Options { // CreateDeviceTree creates the Device tree that will represent the cluster in LogicMonitor. func (d *DeviceTree) CreateDeviceTree() (map[string]int32, error) { - // create the parent cluster group first - d.checkAndUpdateClusterGroup() deviceGroups := make(map[string]int32) for _, opts := range d.buildOptsSlice() { @@ -95,16 +92,3 @@ func (d *DeviceTree) CreateDeviceTree() (map[string]int32, error) { return deviceGroups, nil } - -func (d *DeviceTree) checkAndUpdateClusterGroup() { - // do not need to check the root group - if d.Config.ClusterGroupID == constants.RootDeviceGroupID { - return - } - - // if the group does not exist anymore, we will add the cluster to the root group - if !devicegroup.ExistsByID(d.Config.ClusterGroupID, d.LMClient) { - log.Warnf("The device group (id=%v) does not exist, the cluster will be added to the root group", d.Config.ClusterGroupID) - d.Config.ClusterGroupID = constants.RootDeviceGroupID - } -} diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index fde5a0455..80a024244 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -44,7 +44,7 @@ func (w *Watcher) AddFunc() func(obj interface{}) { log.Debugf("received ADD event: %s", node.Name) // Require an IP address. - if getInternalAddress(node.Status.Addresses) == nil { + if GetInternalAddress(node.Status.Addresses) == nil { return } w.add(node) @@ -61,8 +61,8 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { // If the old node does not have an IP, then there is no way we could // have added it to LogicMonitor. Therefore, it must be a new device. - oldInternalAddress := getInternalAddress(old.Status.Addresses) - newInternalAddress := getInternalAddress(new.Status.Addresses) + oldInternalAddress := GetInternalAddress(old.Status.Addresses) + newInternalAddress := GetInternalAddress(new.Status.Addresses) if oldInternalAddress == nil && newInternalAddress != nil { w.add(new) return @@ -84,7 +84,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { log.Debugf("received DELETE event: %s", node.Name) // Delete the node. - internalAddress := getInternalAddress(node.Status.Addresses).Address + internalAddress := GetInternalAddress(node.Status.Addresses).Address if w.Config().DeleteDevices { if err := w.DeleteByName(internalAddress); err != nil { log.Errorf("Failed to delete node: %v", err) @@ -138,7 +138,7 @@ func (w *Watcher) args(node *v1.Node, category string) []types.DeviceOption { categories := utilities.BuildSystemCategoriesFromLabels(category, node.Labels) return []types.DeviceOption{ - w.Name(getInternalAddress(node.Status.Addresses).Address), + w.Name(GetInternalAddress(node.Status.Addresses).Address), w.ResourceLabels(node.Labels), w.DisplayName(node.Name), w.SystemCategories(categories), @@ -148,8 +148,8 @@ func (w *Watcher) args(node *v1.Node, category string) []types.DeviceOption { } } -// getInternalAddress finds the node's internal address. -func getInternalAddress(addresses []v1.NodeAddress) *v1.NodeAddress { +// GetInternalAddress finds the node's internal address. +func GetInternalAddress(addresses []v1.NodeAddress) *v1.NodeAddress { for _, address := range addresses { if address.Type == v1.NodeInternalIP { return &address diff --git a/pkg/watch/service/service.go b/pkg/watch/service/service.go index 6a8c23c83..c140cea72 100644 --- a/pkg/watch/service/service.go +++ b/pkg/watch/service/service.go @@ -79,7 +79,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { // Delete the service. if w.Config().DeleteDevices { - if err := w.DeleteByName(fmtServiceName(service)); err != nil { + if err := w.DeleteByName(FmtServiceName(service)); err != nil { log.Errorf("Failed to delete service: %v", err) return } @@ -100,34 +100,34 @@ func (w *Watcher) add(service *v1.Service) { log.Errorf("Failed to add service %q: %v", service.Name, err) return } - log.Infof("Added service %q", fmtServiceName(service)) + log.Infof("Added service %q", FmtServiceName(service)) } func (w *Watcher) update(old, new *v1.Service) { if _, err := w.UpdateAndReplaceByName( - fmtServiceName(old), + FmtServiceName(old), w.args(new, constants.ServiceCategory)..., ); err != nil { - log.Errorf("Failed to update service %q: %v", fmtServiceName(new), err) + log.Errorf("Failed to update service %q: %v", FmtServiceName(new), err) return } log.Infof("Updated service %q", old.Name) } func (w *Watcher) move(service *v1.Service) { - if _, err := w.UpdateAndReplaceFieldByName(fmtServiceName(service), constants.CustomPropertiesFieldName, w.args(service, constants.ServiceDeletedCategory)...); err != nil { - log.Errorf("Failed to move service %q: %v", fmtServiceName(service), err) + if _, err := w.UpdateAndReplaceFieldByName(FmtServiceName(service), constants.CustomPropertiesFieldName, w.args(service, constants.ServiceDeletedCategory)...); err != nil { + log.Errorf("Failed to move service %q: %v", FmtServiceName(service), err) return } - log.Infof("Moved service %q", fmtServiceName(service)) + log.Infof("Moved service %q", FmtServiceName(service)) } func (w *Watcher) args(service *v1.Service, category string) []types.DeviceOption { categories := utilities.BuildSystemCategoriesFromLabels(category, service.Labels) return []types.DeviceOption{ - w.Name(fmtServiceName(service)), + w.Name(FmtServiceName(service)), w.ResourceLabels(service.Labels), - w.DisplayName(fmtServiceDisplayName(service)), + w.DisplayName(FmtServiceDisplayName(service)), w.SystemCategories(categories), w.Auto("name", service.Name), w.Auto("namespace", service.Namespace), @@ -136,10 +136,12 @@ func (w *Watcher) args(service *v1.Service, category string) []types.DeviceOptio } } -func fmtServiceName(service *v1.Service) string { +// FmtServiceName implements the conversion for the service name +func FmtServiceName(service *v1.Service) string { return service.Name + "." + service.Namespace + ".svc" } -func fmtServiceDisplayName(service *v1.Service) string { - return fmtServiceName(service) + "-" + string(service.UID) +// FmtServiceDisplayName implements the conversion for the service display name +func FmtServiceDisplayName(service *v1.Service) string { + return FmtServiceName(service) + "-" + string(service.UID) } From 4058f07bcd673d624b1e5e126efea081b86444f0 Mon Sep 17 00:00:00 2001 From: jeremy Date: Fri, 2 Nov 2018 10:11:11 +0800 Subject: [PATCH 04/22] DEV-40434: The alert status of Pods and Services are disabled --- pkg/watch/node/node.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index fde5a0455..c69378c87 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -174,7 +174,7 @@ func (w *Watcher) createRoleDeviceGroup(labels map[string]string) { opts := &devicegroup.Options{ ParentID: w.DeviceGroups[constants.NodeDeviceGroupName], Name: role, - DisableAlerting: true, + DisableAlerting: w.Config().DisableAlerting, AppliesTo: devicegroup.NewAppliesToBuilder().HasCategory(label + "=").And().Auto("clustername").Equals(w.Config().ClusterName), Client: w.LMClient, DeleteDevices: w.Config().DeleteDevices, From e43a6a75c2da9437f9bec1271f8cbf498b560ab7 Mon Sep 17 00:00:00 2001 From: jeremy Date: Fri, 2 Nov 2018 18:05:26 +0800 Subject: [PATCH 05/22] DEV-40505: Sync the k8s resource to santaba when argus launches --- pkg/argus.go | 2 +- pkg/sync/initsyncer.go | 101 ++++++++++++++++------------------- pkg/watch/node/node.go | 30 ++++++++--- pkg/watch/pod/pod.go | 17 ++++++ pkg/watch/service/service.go | 39 ++++++++++---- 5 files changed, 115 insertions(+), 74 deletions(-) diff --git a/pkg/argus.go b/pkg/argus.go index c6205129b..7e468d7f8 100644 --- a/pkg/argus.go +++ b/pkg/argus.go @@ -79,7 +79,7 @@ func NewArgus(base *types.Base, client api.CollectorSetControllerClient) (*Argus Base: base, } - // init sync with santaba to delete the non-exist resource devices + // init sync to delete the non-exist resource devices through logicmonitor API initSyncer := sync.InitSyncer{ DeviceManager: deviceManager, } diff --git a/pkg/sync/initsyncer.go b/pkg/sync/initsyncer.go index 950ca420f..d12ba0eb6 100644 --- a/pkg/sync/initsyncer.go +++ b/pkg/sync/initsyncer.go @@ -1,22 +1,24 @@ package sync import ( + "sync" + "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/device" "github.com/logicmonitor/k8s-argus/pkg/devicegroup" "github.com/logicmonitor/k8s-argus/pkg/watch/node" + "github.com/logicmonitor/k8s-argus/pkg/watch/pod" "github.com/logicmonitor/k8s-argus/pkg/watch/service" "github.com/logicmonitor/lm-sdk-go" log "github.com/sirupsen/logrus" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// InitSyncer implements the initial sync with Santaba +// InitSyncer implements the initial sync through logicmonitor API type InitSyncer struct { DeviceManager *device.Manager } -// InitSync implements the initial sync with Santaba +// InitSync implements the initial sync through logicmonitor API func (i *InitSyncer) InitSync() { log.Infof("Start to sync the resource devices") clusterName := i.DeviceManager.Base.Config.ClusterName @@ -30,34 +32,45 @@ func (i *InitSyncer) InitSync() { } // get the node, pod, service info - if rest.SubGroups != nil { - c := make(chan string, 3) - syncNum := 0 + if rest.SubGroups != nil && len(rest.SubGroups) != 0 { + wg := sync.WaitGroup{} + wg.Add(len(rest.SubGroups)) for _, subgroup := range rest.SubGroups { switch subgroup.Name { case constants.NodeDeviceGroupName: - go i.intSyncNodes(rest.Id, c) - syncNum++ + go func() { + defer wg.Done() + i.intSyncNodes(rest.Id) + log.Infof("Finish syncing %v", constants.NodeDeviceGroupName) + }() case constants.PodDeviceGroupName: - go i.initSyncPods(rest.Id, c) - syncNum++ + go func() { + defer wg.Done() + i.initSyncPods(rest.Id) + log.Infof("Finish syncing %v", constants.PodDeviceGroupName) + }() case constants.ServiceDeviceGroupName: - go i.initSyncServices(rest.Id, c) - syncNum++ + go func() { + defer wg.Done() + i.initSyncServices(rest.Id) + log.Infof("Finish syncing %v", constants.ServiceDeviceGroupName) + }() default: - log.Infof("Unsupported group to sync, ignore it: %v", subgroup.Name) + func() { + defer wg.Done() + log.Infof("Unsupported group to sync, ignore it: %v", subgroup.Name) + }() + } } - for i := 0; i < syncNum; i++ { - log.Infof("Finish syncing %v", <-c) - } + // wait the init sync processes finishing + wg.Wait() + log.Infof("Finish syncing the resource devices") } } -func (i *InitSyncer) intSyncNodes(parentGroupID int32, c chan string) { - defer i.sendInfoToChan(constants.NodeDeviceGroupName, c) - +func (i *InitSyncer) intSyncNodes(parentGroupID int32) { rest, err := devicegroup.Find(parentGroupID, constants.NodeDeviceGroupName, i.DeviceManager.LMClient) if err != nil || rest == nil { log.Warnf("Failed to get the node group") @@ -68,15 +81,11 @@ func (i *InitSyncer) intSyncNodes(parentGroupID int32, c chan string) { } //get node info from k8s - nodesMap := make(map[string]string) - nodeList, err := i.DeviceManager.K8sClient.CoreV1().Nodes().List(metav1.ListOptions{}) - if err != nil || nodeList == nil { - log.Warnf("Failed to get the nodes from k8s") + nodesMap, err := node.GetNodesMap(i.DeviceManager.K8sClient) + if err != nil || nodesMap == nil { + log.Warnf("Failed to get the nodes from k8s, err: %v", err) return } - for _, nodeInfo := range nodeList.Items { - nodesMap[nodeInfo.Name] = node.GetInternalAddress(nodeInfo.Status.Addresses).Address - } for _, subGroup := range rest.SubGroups { // all the node device will be added to the group "ALL", so we only need to check it @@ -85,12 +94,9 @@ func (i *InitSyncer) intSyncNodes(parentGroupID int32, c chan string) { } i.syncDevices(constants.NodeDeviceGroupName, nodesMap, subGroup) } - } -func (i *InitSyncer) initSyncPods(parentGroupID int32, c chan string) { - defer i.sendInfoToChan(constants.PodDeviceGroupName, c) - +func (i *InitSyncer) initSyncPods(parentGroupID int32) { rest, err := devicegroup.Find(parentGroupID, constants.PodDeviceGroupName, i.DeviceManager.LMClient) if err != nil || rest == nil { log.Warnf("Failed to get the pod group") @@ -100,18 +106,13 @@ func (i *InitSyncer) initSyncPods(parentGroupID int32, c chan string) { return } - // loop every namesplace + // loop every namespace for _, subGroup := range rest.SubGroups { //get pod info from k8s - podsMap := make(map[string]string) - podList, err := i.DeviceManager.K8sClient.CoreV1().Pods(subGroup.Name).List(metav1.ListOptions{}) - if err != nil || podList == nil { - log.Warnf("Failed to get the pods from k8s") - return - } - for _, podInfo := range podList.Items { - // TODO: we should improve the value of the map to the ip of the pod when changing the name of the device to the ip - podsMap[podInfo.Name] = podInfo.Name + podsMap, err := pod.GetPodsMap(i.DeviceManager.K8sClient, subGroup.Name) + if err != nil || podsMap == nil { + log.Warnf("Failed to get the pods from k8s, namespace: %v, err: %v", subGroup.Name, err) + continue } // get and check all the devices in the group @@ -119,9 +120,7 @@ func (i *InitSyncer) initSyncPods(parentGroupID int32, c chan string) { } } -func (i *InitSyncer) initSyncServices(parentGroupID int32, c chan string) { - defer i.sendInfoToChan(constants.ServiceDeviceGroupName, c) - +func (i *InitSyncer) initSyncServices(parentGroupID int32) { rest, err := devicegroup.Find(parentGroupID, constants.ServiceDeviceGroupName, i.DeviceManager.LMClient) if err != nil || rest == nil { log.Warnf("Failed to get the pod group") @@ -134,14 +133,10 @@ func (i *InitSyncer) initSyncServices(parentGroupID int32, c chan string) { // loop every namesplace for _, subGroup := range rest.SubGroups { //get service info from k8s - servicesMap := make(map[string]string) - serviceList, err := i.DeviceManager.K8sClient.CoreV1().Services(subGroup.Name).List(metav1.ListOptions{}) - if err != nil || serviceList == nil { - log.Warnf("Failed to get the services from k8s") - return - } - for _, serviceInfo := range serviceList.Items { - servicesMap[service.FmtServiceDisplayName(&serviceInfo)] = service.FmtServiceName(&serviceInfo) + servicesMap, err := service.GetServicesMap(i.DeviceManager.K8sClient, subGroup.Name) + if err != nil || servicesMap == nil { + log.Warnf("Failed to get the services from k8s, namespace: %v, err: %v", subGroup.Name, err) + continue } // get and check all the devices in the group @@ -149,10 +144,6 @@ func (i *InitSyncer) initSyncServices(parentGroupID int32, c chan string) { } } -func (i *InitSyncer) sendInfoToChan(info string, c chan string) { - c <- info -} - func (i *InitSyncer) syncDevices(resourceType string, resourcesMap map[string]string, subGroup logicmonitor.GroupData) { devices, err := i.DeviceManager.GetListByGroupID(subGroup.Id) if err != nil || devices == nil { diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index 80a024244..42eecf1b4 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -12,7 +12,9 @@ import ( lm "github.com/logicmonitor/lm-sdk-go" log "github.com/sirupsen/logrus" "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" ) const ( @@ -44,7 +46,7 @@ func (w *Watcher) AddFunc() func(obj interface{}) { log.Debugf("received ADD event: %s", node.Name) // Require an IP address. - if GetInternalAddress(node.Status.Addresses) == nil { + if getInternalAddress(node.Status.Addresses) == nil { return } w.add(node) @@ -61,8 +63,8 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { // If the old node does not have an IP, then there is no way we could // have added it to LogicMonitor. Therefore, it must be a new device. - oldInternalAddress := GetInternalAddress(old.Status.Addresses) - newInternalAddress := GetInternalAddress(new.Status.Addresses) + oldInternalAddress := getInternalAddress(old.Status.Addresses) + newInternalAddress := getInternalAddress(new.Status.Addresses) if oldInternalAddress == nil && newInternalAddress != nil { w.add(new) return @@ -84,7 +86,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { log.Debugf("received DELETE event: %s", node.Name) // Delete the node. - internalAddress := GetInternalAddress(node.Status.Addresses).Address + internalAddress := getInternalAddress(node.Status.Addresses).Address if w.Config().DeleteDevices { if err := w.DeleteByName(internalAddress); err != nil { log.Errorf("Failed to delete node: %v", err) @@ -138,7 +140,7 @@ func (w *Watcher) args(node *v1.Node, category string) []types.DeviceOption { categories := utilities.BuildSystemCategoriesFromLabels(category, node.Labels) return []types.DeviceOption{ - w.Name(GetInternalAddress(node.Status.Addresses).Address), + w.Name(getInternalAddress(node.Status.Addresses).Address), w.ResourceLabels(node.Labels), w.DisplayName(node.Name), w.SystemCategories(categories), @@ -148,8 +150,8 @@ func (w *Watcher) args(node *v1.Node, category string) []types.DeviceOption { } } -// GetInternalAddress finds the node's internal address. -func GetInternalAddress(addresses []v1.NodeAddress) *v1.NodeAddress { +// getInternalAddress finds the node's internal address. +func getInternalAddress(addresses []v1.NodeAddress) *v1.NodeAddress { for _, address := range addresses { if address.Type == v1.NodeInternalIP { return &address @@ -191,3 +193,17 @@ func (w *Watcher) createRoleDeviceGroup(labels map[string]string) { log.Printf("Added device group for node role %q", role) } + +// GetNodesMap implements the getting nodes map info from k8s +func GetNodesMap(k8sClient *kubernetes.Clientset) (map[string]string, error) { + nodesMap := make(map[string]string) + nodeList, err := k8sClient.CoreV1().Nodes().List(metav1.ListOptions{}) + if err != nil || nodeList == nil { + return nil, err + } + for _, nodeInfo := range nodeList.Items { + nodesMap[nodeInfo.Name] = getInternalAddress(nodeInfo.Status.Addresses).Address + } + + return nodesMap, nil +} diff --git a/pkg/watch/pod/pod.go b/pkg/watch/pod/pod.go index 106d700d8..12a738b6b 100644 --- a/pkg/watch/pod/pod.go +++ b/pkg/watch/pod/pod.go @@ -8,7 +8,9 @@ import ( "github.com/logicmonitor/k8s-argus/pkg/utilities" log "github.com/sirupsen/logrus" "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" ) const ( @@ -144,3 +146,18 @@ func (w *Watcher) args(pod *v1.Pod, category string) []types.DeviceOption { w.System("ips", pod.Status.PodIP), } } + +// GetPodsMap implements the getting pods map info from k8s +func GetPodsMap(k8sClient *kubernetes.Clientset, namespace string) (map[string]string, error) { + podsMap := make(map[string]string) + podList, err := k8sClient.CoreV1().Pods(namespace).List(metav1.ListOptions{}) + if err != nil || podList == nil { + return nil, err + } + for _, podInfo := range podList.Items { + // TODO: we should improve the value of the map to the ip of the pod when changing the name of the device to the ip + podsMap[podInfo.Name] = podInfo.Name + } + + return podsMap, nil +} diff --git a/pkg/watch/service/service.go b/pkg/watch/service/service.go index c140cea72..172d92528 100644 --- a/pkg/watch/service/service.go +++ b/pkg/watch/service/service.go @@ -8,7 +8,9 @@ import ( "github.com/logicmonitor/k8s-argus/pkg/utilities" log "github.com/sirupsen/logrus" "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" ) const ( @@ -79,7 +81,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { // Delete the service. if w.Config().DeleteDevices { - if err := w.DeleteByName(FmtServiceName(service)); err != nil { + if err := w.DeleteByName(fmtServiceName(service)); err != nil { log.Errorf("Failed to delete service: %v", err) return } @@ -100,32 +102,32 @@ func (w *Watcher) add(service *v1.Service) { log.Errorf("Failed to add service %q: %v", service.Name, err) return } - log.Infof("Added service %q", FmtServiceName(service)) + log.Infof("Added service %q", fmtServiceName(service)) } func (w *Watcher) update(old, new *v1.Service) { if _, err := w.UpdateAndReplaceByName( - FmtServiceName(old), + fmtServiceName(old), w.args(new, constants.ServiceCategory)..., ); err != nil { - log.Errorf("Failed to update service %q: %v", FmtServiceName(new), err) + log.Errorf("Failed to update service %q: %v", fmtServiceName(new), err) return } log.Infof("Updated service %q", old.Name) } func (w *Watcher) move(service *v1.Service) { - if _, err := w.UpdateAndReplaceFieldByName(FmtServiceName(service), constants.CustomPropertiesFieldName, w.args(service, constants.ServiceDeletedCategory)...); err != nil { - log.Errorf("Failed to move service %q: %v", FmtServiceName(service), err) + if _, err := w.UpdateAndReplaceFieldByName(fmtServiceName(service), constants.CustomPropertiesFieldName, w.args(service, constants.ServiceDeletedCategory)...); err != nil { + log.Errorf("Failed to move service %q: %v", fmtServiceName(service), err) return } - log.Infof("Moved service %q", FmtServiceName(service)) + log.Infof("Moved service %q", fmtServiceName(service)) } func (w *Watcher) args(service *v1.Service, category string) []types.DeviceOption { categories := utilities.BuildSystemCategoriesFromLabels(category, service.Labels) return []types.DeviceOption{ - w.Name(FmtServiceName(service)), + w.Name(fmtServiceName(service)), w.ResourceLabels(service.Labels), w.DisplayName(FmtServiceDisplayName(service)), w.SystemCategories(categories), @@ -136,12 +138,27 @@ func (w *Watcher) args(service *v1.Service, category string) []types.DeviceOptio } } -// FmtServiceName implements the conversion for the service name -func FmtServiceName(service *v1.Service) string { +// fmtServiceName implements the conversion for the service name +func fmtServiceName(service *v1.Service) string { return service.Name + "." + service.Namespace + ".svc" } // FmtServiceDisplayName implements the conversion for the service display name func FmtServiceDisplayName(service *v1.Service) string { - return FmtServiceName(service) + "-" + string(service.UID) + return fmtServiceName(service) + "-" + string(service.UID) +} + +// GetServicesMap implements the getting services map info from k8s +func GetServicesMap(k8sClient *kubernetes.Clientset, namespace string) (map[string]string, error) { + servicesMap := make(map[string]string) + serviceList, err := k8sClient.CoreV1().Services(namespace).List(metav1.ListOptions{}) + if err != nil || serviceList == nil { + log.Warnf("Failed to get the services from k8s") + return nil, err + } + for _, serviceInfo := range serviceList.Items { + servicesMap[FmtServiceDisplayName(&serviceInfo)] = fmtServiceName(&serviceInfo) + } + + return servicesMap, nil } From b4ce488353d7f5435ac1101437b712dc196ba633 Mon Sep 17 00:00:00 2001 From: jeremy Date: Fri, 9 Nov 2018 10:47:53 +0800 Subject: [PATCH 06/22] DEV-40505: Sync the k8s resource to santaba when argus launches --- pkg/constants/constants.go | 5 +++++ pkg/device/device.go | 8 ++++---- pkg/sync/initsyncer.go | 16 ++++++++++++++++ 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 8be6aac57..8c6430be0 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -74,3 +74,8 @@ const ( // Account is the environment variable name to lookup for the LogicMonitor account. Account = "ARGUS_ACCOUNT" ) + +const ( + // K8sClusterNamePropertyKey is the key of the unique auto property kubernetes cluster name + K8sClusterNamePropertyKey = "auto.clustername" +) diff --git a/pkg/device/device.go b/pkg/device/device.go index 61c8caf24..29938d4fa 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -5,11 +5,11 @@ import ( "fmt" "github.com/logicmonitor/k8s-argus/pkg/config" + "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/device/builder" "github.com/logicmonitor/k8s-argus/pkg/types" - "github.com/logicmonitor/k8s-collectorset-controller/api" - "github.com/logicmonitor/k8s-argus/pkg/utilities" + "github.com/logicmonitor/k8s-collectorset-controller/api" lm "github.com/logicmonitor/lm-sdk-go" log "github.com/sirupsen/logrus" ) @@ -25,7 +25,7 @@ func buildDevice(c *config.Config, client api.CollectorSetControllerClient, opti device := &lm.RestDevice{ CustomProperties: []lm.NameAndValue{ { - Name: "auto.clustername", + Name: constants.K8sClusterNamePropertyKey, Value: c.ClusterName, }, }, @@ -186,7 +186,7 @@ func find(field, name string, client *lm.DefaultApi) (*lm.RestDevice, error) { // GetListByGroupID implements getting all the devices belongs to the group directly func (m *Manager) GetListByGroupID(groupID int32) ([]lm.RestDevice, error) { - restResponse, apiResponse, err := m.LMClient.GetImmediateDeviceListByDeviceGroupId(groupID, "id,name,displayName", -1, 0, "") + restResponse, apiResponse, err := m.LMClient.GetImmediateDeviceListByDeviceGroupId(groupID, "id,name,displayName,customProperties", -1, 0, "") if _err := utilities.CheckAllErrors(restResponse, apiResponse, err); _err != nil { return nil, _err } diff --git a/pkg/sync/initsyncer.go b/pkg/sync/initsyncer.go index d12ba0eb6..cf99724be 100644 --- a/pkg/sync/initsyncer.go +++ b/pkg/sync/initsyncer.go @@ -151,6 +151,22 @@ func (i *InitSyncer) syncDevices(resourceType string, resourcesMap map[string]st return } for _, device := range devices { + // the "auto.clustername" property checking is used to prevent unexpected deletion of the normal non-k8s device + // which may be assigned to the cluster group + cps := device.CustomProperties + autoClusterName := "" + for _, cp := range cps { + if cp.Name == constants.K8sClusterNamePropertyKey { + autoClusterName = cp.Value + break + } + } + if autoClusterName != i.DeviceManager.Config().ClusterName { + log.Infof("Ignore the device (%v) which does not have property %v:%v", + device.DisplayName, constants.K8sClusterNamePropertyKey, i.DeviceManager.Config().ClusterName) + continue + } + name, exist := resourcesMap[device.DisplayName] if !exist || name != device.Name { log.Infof("Delete the non-exist %v device: %v", resourceType, device.DisplayName) From e7c90f107a58c371b749b7460384c10ca993bb3f Mon Sep 17 00:00:00 2001 From: jeremy Date: Wed, 14 Nov 2018 21:30:41 +0800 Subject: [PATCH 07/22] DEV-40779: Enable the alert on service group when it is created by argus --- pkg/tree/tree.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/tree/tree.go b/pkg/tree/tree.go index d4c61a6e7..38a857a8e 100644 --- a/pkg/tree/tree.go +++ b/pkg/tree/tree.go @@ -49,9 +49,8 @@ func (d *DeviceTree) buildOptsSlice() []*devicegroup.Options { }, { - Name: constants.ServiceDeviceGroupName, - // Services are a WIP in the product, disable alerting for now, - DisableAlerting: true, + Name: constants.ServiceDeviceGroupName, + DisableAlerting: d.Config.DisableAlerting, AppliesTo: devicegroup.NewAppliesToBuilder(), Client: d.LMClient, DeleteDevices: d.Config.DeleteDevices, From cc8efdae24b6698a93aa1a9598a0df7d96148080 Mon Sep 17 00:00:00 2001 From: jeremy Date: Tue, 11 Dec 2018 18:02:03 +0800 Subject: [PATCH 08/22] DEV-41432: Use IP as hostname for argus related devices --- pkg/watch/pod/pod.go | 16 +++++++++++++++- pkg/watch/service/service.go | 11 ++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/pkg/watch/pod/pod.go b/pkg/watch/pod/pod.go index 12a738b6b..b5ebd8b17 100644 --- a/pkg/watch/pod/pod.go +++ b/pkg/watch/pod/pod.go @@ -3,6 +3,7 @@ package pod import ( + "fmt" "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/types" "github.com/logicmonitor/k8s-argus/pkg/utilities" @@ -11,6 +12,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" + "strings" ) const ( @@ -134,7 +136,7 @@ func (w *Watcher) move(pod *v1.Pod) { func (w *Watcher) args(pod *v1.Pod, category string) []types.DeviceOption { categories := utilities.BuildSystemCategoriesFromLabels(category, pod.Labels) return []types.DeviceOption{ - w.Name(pod.Name), + w.Name(getPodDNSName(pod)), w.ResourceLabels(pod.Labels), w.DisplayName(pod.Name), w.SystemCategories(categories), @@ -147,6 +149,18 @@ func (w *Watcher) args(pod *v1.Pod, category string) []types.DeviceOption { } } +func getPodDNSName(pod *v1.Pod) string { + // if the pod is configured as "hostnetwork=true", we will use the pod name as the IP/DNS name of the pod device + if pod.Spec.HostNetwork { + return pod.Name + } + + // get the dns name as the format: "pod-ip-address.my-namespace.pod.cluster.local" + podIP := pod.Status.PodIP + podIP = strings.Replace(podIP, ".", "-", -1) + return fmt.Sprintf("%s.%s.pod.cluster.local", podIP, pod.Namespace) +} + // GetPodsMap implements the getting pods map info from k8s func GetPodsMap(k8sClient *kubernetes.Clientset, namespace string) (map[string]string, error) { podsMap := make(map[string]string) diff --git a/pkg/watch/service/service.go b/pkg/watch/service/service.go index 172d92528..fd1bfe78c 100644 --- a/pkg/watch/service/service.go +++ b/pkg/watch/service/service.go @@ -3,6 +3,7 @@ package service import ( + "fmt" "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/types" "github.com/logicmonitor/k8s-argus/pkg/utilities" @@ -129,7 +130,7 @@ func (w *Watcher) args(service *v1.Service, category string) []types.DeviceOptio return []types.DeviceOption{ w.Name(fmtServiceName(service)), w.ResourceLabels(service.Labels), - w.DisplayName(FmtServiceDisplayName(service)), + w.DisplayName(fmtServiceDisplayName(service)), w.SystemCategories(categories), w.Auto("name", service.Name), w.Auto("namespace", service.Namespace), @@ -140,12 +141,12 @@ func (w *Watcher) args(service *v1.Service, category string) []types.DeviceOptio // fmtServiceName implements the conversion for the service name func fmtServiceName(service *v1.Service) string { - return service.Name + "." + service.Namespace + ".svc" + return fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace) } // FmtServiceDisplayName implements the conversion for the service display name -func FmtServiceDisplayName(service *v1.Service) string { - return fmtServiceName(service) + "-" + string(service.UID) +func fmtServiceDisplayName(service *v1.Service) string { + return fmt.Sprintf("%s-%s", service.Name, string(service.UID)) } // GetServicesMap implements the getting services map info from k8s @@ -157,7 +158,7 @@ func GetServicesMap(k8sClient *kubernetes.Clientset, namespace string) (map[stri return nil, err } for _, serviceInfo := range serviceList.Items { - servicesMap[FmtServiceDisplayName(&serviceInfo)] = fmtServiceName(&serviceInfo) + servicesMap[fmtServiceDisplayName(&serviceInfo)] = fmtServiceName(&serviceInfo) } return servicesMap, nil From eb4fffbff02833af131710bd1b01d0ccbf32dc01 Mon Sep 17 00:00:00 2001 From: jeremy Date: Thu, 13 Dec 2018 13:38:02 +0800 Subject: [PATCH 09/22] DEV-41432: Use IP as hostname for argus related devices --- pkg/device/device.go | 23 +++++++++-------------- pkg/etcd/etcd.go | 2 +- pkg/types/types.go | 15 ++++++--------- pkg/watch/node/node.go | 9 ++++----- pkg/watch/pod/pod.go | 18 ++++++------------ pkg/watch/service/service.go | 29 ++++++++++++----------------- 6 files changed, 38 insertions(+), 58 deletions(-) diff --git a/pkg/device/device.go b/pkg/device/device.go index 29938d4fa..c6a2bd328 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -48,11 +48,6 @@ func buildDevice(c *config.Config, client api.CollectorSetControllerClient, opti return device } -// FindByName implements types.DeviceManager. -func (m *Manager) FindByName(name string) (*lm.RestDevice, error) { - return find("name", name, m.LMClient) -} - // FindByDisplayName implements types.DeviceManager. func (m *Manager) FindByDisplayName(name string) (*lm.RestDevice, error) { return find("displayName", name, m.LMClient) @@ -86,9 +81,9 @@ func (m *Manager) UpdateAndReplaceByID(id int32, options ...types.DeviceOption) return &restResponse.Data, nil } -// UpdateAndReplaceByName implements types.DeviceManager. -func (m *Manager) UpdateAndReplaceByName(name string, options ...types.DeviceOption) (*lm.RestDevice, error) { - d, err := m.FindByName(name) +// UpdateAndReplaceByDisplayName implements types.DeviceManager. +func (m *Manager) UpdateAndReplaceByDisplayName(name string, options ...types.DeviceOption) (*lm.RestDevice, error) { + d, err := m.FindByDisplayName(name) if err != nil { return nil, err } @@ -121,9 +116,9 @@ func (m *Manager) UpdateAndReplaceFieldByID(id int32, field string, options ...t return &restResponse.Data, nil } -// UpdateAndReplaceFieldByName implements types.DeviceManager. -func (m *Manager) UpdateAndReplaceFieldByName(name string, field string, options ...types.DeviceOption) (*lm.RestDevice, error) { - d, err := m.FindByName(name) +// UpdateAndReplaceFieldByDisplayName implements types.DeviceManager. +func (m *Manager) UpdateAndReplaceFieldByDisplayName(name string, field string, options ...types.DeviceOption) (*lm.RestDevice, error) { + d, err := m.FindByDisplayName(name) if err != nil { return nil, err } @@ -148,9 +143,9 @@ func (m *Manager) DeleteByID(id int32) error { return utilities.CheckAllErrors(restResponse, apiResponse, err) } -// DeleteByName implements types.DeviceManager. -func (m *Manager) DeleteByName(name string) error { - d, err := m.FindByName(name) +// DeleteByDisplayName implements types.DeviceManager. +func (m *Manager) DeleteByDisplayName(name string) error { + d, err := m.FindByDisplayName(name) if err != nil { return err } diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 986cef181..5c6e7d8ed 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -64,7 +64,7 @@ func (c *Controller) DiscoverByToken() ([]*Member, error) { func (c *Controller) addDevice(member *Member) { // Check if the etcd member has already been added. - d, err := c.FindByName(member.URL.Hostname()) + d, err := c.FindByDisplayName(fmtMemberDisplayName(member)) if err != nil { log.Errorf("Failed to find etcd member %q: %v", member.Name, err) return diff --git a/pkg/types/types.go b/pkg/types/types.go index 03f0b6e1f..71322b1ab 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -35,9 +35,6 @@ type DeviceManager interface { type DeviceMapper interface { // Config returns the Argus config. Config() *config.Config - // FindByName searches for a device by it's name. It will return a device if and only - // if one device was found, and return nil otherwise. - FindByName(string) (*lm.RestDevice, error) // FindByDisplayName searches for a device by it's display name. It will return a device if and only if // one device was found, and return nil otherwise. FindByDisplayName(string) (*lm.RestDevice, error) @@ -45,18 +42,18 @@ type DeviceMapper interface { Add(...DeviceOption) (*lm.RestDevice, error) // UpdateAndReplaceByID updates a device using the 'replace' OpType. UpdateAndReplaceByID(int32, ...DeviceOption) (*lm.RestDevice, error) - // UpdateAndReplaceByName updates a device using the 'replace' OpType if and onlt if it does not already exist. - UpdateAndReplaceByName(string, ...DeviceOption) (*lm.RestDevice, error) + // UpdateAndReplaceByDisplayName updates a device using the 'replace' OpType if and onlt if it does not already exist. + UpdateAndReplaceByDisplayName(string, ...DeviceOption) (*lm.RestDevice, error) // UpdateAndReplaceFieldByID updates a device using the 'replace' OpType for a // specific field of a device. UpdateAndReplaceFieldByID(int32, string, ...DeviceOption) (*lm.RestDevice, error) - // UpdateAndReplaceFieldByName updates a device using the 'replace' OpType for a + // UpdateAndReplaceFieldByDisplayName updates a device using the 'replace' OpType for a // specific field of a device. - UpdateAndReplaceFieldByName(string, string, ...DeviceOption) (*lm.RestDevice, error) + UpdateAndReplaceFieldByDisplayName(string, string, ...DeviceOption) (*lm.RestDevice, error) // DeleteByID deletes a device by device ID. DeleteByID(int32) error - // DeleteByName deletes a device by device name. - DeleteByName(string) error + // DeleteByDisplayName deletes a device by device display name. + DeleteByDisplayName(string) error } // DeviceOption is the function definition for the functional options pattern. diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index c9beb5e36..e407c3a78 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -86,13 +86,12 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { log.Debugf("received DELETE event: %s", node.Name) // Delete the node. - internalAddress := getInternalAddress(node.Status.Addresses).Address if w.Config().DeleteDevices { - if err := w.DeleteByName(internalAddress); err != nil { + if err := w.DeleteByDisplayName(node.Name); err != nil { log.Errorf("Failed to delete node: %v", err) return } - log.Infof("Deleted node %s", internalAddress) + log.Infof("Deleted node %s", node.Name) return } @@ -113,7 +112,7 @@ func (w *Watcher) add(node *v1.Node) { } func (w *Watcher) update(old, new *v1.Node) { - if _, err := w.UpdateAndReplaceByName(old.Name, w.args(new, constants.NodeCategory)...); err != nil { + if _, err := w.UpdateAndReplaceByDisplayName(old.Name, w.args(new, constants.NodeCategory)...); err != nil { log.Errorf("Failed to update node %q: %v", new.Name, err) } else { log.Infof("Updated node %q", old.Name) @@ -129,7 +128,7 @@ func (w *Watcher) update(old, new *v1.Node) { // nolint: dupl func (w *Watcher) move(node *v1.Node) { - if _, err := w.UpdateAndReplaceFieldByName(node.Name, constants.CustomPropertiesFieldName, w.args(node, constants.NodeDeletedCategory)...); err != nil { + if _, err := w.UpdateAndReplaceFieldByDisplayName(node.Name, constants.CustomPropertiesFieldName, w.args(node, constants.NodeDeletedCategory)...); err != nil { log.Errorf("Failed to move node %q: %v", node.Name, err) return } diff --git a/pkg/watch/pod/pod.go b/pkg/watch/pod/pod.go index b5ebd8b17..81a7ae6e8 100644 --- a/pkg/watch/pod/pod.go +++ b/pkg/watch/pod/pod.go @@ -3,7 +3,6 @@ package pod import ( - "fmt" "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/types" "github.com/logicmonitor/k8s-argus/pkg/utilities" @@ -12,7 +11,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" - "strings" ) const ( @@ -65,7 +63,7 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { } if new.Status.Phase == v1.PodSucceeded { - if err := w.DeleteByName(old.Name); err != nil { + if err := w.DeleteByDisplayName(old.Name); err != nil { log.Errorf("Failed to delete pod: %v", err) return } @@ -89,7 +87,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { // Delete the pod. if w.Config().DeleteDevices { - if err := w.DeleteByName(pod.Name); err != nil { + if err := w.DeleteByDisplayName(pod.Name); err != nil { log.Errorf("Failed to delete pod: %v", err) return } @@ -114,7 +112,7 @@ func (w *Watcher) add(pod *v1.Pod) { } func (w *Watcher) update(old, new *v1.Pod) { - if _, err := w.UpdateAndReplaceByName( + if _, err := w.UpdateAndReplaceByDisplayName( old.Name, w.args(new, constants.PodCategory)..., ); err != nil { @@ -126,7 +124,7 @@ func (w *Watcher) update(old, new *v1.Pod) { // nolint: dupl func (w *Watcher) move(pod *v1.Pod) { - if _, err := w.UpdateAndReplaceFieldByName(pod.Name, constants.CustomPropertiesFieldName, w.args(pod, constants.PodDeletedCategory)...); err != nil { + if _, err := w.UpdateAndReplaceFieldByDisplayName(pod.Name, constants.CustomPropertiesFieldName, w.args(pod, constants.PodDeletedCategory)...); err != nil { log.Errorf("Failed to move pod %q: %v", pod.Name, err) return } @@ -154,11 +152,7 @@ func getPodDNSName(pod *v1.Pod) string { if pod.Spec.HostNetwork { return pod.Name } - - // get the dns name as the format: "pod-ip-address.my-namespace.pod.cluster.local" - podIP := pod.Status.PodIP - podIP = strings.Replace(podIP, ".", "-", -1) - return fmt.Sprintf("%s.%s.pod.cluster.local", podIP, pod.Namespace) + return pod.Status.PodIP } // GetPodsMap implements the getting pods map info from k8s @@ -170,7 +164,7 @@ func GetPodsMap(k8sClient *kubernetes.Clientset, namespace string) (map[string]s } for _, podInfo := range podList.Items { // TODO: we should improve the value of the map to the ip of the pod when changing the name of the device to the ip - podsMap[podInfo.Name] = podInfo.Name + podsMap[podInfo.Name] = getPodDNSName(&podInfo) } return podsMap, nil diff --git a/pkg/watch/service/service.go b/pkg/watch/service/service.go index fd1bfe78c..4af87713c 100644 --- a/pkg/watch/service/service.go +++ b/pkg/watch/service/service.go @@ -82,7 +82,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { // Delete the service. if w.Config().DeleteDevices { - if err := w.DeleteByName(fmtServiceName(service)); err != nil { + if err := w.DeleteByDisplayName(service.Spec.ClusterIP); err != nil { log.Errorf("Failed to delete service: %v", err) return } @@ -100,35 +100,35 @@ func (w *Watcher) add(service *v1.Service) { if _, err := w.Add( w.args(service, constants.ServiceCategory)..., ); err != nil { - log.Errorf("Failed to add service %q: %v", service.Name, err) + log.Errorf("Failed to add service %q: %v", fmtServiceDisplayName(service), err) return } - log.Infof("Added service %q", fmtServiceName(service)) + log.Infof("Added service %q", fmtServiceDisplayName(service)) } func (w *Watcher) update(old, new *v1.Service) { - if _, err := w.UpdateAndReplaceByName( - fmtServiceName(old), + if _, err := w.UpdateAndReplaceByDisplayName( + old.Spec.ClusterIP, w.args(new, constants.ServiceCategory)..., ); err != nil { - log.Errorf("Failed to update service %q: %v", fmtServiceName(new), err) + log.Errorf("Failed to update service %q: %v", fmtServiceDisplayName(new), err) return } - log.Infof("Updated service %q", old.Name) + log.Infof("Updated service %q", fmtServiceDisplayName(old)) } func (w *Watcher) move(service *v1.Service) { - if _, err := w.UpdateAndReplaceFieldByName(fmtServiceName(service), constants.CustomPropertiesFieldName, w.args(service, constants.ServiceDeletedCategory)...); err != nil { - log.Errorf("Failed to move service %q: %v", fmtServiceName(service), err) + if _, err := w.UpdateAndReplaceFieldByDisplayName(service.Spec.ClusterIP, constants.CustomPropertiesFieldName, w.args(service, constants.ServiceDeletedCategory)...); err != nil { + log.Errorf("Failed to move service %q: %v", fmtServiceDisplayName(service), err) return } - log.Infof("Moved service %q", fmtServiceName(service)) + log.Infof("Moved service %q", fmtServiceDisplayName(service)) } func (w *Watcher) args(service *v1.Service, category string) []types.DeviceOption { categories := utilities.BuildSystemCategoriesFromLabels(category, service.Labels) return []types.DeviceOption{ - w.Name(fmtServiceName(service)), + w.Name(service.Spec.ClusterIP), w.ResourceLabels(service.Labels), w.DisplayName(fmtServiceDisplayName(service)), w.SystemCategories(categories), @@ -139,11 +139,6 @@ func (w *Watcher) args(service *v1.Service, category string) []types.DeviceOptio } } -// fmtServiceName implements the conversion for the service name -func fmtServiceName(service *v1.Service) string { - return fmt.Sprintf("%s.%s.svc.cluster.local", service.Name, service.Namespace) -} - // FmtServiceDisplayName implements the conversion for the service display name func fmtServiceDisplayName(service *v1.Service) string { return fmt.Sprintf("%s-%s", service.Name, string(service.UID)) @@ -158,7 +153,7 @@ func GetServicesMap(k8sClient *kubernetes.Clientset, namespace string) (map[stri return nil, err } for _, serviceInfo := range serviceList.Items { - servicesMap[fmtServiceDisplayName(&serviceInfo)] = fmtServiceName(&serviceInfo) + servicesMap[fmtServiceDisplayName(&serviceInfo)] = serviceInfo.Spec.ClusterIP } return servicesMap, nil From 16000b533b7e527c40fd042090ed27f590515ed8 Mon Sep 17 00:00:00 2001 From: jeremy Date: Thu, 13 Dec 2018 17:29:55 +0800 Subject: [PATCH 10/22] DEV-41459: Add a new device type for k8s devices --- pkg/constants/constants.go | 2 ++ pkg/device/device.go | 1 + 2 files changed, 3 insertions(+) diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go index 8c6430be0..91473f78a 100644 --- a/pkg/constants/constants.go +++ b/pkg/constants/constants.go @@ -78,4 +78,6 @@ const ( const ( // K8sClusterNamePropertyKey is the key of the unique auto property kubernetes cluster name K8sClusterNamePropertyKey = "auto.clustername" + // K8sDeviceType is the type value of the k8s device + K8sDeviceType = 8 ) diff --git a/pkg/device/device.go b/pkg/device/device.go index 29938d4fa..1488ecbd9 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -31,6 +31,7 @@ func buildDevice(c *config.Config, client api.CollectorSetControllerClient, opti }, DisableAlerting: c.DisableAlerting, HostGroupIds: "1", + DeviceType: constants.K8sDeviceType, } for _, option := range options { From e6152f8c1b2cc85aa50e75136a6da73f4482d72a Mon Sep 17 00:00:00 2001 From: jeremy Date: Mon, 17 Dec 2018 16:56:13 +0800 Subject: [PATCH 11/22] DEV-41432: Use IP as hostname for argus related devices --- pkg/watch/service/service.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/watch/service/service.go b/pkg/watch/service/service.go index 4af87713c..cb1fb5715 100644 --- a/pkg/watch/service/service.go +++ b/pkg/watch/service/service.go @@ -82,7 +82,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { // Delete the service. if w.Config().DeleteDevices { - if err := w.DeleteByDisplayName(service.Spec.ClusterIP); err != nil { + if err := w.DeleteByDisplayName(fmtServiceDisplayName(service)); err != nil { log.Errorf("Failed to delete service: %v", err) return } @@ -108,7 +108,7 @@ func (w *Watcher) add(service *v1.Service) { func (w *Watcher) update(old, new *v1.Service) { if _, err := w.UpdateAndReplaceByDisplayName( - old.Spec.ClusterIP, + fmtServiceDisplayName(old), w.args(new, constants.ServiceCategory)..., ); err != nil { log.Errorf("Failed to update service %q: %v", fmtServiceDisplayName(new), err) @@ -118,7 +118,7 @@ func (w *Watcher) update(old, new *v1.Service) { } func (w *Watcher) move(service *v1.Service) { - if _, err := w.UpdateAndReplaceFieldByDisplayName(service.Spec.ClusterIP, constants.CustomPropertiesFieldName, w.args(service, constants.ServiceDeletedCategory)...); err != nil { + if _, err := w.UpdateAndReplaceFieldByDisplayName(fmtServiceDisplayName(service), constants.CustomPropertiesFieldName, w.args(service, constants.ServiceDeletedCategory)...); err != nil { log.Errorf("Failed to move service %q: %v", fmtServiceDisplayName(service), err) return } @@ -141,7 +141,7 @@ func (w *Watcher) args(service *v1.Service, category string) []types.DeviceOptio // FmtServiceDisplayName implements the conversion for the service display name func fmtServiceDisplayName(service *v1.Service) string { - return fmt.Sprintf("%s-%s", service.Name, string(service.UID)) + return fmt.Sprintf("%s.%s.svc-%s", service.Name, service.Namespace, string(service.UID)) } // GetServicesMap implements the getting services map info from k8s From f35228a18f095dc616ee3ff9646110d54b674caa Mon Sep 17 00:00:00 2001 From: jeremy Date: Tue, 18 Dec 2018 17:14:26 +0800 Subject: [PATCH 12/22] DEV-40217: Support to set log levels and improve logs in argus related projects --- cmd/watch.go | 2 ++ pkg/device/device.go | 6 +++--- pkg/sync/initsyncer.go | 2 +- pkg/utilities/utilities.go | 18 +++++++++--------- pkg/watch/namespace/namespace.go | 8 ++++++-- pkg/watch/node/node.go | 8 ++++---- pkg/watch/pod/pod.go | 6 +++--- 7 files changed, 28 insertions(+), 22 deletions(-) diff --git a/cmd/watch.go b/cmd/watch.go index a7ed46548..b7e0d788e 100644 --- a/cmd/watch.go +++ b/cmd/watch.go @@ -113,6 +113,7 @@ func pollCollectorSetStatus(conn *grpc.ClientConn) (bool, error) { case <-timeout: return false, fmt.Errorf("Timeout waiting for collectors to become available") case <-ticker.C: + log.Debugf("Start to check the collectors status") ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() req := &healthpb.HealthCheckRequest{ @@ -126,6 +127,7 @@ func pollCollectorSetStatus(conn *grpc.ClientConn) (bool, error) { if healthCheckResponse.GetStatus() == healthpb.HealthCheckResponse_SERVING { return true, nil } + log.Debugf("The collectors is not ready: %d", healthCheckResponse.GetStatus()) } } } diff --git a/pkg/device/device.go b/pkg/device/device.go index d2bcb8c68..57ae4419e 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -40,9 +40,9 @@ func buildDevice(c *config.Config, client api.CollectorSetControllerClient, opti reply, err := client.CollectorID(context.Background(), &api.CollectorIDRequest{}) if err != nil { - log.Printf("Failed to get collector ID: %v", err) + log.Errorf("Failed to get collector ID: %v", err) } else { - log.Printf("Using collector ID %d for %q", reply.Id, device.DisplayName) + log.Infof("Using collector ID %d for %q", reply.Id, device.DisplayName) device.PreferredCollectorId = reply.Id } @@ -90,7 +90,7 @@ func (m *Manager) UpdateAndReplaceByDisplayName(name string, options ...types.De } if d == nil { - log.Printf("Could not find device %q", name) + log.Warnf("Could not find device %q", name) return nil, nil } diff --git a/pkg/sync/initsyncer.go b/pkg/sync/initsyncer.go index cf99724be..e828aa17c 100644 --- a/pkg/sync/initsyncer.go +++ b/pkg/sync/initsyncer.go @@ -66,8 +66,8 @@ func (i *InitSyncer) InitSync() { // wait the init sync processes finishing wg.Wait() - log.Infof("Finish syncing the resource devices") } + log.Infof("Finish syncing the resource devices") } func (i *InitSyncer) intSyncNodes(parentGroupID int32) { diff --git a/pkg/utilities/utilities.go b/pkg/utilities/utilities.go index 383da4b4f..05f2efb40 100644 --- a/pkg/utilities/utilities.go +++ b/pkg/utilities/utilities.go @@ -34,6 +34,15 @@ func GetLabelByPrefix(prefix string, labels map[string]string) (string, string) // CheckAllErrors is a helper function to deal with the number of possible places that an API call can fail. func CheckAllErrors(restResponse interface{}, apiResponse *logicmonitor.APIResponse, err error) error { + if err != nil { + return fmt.Errorf("[ERROR] %v", err) + } + + if apiResponse.Response != nil && apiResponse.StatusCode != http.StatusOK { + metrics.APIError() + return fmt.Errorf("[API] [%d] %s", apiResponse.StatusCode, apiResponse.Message) + } + var restResponseMessage string var restResponseStatus int64 @@ -62,14 +71,5 @@ func CheckAllErrors(restResponse interface{}, apiResponse *logicmonitor.APIRespo return fmt.Errorf("[REST] [%d] %s", restResponseStatus, restResponseMessage) } - if apiResponse.StatusCode != http.StatusOK { - metrics.APIError() - return fmt.Errorf("[API] [%d] %s", apiResponse.StatusCode, restResponseMessage) - } - - if err != nil { - return fmt.Errorf("[ERROR] %v", err) - } - return nil } diff --git a/pkg/watch/namespace/namespace.go b/pkg/watch/namespace/namespace.go index eed8b8e05..84cb36308 100644 --- a/pkg/watch/namespace/namespace.go +++ b/pkg/watch/namespace/namespace.go @@ -34,6 +34,7 @@ func (w Watcher) ObjType() runtime.Object { func (w Watcher) AddFunc() func(obj interface{}) { return func(obj interface{}) { namespace := obj.(*v1.Namespace) + log.Debugf("Handle adding namespace event: %s", namespace.Name) for name, parentID := range w.DeviceGroups { var appliesTo devicegroup.AppliesToBuilder // Ensure that we are creating namespaces for namespaced resources. @@ -62,7 +63,7 @@ func (w Watcher) AddFunc() func(obj interface{}) { return } - log.Printf("Added namespace %q to %q", namespace.Name, name) + log.Infof("Added namespace %q to %q", namespace.Name, name) } } } @@ -70,6 +71,7 @@ func (w Watcher) AddFunc() func(obj interface{}) { // UpdateFunc is a function that implements the Watcher interface. func (w Watcher) UpdateFunc() func(oldObj, newObj interface{}) { return func(oldObj, newObj interface{}) { + log.Debugf("Ignore updating namespace event") // oldNamespace := oldObj.(*v1.Namespace) // newNamespace := newObj.(*v1.Namespace) } @@ -79,10 +81,12 @@ func (w Watcher) UpdateFunc() func(oldObj, newObj interface{}) { func (w Watcher) DeleteFunc() func(obj interface{}) { return func(obj interface{}) { namespace := obj.(*v1.Namespace) + log.Debugf("Handle deleting namespace event: %s", namespace.Name) + for name, parentID := range w.DeviceGroups { deviceGroup, err := devicegroup.Find(parentID, name, w.LMClient) if err != nil { - log.Printf("Failed to find namespace %s: %v", name, err) + log.Warnf("Failed to find namespace %s: %v", name, err) return } // We should only be returned a device group if it is namespaced. diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index e407c3a78..c7d6dc2a2 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -43,7 +43,7 @@ func (w *Watcher) AddFunc() func(obj interface{}) { return func(obj interface{}) { node := obj.(*v1.Node) - log.Debugf("received ADD event: %s", node.Name) + log.Debugf("Handle adding node event: %s", node.Name) // Require an IP address. if getInternalAddress(node.Status.Addresses) == nil { @@ -59,7 +59,7 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { old := oldObj.(*v1.Node) new := newObj.(*v1.Node) - log.Debugf("received UPDATE event: %s", old.Name) + log.Debugf("Handle updating node event: %s", old.Name) // If the old node does not have an IP, then there is no way we could // have added it to LogicMonitor. Therefore, it must be a new device. @@ -83,7 +83,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { return func(obj interface{}) { node := obj.(*v1.Node) - log.Debugf("received DELETE event: %s", node.Name) + log.Debugf("Handle deleting node event: %s", node.Name) // Delete the node. if w.Config().DeleteDevices { @@ -190,7 +190,7 @@ func (w *Watcher) createRoleDeviceGroup(labels map[string]string) { return } - log.Printf("Added device group for node role %q", role) + log.Infof("Added device group for node role %q", role) } // GetNodesMap implements the getting nodes map info from k8s diff --git a/pkg/watch/pod/pod.go b/pkg/watch/pod/pod.go index 81a7ae6e8..a71af8d3c 100644 --- a/pkg/watch/pod/pod.go +++ b/pkg/watch/pod/pod.go @@ -37,7 +37,7 @@ func (w *Watcher) AddFunc() func(obj interface{}) { return func(obj interface{}) { pod := obj.(*v1.Pod) - log.Debugf("received ADD event: %s", pod.Name) + log.Debugf("Handle adding pod event: %s", pod.Name) // Require an IP address. if pod.Status.PodIP == "" { @@ -53,7 +53,7 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { old := oldObj.(*v1.Pod) new := newObj.(*v1.Pod) - log.Debugf("received UPDATE event: %s", old.Name) + log.Debugf("Handle updating pod event: %s", old.Name) // If the old pod does not have an IP, then there is no way we could // have added it to LogicMonitor. Therefore, it must be a new w. @@ -83,7 +83,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { return func(obj interface{}) { pod := obj.(*v1.Pod) - log.Debugf("received DELETE event: %s", pod.Name) + log.Debugf("Handle deleting pod event: %s", pod.Name) // Delete the pod. if w.Config().DeleteDevices { From 968a0ad080944f0a9c19b42f69c2dfc592a22ac8 Mon Sep 17 00:00:00 2001 From: jeremy Date: Wed, 19 Dec 2018 14:42:06 +0800 Subject: [PATCH 13/22] DEV-40217: Support to set log levels and improve logs in argus related projects --- cmd/watch.go | 4 ++-- pkg/sync/initsyncer.go | 2 +- pkg/watch/namespace/namespace.go | 4 ++-- pkg/watch/node/node.go | 6 +++--- pkg/watch/pod/pod.go | 6 +++--- 5 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/watch.go b/cmd/watch.go index b7e0d788e..fc1d49644 100644 --- a/cmd/watch.go +++ b/cmd/watch.go @@ -113,7 +113,7 @@ func pollCollectorSetStatus(conn *grpc.ClientConn) (bool, error) { case <-timeout: return false, fmt.Errorf("Timeout waiting for collectors to become available") case <-ticker.C: - log.Debugf("Start to check the collectors status") + log.Debugf("Checking collectors status") ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() req := &healthpb.HealthCheckRequest{ @@ -127,7 +127,7 @@ func pollCollectorSetStatus(conn *grpc.ClientConn) (bool, error) { if healthCheckResponse.GetStatus() == healthpb.HealthCheckResponse_SERVING { return true, nil } - log.Debugf("The collectors is not ready: %d", healthCheckResponse.GetStatus()) + log.Debugf("The collectors are not ready: %d", healthCheckResponse.GetStatus()) } } } diff --git a/pkg/sync/initsyncer.go b/pkg/sync/initsyncer.go index e828aa17c..ffe1fb820 100644 --- a/pkg/sync/initsyncer.go +++ b/pkg/sync/initsyncer.go @@ -67,7 +67,7 @@ func (i *InitSyncer) InitSync() { // wait the init sync processes finishing wg.Wait() } - log.Infof("Finish syncing the resource devices") + log.Infof("Finished syncing the resource devices") } func (i *InitSyncer) intSyncNodes(parentGroupID int32) { diff --git a/pkg/watch/namespace/namespace.go b/pkg/watch/namespace/namespace.go index 84cb36308..565bf90f2 100644 --- a/pkg/watch/namespace/namespace.go +++ b/pkg/watch/namespace/namespace.go @@ -34,7 +34,7 @@ func (w Watcher) ObjType() runtime.Object { func (w Watcher) AddFunc() func(obj interface{}) { return func(obj interface{}) { namespace := obj.(*v1.Namespace) - log.Debugf("Handle adding namespace event: %s", namespace.Name) + log.Debugf("Handling add namespace event: %s", namespace.Name) for name, parentID := range w.DeviceGroups { var appliesTo devicegroup.AppliesToBuilder // Ensure that we are creating namespaces for namespaced resources. @@ -71,7 +71,7 @@ func (w Watcher) AddFunc() func(obj interface{}) { // UpdateFunc is a function that implements the Watcher interface. func (w Watcher) UpdateFunc() func(oldObj, newObj interface{}) { return func(oldObj, newObj interface{}) { - log.Debugf("Ignore updating namespace event") + log.Debugf("Ignoring update namespace event") // oldNamespace := oldObj.(*v1.Namespace) // newNamespace := newObj.(*v1.Namespace) } diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index c7d6dc2a2..07752311e 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -43,7 +43,7 @@ func (w *Watcher) AddFunc() func(obj interface{}) { return func(obj interface{}) { node := obj.(*v1.Node) - log.Debugf("Handle adding node event: %s", node.Name) + log.Debugf("Handling add node event: %s", node.Name) // Require an IP address. if getInternalAddress(node.Status.Addresses) == nil { @@ -59,7 +59,7 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { old := oldObj.(*v1.Node) new := newObj.(*v1.Node) - log.Debugf("Handle updating node event: %s", old.Name) + log.Debugf("Handling update node event: %s", old.Name) // If the old node does not have an IP, then there is no way we could // have added it to LogicMonitor. Therefore, it must be a new device. @@ -83,7 +83,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { return func(obj interface{}) { node := obj.(*v1.Node) - log.Debugf("Handle deleting node event: %s", node.Name) + log.Debugf("Handling delete node event: %s", node.Name) // Delete the node. if w.Config().DeleteDevices { diff --git a/pkg/watch/pod/pod.go b/pkg/watch/pod/pod.go index a71af8d3c..857fbbd08 100644 --- a/pkg/watch/pod/pod.go +++ b/pkg/watch/pod/pod.go @@ -37,7 +37,7 @@ func (w *Watcher) AddFunc() func(obj interface{}) { return func(obj interface{}) { pod := obj.(*v1.Pod) - log.Debugf("Handle adding pod event: %s", pod.Name) + log.Debugf("Handling add pod event: %s", pod.Name) // Require an IP address. if pod.Status.PodIP == "" { @@ -53,7 +53,7 @@ func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) { old := oldObj.(*v1.Pod) new := newObj.(*v1.Pod) - log.Debugf("Handle updating pod event: %s", old.Name) + log.Debugf("Handling update pod event: %s", old.Name) // If the old pod does not have an IP, then there is no way we could // have added it to LogicMonitor. Therefore, it must be a new w. @@ -83,7 +83,7 @@ func (w *Watcher) DeleteFunc() func(obj interface{}) { return func(obj interface{}) { pod := obj.(*v1.Pod) - log.Debugf("Handle deleting pod event: %s", pod.Name) + log.Debugf("Handling delete pod event: %s", pod.Name) // Delete the pod. if w.Config().DeleteDevices { From e56e41e8efbe88814fcb2c0c4a6efc1240f09717 Mon Sep 17 00:00:00 2001 From: jeremy Date: Fri, 21 Dec 2018 14:09:57 +0800 Subject: [PATCH 14/22] DEV-41682: Improve the argus code for the CI failure in GitHub --- pkg/device/device.go | 26 ++++++++----------- pkg/sync/initsyncer.go | 49 ++++++++++++------------------------ pkg/watch/service/service.go | 1 + 3 files changed, 28 insertions(+), 48 deletions(-) diff --git a/pkg/device/device.go b/pkg/device/device.go index d2bcb8c68..b3f1a5704 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -51,7 +51,17 @@ func buildDevice(c *config.Config, client api.CollectorSetControllerClient, opti // FindByDisplayName implements types.DeviceManager. func (m *Manager) FindByDisplayName(name string) (*lm.RestDevice, error) { - return find("displayName", name, m.LMClient) + filter := fmt.Sprintf("displayName:%s", name) + restResponse, apiResponse, err := m.LMClient.GetDeviceList("", -1, 0, filter) + if _err := utilities.CheckAllErrors(restResponse, apiResponse, err); _err != nil { + return nil, _err + } + log.Debugf("%#v", restResponse) + if restResponse.Data.Total == 1 { + return &restResponse.Data.Items[0], nil + } + + return nil, nil } // Add implements types.DeviceManager. @@ -166,20 +176,6 @@ func (m *Manager) Config() *config.Config { return m.Base.Config } -func find(field, name string, client *lm.DefaultApi) (*lm.RestDevice, error) { - filter := fmt.Sprintf("%s:%s", field, name) - restResponse, apiResponse, err := client.GetDeviceList("", -1, 0, filter) - if _err := utilities.CheckAllErrors(restResponse, apiResponse, err); _err != nil { - return nil, _err - } - log.Debugf("%#v", restResponse) - if restResponse.Data.Total == 1 { - return &restResponse.Data.Items[0], nil - } - - return nil, nil -} - // GetListByGroupID implements getting all the devices belongs to the group directly func (m *Manager) GetListByGroupID(groupID int32) ([]lm.RestDevice, error) { restResponse, apiResponse, err := m.LMClient.GetImmediateDeviceListByDeviceGroupId(groupID, "id,name,displayName,customProperties", -1, 0, "") diff --git a/pkg/sync/initsyncer.go b/pkg/sync/initsyncer.go index cf99724be..e7fc0dbd7 100644 --- a/pkg/sync/initsyncer.go +++ b/pkg/sync/initsyncer.go @@ -46,13 +46,13 @@ func (i *InitSyncer) InitSync() { case constants.PodDeviceGroupName: go func() { defer wg.Done() - i.initSyncPods(rest.Id) + i.initSyncPodsOrServices(constants.PodDeviceGroupName, rest.Id) log.Infof("Finish syncing %v", constants.PodDeviceGroupName) }() case constants.ServiceDeviceGroupName: go func() { defer wg.Done() - i.initSyncServices(rest.Id) + i.initSyncPodsOrServices(constants.ServiceDeviceGroupName, rest.Id) log.Infof("Finish syncing %v", constants.ServiceDeviceGroupName) }() default: @@ -96,10 +96,10 @@ func (i *InitSyncer) intSyncNodes(parentGroupID int32) { } } -func (i *InitSyncer) initSyncPods(parentGroupID int32) { - rest, err := devicegroup.Find(parentGroupID, constants.PodDeviceGroupName, i.DeviceManager.LMClient) +func (i *InitSyncer) initSyncPodsOrServices(deviceType string, parentGroupID int32) { + rest, err := devicegroup.Find(parentGroupID, deviceType, i.DeviceManager.LMClient) if err != nil || rest == nil { - log.Warnf("Failed to get the pod group") + log.Warnf("Failed to get the %s group", deviceType) return } if rest.SubGroups == nil { @@ -108,39 +108,22 @@ func (i *InitSyncer) initSyncPods(parentGroupID int32) { // loop every namespace for _, subGroup := range rest.SubGroups { - //get pod info from k8s - podsMap, err := pod.GetPodsMap(i.DeviceManager.K8sClient, subGroup.Name) - if err != nil || podsMap == nil { - log.Warnf("Failed to get the pods from k8s, namespace: %v, err: %v", subGroup.Name, err) - continue + //get pod/service info from k8s + var deviceMap map[string]string + if deviceType == constants.PodDeviceGroupName { + deviceMap, err = pod.GetPodsMap(i.DeviceManager.K8sClient, subGroup.Name) + } else if deviceType == constants.ServiceDeviceGroupName { + deviceMap, err = service.GetServicesMap(i.DeviceManager.K8sClient, subGroup.Name) + } else { + return } - - // get and check all the devices in the group - i.syncDevices(constants.PodDeviceGroupName, podsMap, subGroup) - } -} - -func (i *InitSyncer) initSyncServices(parentGroupID int32) { - rest, err := devicegroup.Find(parentGroupID, constants.ServiceDeviceGroupName, i.DeviceManager.LMClient) - if err != nil || rest == nil { - log.Warnf("Failed to get the pod group") - return - } - if rest.SubGroups == nil { - return - } - - // loop every namesplace - for _, subGroup := range rest.SubGroups { - //get service info from k8s - servicesMap, err := service.GetServicesMap(i.DeviceManager.K8sClient, subGroup.Name) - if err != nil || servicesMap == nil { - log.Warnf("Failed to get the services from k8s, namespace: %v, err: %v", subGroup.Name, err) + if err != nil || deviceMap == nil { + log.Warnf("Failed to get the %s from k8s, namespace: %v, err: %v", deviceType, subGroup.Name, err) continue } // get and check all the devices in the group - i.syncDevices(constants.ServiceDeviceGroupName, servicesMap, subGroup) + i.syncDevices(deviceType, deviceMap, subGroup) } } diff --git a/pkg/watch/service/service.go b/pkg/watch/service/service.go index cb1fb5715..56834da79 100644 --- a/pkg/watch/service/service.go +++ b/pkg/watch/service/service.go @@ -4,6 +4,7 @@ package service import ( "fmt" + "github.com/logicmonitor/k8s-argus/pkg/constants" "github.com/logicmonitor/k8s-argus/pkg/types" "github.com/logicmonitor/k8s-argus/pkg/utilities" From 3286dcd18f009e155cabeb505b0776101b4ae359 Mon Sep 17 00:00:00 2001 From: JeremyTangCD <44150592+JeremyTangCD@users.noreply.github.com> Date: Wed, 2 Jan 2019 16:48:56 +0800 Subject: [PATCH 15/22] Dev-40217 support to set log levels and improve Dev-40217 support to set log levels and improve --- pkg/watch/node/node.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index ca640da78..07752311e 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -206,17 +206,3 @@ func GetNodesMap(k8sClient *kubernetes.Clientset) (map[string]string, error) { return nodesMap, nil } - -// GetNodesMap implements the getting nodes map info from k8s -func GetNodesMap(k8sClient *kubernetes.Clientset) (map[string]string, error) { - nodesMap := make(map[string]string) - nodeList, err := k8sClient.CoreV1().Nodes().List(metav1.ListOptions{}) - if err != nil || nodeList == nil { - return nil, err - } - for _, nodeInfo := range nodeList.Items { - nodesMap[nodeInfo.Name] = getInternalAddress(nodeInfo.Status.Addresses).Address - } - - return nodesMap, nil -} From 06bdd1586ee0b7df373246adff0165e2c8883170 Mon Sep 17 00:00:00 2001 From: jeremy Date: Wed, 9 Jan 2019 15:30:08 +0800 Subject: [PATCH 16/22] DEV-41947: Improve the initsync logic to prevent lost data after k8s device is updated --- pkg/device/device.go | 66 +++++++++++++++++++++++++++++++++++++----- pkg/sync/initsyncer.go | 4 +-- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/pkg/device/device.go b/pkg/device/device.go index 1dc85cd0a..7010cf7d8 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -49,6 +49,55 @@ func buildDevice(c *config.Config, client api.CollectorSetControllerClient, opti return device } +// checkAndUpdateExistDevice tries to find and update the devices which needs to be changed +func (m *Manager) checkAndUpdateExistDevice(device *lm.RestDevice) (*lm.RestDevice, error) { + oldDevice, err := m.FindByDisplayName(device.DisplayName) + if err != nil { + return nil, err + } + if oldDevice == nil { + return nil, fmt.Errorf("can not find the device: %s", device.DisplayName) + } + + // the device which is not changed will be ignored + if device.Name == oldDevice.Name { + log.Infof("the device (%s) does not change, ignore the updating", device.DisplayName) + return device, nil + } + + // the device of the other cluster will be ignored + oldClusterName := "" + if oldDevice.CustomProperties != nil && len(oldDevice.CustomProperties) > 0 { + for _, cp := range oldDevice.CustomProperties { + if cp.Name == constants.K8sClusterNamePropertyKey { + oldClusterName = cp.Value + } + } + } + if oldClusterName != m.Config().ClusterName { + log.Infof("the device (%s) belongs to the other cluster %s, ignore the updating", device.DisplayName, oldClusterName) + return device, nil + } + + newDevice, err := m.updateAndReplace(oldDevice.Id, device) + if err != nil { + return nil, err + } + log.Infof("Finished updating the device: %s", newDevice.DisplayName) + return newDevice, nil +} + +func (m *Manager) updateAndReplace(id int32, device *lm.RestDevice) (*lm.RestDevice, error) { + + restResponse, apiResponse, err := m.LMClient.UpdateDevice(*device, id, "replace") + if _err := utilities.CheckAllErrors(restResponse, apiResponse, err); _err != nil { + return nil, _err + } + log.Debugf("%#v", restResponse) + + return &restResponse.Data, nil +} + // FindByDisplayName implements types.DeviceManager. func (m *Manager) FindByDisplayName(name string) (*lm.RestDevice, error) { filter := fmt.Sprintf("displayName:%s", name) @@ -71,6 +120,15 @@ func (m *Manager) Add(options ...types.DeviceOption) (*lm.RestDevice, error) { restResponse, apiResponse, err := m.LMClient.AddDevice(*device, false) if _err := utilities.CheckAllErrors(restResponse, apiResponse, err); _err != nil { + if restResponse != nil && restResponse.Status == 600 { + log.Infof("Check and Update the existing device: %s", device.DisplayName) + newDevice, err := m.checkAndUpdateExistDevice(device) + if err != nil { + return nil, err + } + return newDevice, nil + } + return nil, _err } log.Debugf("%#v", restResponse) @@ -83,13 +141,7 @@ func (m *Manager) UpdateAndReplaceByID(id int32, options ...types.DeviceOption) device := buildDevice(m.Config(), m.ControllerClient, options...) log.Debugf("%#v", device) - restResponse, apiResponse, err := m.LMClient.UpdateDevice(*device, id, "replace") - if _err := utilities.CheckAllErrors(restResponse, apiResponse, err); _err != nil { - return nil, _err - } - log.Debugf("%#v", restResponse) - - return &restResponse.Data, nil + return m.updateAndReplace(id, device) } // UpdateAndReplaceByDisplayName implements types.DeviceManager. diff --git a/pkg/sync/initsyncer.go b/pkg/sync/initsyncer.go index 043849110..212cb5b59 100644 --- a/pkg/sync/initsyncer.go +++ b/pkg/sync/initsyncer.go @@ -150,8 +150,8 @@ func (i *InitSyncer) syncDevices(resourceType string, resourcesMap map[string]st continue } - name, exist := resourcesMap[device.DisplayName] - if !exist || name != device.Name { + _, exist := resourcesMap[device.DisplayName] + if !exist { log.Infof("Delete the non-exist %v device: %v", resourceType, device.DisplayName) err := i.DeviceManager.DeleteByID(device.Id) if err != nil { From c69d432600db453924bfeb6b9ab082883cb0bd8a Mon Sep 17 00:00:00 2001 From: jeremy Date: Thu, 10 Jan 2019 13:48:18 +0800 Subject: [PATCH 17/22] DEV-41947: Improve the initsync logic to prevent lost data after k8s device is updated --- pkg/device/device.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/device/device.go b/pkg/device/device.go index 7010cf7d8..dab862ad3 100644 --- a/pkg/device/device.go +++ b/pkg/device/device.go @@ -61,7 +61,7 @@ func (m *Manager) checkAndUpdateExistDevice(device *lm.RestDevice) (*lm.RestDevi // the device which is not changed will be ignored if device.Name == oldDevice.Name { - log.Infof("the device (%s) does not change, ignore the updating", device.DisplayName) + log.Infof("No changes to device (%s). Ignoring update", device.DisplayName) return device, nil } @@ -75,7 +75,7 @@ func (m *Manager) checkAndUpdateExistDevice(device *lm.RestDevice) (*lm.RestDevi } } if oldClusterName != m.Config().ClusterName { - log.Infof("the device (%s) belongs to the other cluster %s, ignore the updating", device.DisplayName, oldClusterName) + log.Infof("Device (%s) belongs to a different cluster (%s). Ignoring update", device.DisplayName, oldClusterName) return device, nil } From f1a712ee08c609751a8ea68c3323afab56d1c55c Mon Sep 17 00:00:00 2001 From: jeremy Date: Mon, 14 Jan 2019 15:34:48 +0800 Subject: [PATCH 18/22] DEV-42060: Don't create collector device for Kubernetes Clusters --- Gopkg.toml | 2 +- .../logicmonitor/lm-sdk-go/rest_collector.go | 11 +++++------ 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/Gopkg.toml b/Gopkg.toml index 0cdb21bc2..963451410 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -22,7 +22,7 @@ [[constraint]] - branch = "master" + branch = "v1" name = "github.com/logicmonitor/lm-sdk-go" [[constraint]] diff --git a/vendor/github.com/logicmonitor/lm-sdk-go/rest_collector.go b/vendor/github.com/logicmonitor/lm-sdk-go/rest_collector.go index 8638347b4..8814fc675 100644 --- a/vendor/github.com/logicmonitor/lm-sdk-go/rest_collector.go +++ b/vendor/github.com/logicmonitor/lm-sdk-go/rest_collector.go @@ -1,17 +1,16 @@ -/* - * +/* + * * * No description provided (generated by Swagger Codegen https://github.com/swagger-api/swagger-codegen) * * OpenAPI spec version: 1.0.0 - * + * * Generated by: https://github.com/swagger-api/swagger-codegen.git */ package logicmonitor type RestCollector struct { - ConfVersion string `json:"confVersion,omitempty"` NumberOfServices int32 `json:"numberOfServices,omitempty"` @@ -92,7 +91,7 @@ type RestCollector struct { UserPermission string `json:"userPermission,omitempty"` - NeedAutoCreateCollectorDevice bool `json:"needAutoCreateCollectorDevice,omitempty"` + NeedAutoCreateCollectorDevice bool `json:"needAutoCreateCollectorDevice"` WatchdogUpdatedOn int64 `json:"watchdogUpdatedOn,omitempty"` @@ -114,7 +113,7 @@ type RestCollector struct { CreatedOnLocal string `json:"createdOnLocal,omitempty"` - EnableFailBack bool `json:"enableFailBack,omitempty"` + EnableFailBack bool `json:"enableFailBack"` ResendIval int32 `json:"resendIval,omitempty"` From d65374e91f1b332ba3749e7144addf7ebe23ba62 Mon Sep 17 00:00:00 2001 From: jeremy Date: Tue, 22 Jan 2019 16:46:02 +0800 Subject: [PATCH 19/22] DEV-47062 Add distinguishing property to host network pods --- pkg/device/builder/builder.go | 5 +++++ pkg/types/types.go | 2 ++ pkg/watch/pod/pod.go | 6 +++++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/device/builder/builder.go b/pkg/device/builder/builder.go index 22da3da84..f4c38299a 100644 --- a/pkg/device/builder/builder.go +++ b/pkg/device/builder/builder.go @@ -63,6 +63,11 @@ func (b *Builder) System(name, value string) types.DeviceOption { return setProperty("system."+name, value) } +// Custom implements types.DeviceBuilder. +func (b *Builder) Custom(name, value string) types.DeviceOption { + return setProperty(name, value) +} + func setProperty(name, value string) types.DeviceOption { return func(device *lm.RestDevice) { if value != "" { diff --git a/pkg/types/types.go b/pkg/types/types.go index 71322b1ab..ed100de37 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -75,4 +75,6 @@ type DeviceBuilder interface { Auto(string, string) DeviceOption // System adds a system property to the device. System(string, string) DeviceOption + // System adds a custom property to the device. + Custom(string, string) DeviceOption } diff --git a/pkg/watch/pod/pod.go b/pkg/watch/pod/pod.go index 857fbbd08..9da8e11a9 100644 --- a/pkg/watch/pod/pod.go +++ b/pkg/watch/pod/pod.go @@ -133,7 +133,7 @@ func (w *Watcher) move(pod *v1.Pod) { func (w *Watcher) args(pod *v1.Pod, category string) []types.DeviceOption { categories := utilities.BuildSystemCategoriesFromLabels(category, pod.Labels) - return []types.DeviceOption{ + options := []types.DeviceOption{ w.Name(getPodDNSName(pod)), w.ResourceLabels(pod.Labels), w.DisplayName(pod.Name), @@ -145,6 +145,10 @@ func (w *Watcher) args(pod *v1.Pod, category string) []types.DeviceOption { w.Auto("uid", string(pod.UID)), w.System("ips", pod.Status.PodIP), } + if pod.Spec.HostNetwork { + options = append(options, w.Custom("kubernetes.pod.hostNetwork", "true")) + } + return options } func getPodDNSName(pod *v1.Pod) string { From 30932b003b72fb8667d88c9436346f829c8bdc88 Mon Sep 17 00:00:00 2001 From: jeremy Date: Thu, 14 Feb 2019 16:06:19 +0800 Subject: [PATCH 20/22] DEV-49046 Fix the internal IP cannot found bug in argus --- pkg/watch/node/node.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index 07752311e..7dab2372c 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -151,13 +151,17 @@ func (w *Watcher) args(node *v1.Node, category string) []types.DeviceOption { // getInternalAddress finds the node's internal address. func getInternalAddress(addresses []v1.NodeAddress) *v1.NodeAddress { + var hostname v1.NodeAddress for _, address := range addresses { if address.Type == v1.NodeInternalIP { return &address } + if address.Type == v1.NodeHostName { + hostname = address + } } - - return nil + //if there is no internal IP for this node, the host name will be used + return &hostname } func (w *Watcher) createRoleDeviceGroup(labels map[string]string) { @@ -201,7 +205,11 @@ func GetNodesMap(k8sClient *kubernetes.Clientset) (map[string]string, error) { return nil, err } for _, nodeInfo := range nodeList.Items { - nodesMap[nodeInfo.Name] = getInternalAddress(nodeInfo.Status.Addresses).Address + address := getInternalAddress(nodeInfo.Status.Addresses) + if address == nil { + continue + } + nodesMap[nodeInfo.Name] = address.Address } return nodesMap, nil From 2cc8d1326d9ced3f91b0f64b7f1a80407451eed4 Mon Sep 17 00:00:00 2001 From: jeremy Date: Mon, 18 Feb 2019 14:48:58 +0800 Subject: [PATCH 21/22] DEV-49046 Fix the internal IP cannot found bug in argus --- pkg/watch/node/node.go | 6 +++--- pkg/watch/node/node_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 3 deletions(-) create mode 100644 pkg/watch/node/node_test.go diff --git a/pkg/watch/node/node.go b/pkg/watch/node/node.go index 7dab2372c..c045b3a4c 100644 --- a/pkg/watch/node/node.go +++ b/pkg/watch/node/node.go @@ -151,17 +151,17 @@ func (w *Watcher) args(node *v1.Node, category string) []types.DeviceOption { // getInternalAddress finds the node's internal address. func getInternalAddress(addresses []v1.NodeAddress) *v1.NodeAddress { - var hostname v1.NodeAddress + var hostname *v1.NodeAddress for _, address := range addresses { if address.Type == v1.NodeInternalIP { return &address } if address.Type == v1.NodeHostName { - hostname = address + hostname = &address } } //if there is no internal IP for this node, the host name will be used - return &hostname + return hostname } func (w *Watcher) createRoleDeviceGroup(labels map[string]string) { diff --git a/pkg/watch/node/node_test.go b/pkg/watch/node/node_test.go new file mode 100644 index 000000000..45a8c2ad5 --- /dev/null +++ b/pkg/watch/node/node_test.go @@ -0,0 +1,33 @@ +package node + +import ( + "k8s.io/api/core/v1" + "testing" +) + +func TestGetInternalAddress(t *testing.T) { + var addresses []v1.NodeAddress + address := getInternalAddress(addresses) + if address != nil { + t.Errorf("invalid address: %v", address) + } + addresses = append(addresses, v1.NodeAddress{ + Type: v1.NodeHostName, + Address: "test", + }) + + address = getInternalAddress(addresses) + if address == nil || address.Address != "test" { + t.Errorf("invalid address: %v", address) + } + + addresses = append(addresses, v1.NodeAddress{ + Type: v1.NodeInternalIP, + Address: "127.0.0.1", + }) + + address = getInternalAddress(addresses) + if address == nil || address.Address != "127.0.0.1" { + t.Errorf("invalid address: %v", address) + } +} From 2333cd410641ed754b8bda4a880b886d0acb871c Mon Sep 17 00:00:00 2001 From: JeremyTangCD <44150592+JeremyTangCD@users.noreply.github.com> Date: Mon, 18 Feb 2019 15:20:02 +0800 Subject: [PATCH 22/22] Update node_test.go --- pkg/watch/node/node_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/watch/node/node_test.go b/pkg/watch/node/node_test.go index 45a8c2ad5..feb9cbde1 100644 --- a/pkg/watch/node/node_test.go +++ b/pkg/watch/node/node_test.go @@ -1,8 +1,9 @@ package node import ( - "k8s.io/api/core/v1" "testing" + + "k8s.io/api/core/v1" ) func TestGetInternalAddress(t *testing.T) {