Skip to content
This repository was archived by the owner on Jan 16, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
The diff you're trying to view is too large. We only load the first 3000 changed files.
4 changes: 4 additions & 0 deletions cmd/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/logicmonitor/k8s-argus/pkg/config"
"github.com/logicmonitor/k8s-argus/pkg/connection"
"github.com/logicmonitor/k8s-argus/pkg/constants"
"github.com/logicmonitor/k8s-argus/pkg/cronjob"
"github.com/logicmonitor/k8s-argus/pkg/healthz"
lmlog "github.com/logicmonitor/k8s-argus/pkg/log"
"github.com/logicmonitor/k8s-argus/pkg/permission"
Expand Down Expand Up @@ -67,6 +68,9 @@ var watchCmd = &cobra.Command{
// Invoke the watcher.
argus.Watch()

// To update K8s & Helm properties in cluster device group periodically with the server
cronjob.UpdateTelemetryCron(base)

// Health check.
http.HandleFunc("/healthz", healthz.HandleFunc)

Expand Down
31 changes: 31 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module github.com/logicmonitor/k8s-argus

go 1.14

require (
github.com/Knetic/govaluate v3.0.0+incompatible
github.com/coreos/etcd v3.3.13+incompatible
github.com/go-openapi/runtime v0.19.11
github.com/go-openapi/strfmt v0.19.4
github.com/golang/protobuf v1.4.2 // indirect
github.com/google/uuid v1.1.1
github.com/googleapis/gnostic v0.4.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/kelseyhightower/envconfig v1.4.0
github.com/kr/pretty v0.2.0 // indirect
github.com/logicmonitor/k8s-collectorset-controller v2.0.0+incompatible
github.com/robfig/cron/v3 v3.0.1
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.4.0
github.com/stretchr/testify v1.6.1
github.com/vkumbhar94/lm-sdk-go v2.0.1+incompatible
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect
google.golang.org/grpc v1.27.1
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
k8s.io/api v0.17.0
k8s.io/apimachinery v0.17.0
k8s.io/client-go v0.17.0
)
474 changes: 474 additions & 0 deletions go.sum

Large diffs are not rendered by default.

75 changes: 7 additions & 68 deletions pkg/argus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/logicmonitor/k8s-argus/pkg/etcd"
"github.com/logicmonitor/k8s-argus/pkg/facade"
"github.com/logicmonitor/k8s-argus/pkg/lmexec"
lmlog "github.com/logicmonitor/k8s-argus/pkg/log"
"github.com/logicmonitor/k8s-argus/pkg/sync"
"github.com/logicmonitor/k8s-argus/pkg/tree"
"github.com/logicmonitor/k8s-argus/pkg/types"
Expand All @@ -24,9 +25,9 @@ import (
"github.com/logicmonitor/k8s-argus/pkg/watch/pod"
"github.com/logicmonitor/k8s-argus/pkg/watch/service"
"github.com/logicmonitor/k8s-argus/pkg/worker"
"github.com/logicmonitor/lm-sdk-go/client"
"github.com/logicmonitor/lm-sdk-go/client/lm"
log "github.com/sirupsen/logrus"
"github.com/vkumbhar94/lm-sdk-go/client"
"github.com/vkumbhar94/lm-sdk-go/client/lm"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -217,7 +218,10 @@ func NewArgus(base *types.Base) (*Argus, error) {
initSyncer := sync.InitSyncer{
DeviceManager: deviceManager,
}
initSyncer.InitSync()

lctx := lmlog.NewLMContextWith(log.WithFields(log.Fields{"name": "init-sync"}))
initSyncer.InitSync(lctx)
initSyncer.RunPeriodicSync(10)

if base.Config.EtcdDiscoveryToken != "" {
etcdController := etcd.Controller{
Expand All @@ -229,71 +233,6 @@ func NewArgus(base *types.Base) (*Argus, error) {
}
}
log.Debugf("Initialized argus")
// podChannel := make(chan types.ICommand)
// serviceChannel := make(chan types.ICommand)
// deploymentChannel := make(chan types.ICommand)
// nodeChannel := make(chan types.ICommand)
// argus.Watchers = []types.Watcher{
// &namespace.Watcher{
// Base: base,
// DeviceGroups: deviceGroups,
// },
// &node.Watcher{
// DeviceManager: deviceManager,
// DeviceGroups: deviceGroups,
// LMClient: base.LMClient,
// WConfig: types.WConfig{
// MethodChannels: map[string]chan types.ICommand{
// "GET": nodeChannel,
// "POST": nodeChannel,
// "DELETE": nodeChannel,
// "PUT": nodeChannel,
// "PATCH": nodeChannel,
// },
// RetryLimit: 2,
// },
// },
// &service.Watcher{
// DeviceManager: deviceManager,
// WConfig: types.WConfig{
// MethodChannels: map[string]chan types.ICommand{
// "GET": serviceChannel,
// "POST": serviceChannel,
// "DELETE": serviceChannel,
// "PUT": serviceChannel,
// "PATCH": serviceChannel,
// },
// RetryLimit: 2,
// },
// },
// &pod.Watcher{
// DeviceManager: deviceManager,
// WConfig: types.WConfig{
// MethodChannels: map[string]chan types.ICommand{
// "GET": podChannel,
// "POST": podChannel,
// "DELETE": podChannel,
// "PUT": podChannel,
// "PATCH": podChannel,
// },
// RetryLimit: 2,
// },
// },
// &deployment.Watcher{
// DeviceManager: deviceManager,
// WConfig: types.WConfig{
// MethodChannels: map[string]chan types.ICommand{
// "GET": deploymentChannel,
// "POST": deploymentChannel,
// "DELETE": deploymentChannel,
// "PUT": deploymentChannel,
// "PATCH": deploymentChannel,
// },
// RetryLimit: 2,
// },
// },
// }

return argus, nil
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func createGRPCConnection() (*grpc.ClientConn, error) {
select {
case <-timeout:
return nil, fmt.Errorf("timeout waiting for gRPC connection")
case <-ticker.C:
default:
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10)*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx, appConfig.Address, grpc.WithBlock(), grpc.WithInsecure())
Expand All @@ -86,6 +86,7 @@ func createGRPCConnection() (*grpc.ClientConn, error) {
} else {
return conn, nil
}
<-ticker.C
}
}
}
Expand All @@ -101,12 +102,13 @@ func createCSCClient() (api.CollectorSetControllerClient, error) {
select {
case <-timeout:
return client, fmt.Errorf("timeout waiting for collectors to become available")
case <-ticker.C:
default:
healthCheckResponse := getCSCHealth(hc)
if healthCheckResponse.GetStatus() == healthpb.HealthCheckResponse_SERVING {
return client, nil
}
log.Debugf("The collectors are not ready: %v", healthCheckResponse.GetStatus().String())
<-ticker.C
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ const (
K8sDeviceType = 8
// K8sSystemCategoriesPropertyKey is the key of the unique custom property kubernetes system categories
K8sSystemCategoriesPropertyKey = "system.categories"
// K8sSystemIPsPropertyKey is the key of the system ips property
K8sSystemIPsPropertyKey = "system.ips"
)

const (
Expand All @@ -115,3 +117,29 @@ const (
// Nodes Nodes generic
Nodes = "nodes"
)

const (
// HelmChart is the key for Argus & Collectoeset-controller label
HelmChart = "helm-chart"
// HelmRevision is the key for Argus & Collectoeset-controller label
HelmRevision = "helm-revision"
// Chart is the label key in Argus & Collectoeset-controller Deployment
Chart = "chart"
// Argus is the Argus Deployment label
Argus = "argus"
// CollectorsetController is the Collectorset-controller Deployment label
CollectorsetController = "collectorset-controller"
// KubernetesVersionKey is the key for customProperties
KubernetesVersionKey = "kubernetes.version"
// DeviceGroupCustomType is the device group of custom type
DeviceGroupCustomType = "custom"
// HistorySuffix is the key suffix used for maintaining history
HistorySuffix = ".history"
// PropertySeparator is the property separator
PropertySeparator = ", "
)

const (
// IsPingDevice is the key used in watcher context to pass metadata
IsPingDevice = "ispingdevice"
)
28 changes: 28 additions & 0 deletions pkg/cronjob/cronjob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package cronjob

import (
"github.com/logicmonitor/k8s-argus/pkg/lmctx"
lmlog "github.com/logicmonitor/k8s-argus/pkg/log"
"github.com/robfig/cron/v3"
)

var (
cj *cron.Cron
)

func init() {
cj = cron.New()
// Start the cron scheduler in its own goroutine, or no-op if already started.
cj.Start()
}

// RegisterFunc adds a func to the Cron to be run on the given schedule.
func RegisterFunc(lctx *lmctx.LMContext, cronSpec string, handlerFunc func()) cron.EntryID {
log := lmlog.Logger(lctx)
entryID, err := cj.AddFunc(cronSpec, handlerFunc)
if err != nil {
log.Errorf("Failed to add a func to the cron. Error: %v", err)
return 0
}
return entryID
}
138 changes: 138 additions & 0 deletions pkg/cronjob/devicegroupcron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package cronjob

import (
"strings"

"github.com/logicmonitor/k8s-argus/pkg/constants"
"github.com/logicmonitor/k8s-argus/pkg/devicegroup"
"github.com/logicmonitor/k8s-argus/pkg/lmctx"
lmlog "github.com/logicmonitor/k8s-argus/pkg/log"
"github.com/logicmonitor/k8s-argus/pkg/types"
"github.com/logicmonitor/k8s-argus/pkg/watch/namespace"
"github.com/sirupsen/logrus"
"github.com/vkumbhar94/lm-sdk-go/client"
"github.com/vkumbhar94/lm-sdk-go/models"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// UpdateTelemetryCron a cron job to update K8s & Helm properties in cluster device group
func UpdateTelemetryCron(base *types.Base) {
lctx := lmlog.NewLMContextWith(logrus.WithFields(logrus.Fields{"res": "update-telemetry"}))
RegisterFunc(lctx, "@midnight", func() { updateTelemetry(lctx, base) })
}

func updateTelemetry(lctx *lmctx.LMContext, base *types.Base) {
log := lmlog.Logger(lctx)
parentID := base.Config.ClusterGroupID
groupName := constants.ClusterDeviceGroupPrefix + base.Config.ClusterName
deviceGroup, err := devicegroup.Find(parentID, groupName, base.LMClient)
if err != nil || deviceGroup == nil {
log.Errorf("Failed to fetch device group. Error: %v", err)
return
}
updateDeviceGroupK8sAndHelmProperties(lctx, deviceGroup.ID, base.LMClient, base.K8sClient)
}

// updateDeviceGroupK8sAndHelmProperties will fetch existing properties and compare with actual values then update in cluster device group
func updateDeviceGroupK8sAndHelmProperties(lctx *lmctx.LMContext, groupID int32, client *client.LMSdkGo, kubeClient kubernetes.Interface) {
existingPropertiesMap := getExistingDeviceGroupPropertiesMap(lctx, groupID, client)
customPropertiesMap := getK8sAndHelmProperties(lctx, kubeClient)

for k, v := range customPropertiesMap {
// update history property
historyKey := k + constants.HistorySuffix
updatedHistoryVal := getUpdatedHistoryValue(existingPropertiesMap[historyKey], v)
updateProperty(lctx, historyKey, updatedHistoryVal, groupID, client)

// update latest property
updateProperty(lctx, k, v, groupID, client)
}
}

func getExistingDeviceGroupPropertiesMap(lctx *lmctx.LMContext, groupID int32, client *client.LMSdkGo) map[string]string {
entityProperties := devicegroup.GetDeviceGroupPropertyList(lctx, groupID, client)
entityPropertiesMap := make(map[string]string)
for _, property := range entityProperties {
entityPropertiesMap[property.Name] = property.Value
}
return entityPropertiesMap
}

func getK8sAndHelmProperties(lctx *lmctx.LMContext, kubeClient kubernetes.Interface) map[string]string {
customProperties := make(map[string]string)
customProperties = getKubernetesVersion(lctx, customProperties, kubeClient)
customProperties = getHelmChartDetailsFromConfigMap(lctx, customProperties, kubeClient)
return customProperties
}

// getKubernetesVersion Fetches Kubernetes version
func getKubernetesVersion(lctx *lmctx.LMContext, customProperties map[string]string, kubeClient kubernetes.Interface) map[string]string {
log := lmlog.Logger(lctx)
serverVersion, err := kubeClient.Discovery().ServerVersion()
if err != nil || serverVersion == nil {
log.Errorf("Failed to get Kubernetes version. Error: %v", err)
return customProperties
}
cpValue := serverVersion.String()
customProperties[constants.KubernetesVersionKey] = cpValue
return customProperties
}

// getHelmChartDetailsFromConfigMap fetches configmap from kubernetes cluster and read annotations
func getHelmChartDetailsFromConfigMap(lctx *lmctx.LMContext, customProperties map[string]string, kubeClient kubernetes.Interface) map[string]string {
log := lmlog.Logger(lctx)

// get list of namespace for fetching deployments
namespaceList := namespace.GetNamespaceList(lctx, kubeClient)

regex := constants.Chart + " in (" + constants.Argus + ", " + constants.CollectorsetController + ")"
opts := metav1.ListOptions{
LabelSelector: regex,
}
for i := range namespaceList {
configMapList, err := kubeClient.CoreV1().ConfigMaps(namespaceList[i]).List(opts)
if err != nil || configMapList == nil {
log.Errorf("Failed to get the configMap from k8s. Error: %v", err)
continue
}
for i := range configMapList.Items {
annotations := configMapList.Items[i].GetAnnotations()
labelVal := configMapList.Items[i].GetLabels()[constants.Chart]
for key, value := range annotations {
if key == constants.HelmChart || key == constants.HelmRevision {
name := labelVal + "." + key
customProperties[name] = value
}
}
}
}
return customProperties
}

// update the property and if it does not exists then add it
func updateProperty(lctx *lmctx.LMContext, key string, value string, groupID int32, client *client.LMSdkGo) {
entityProperty := models.EntityProperty{Name: key, Value: value, Type: constants.DeviceGroupCustomType}
isUpdated := devicegroup.UpdateDeviceGroupPropertyByName(lctx, groupID, &entityProperty, client)
if !isUpdated {
devicegroup.AddDeviceGroupProperty(lctx, groupID, &entityProperty, client)
}
}

func getUpdatedHistoryValue(historyVal, newValue string) string {
values := []string{}
if historyVal != "" {
values = strings.Split(historyVal, constants.PropertySeparator)
}
length := len(values)
// append record if not same as last
if length == 0 || values[length-1] != newValue {
values = append(values, newValue)
length = len(values) // calculate length again after adding record
}
// retain last 10 records
if length >= 10 {
values = values[length-10 : length]
}
return strings.Join(values, constants.PropertySeparator)
}
Loading