Skip to content
This repository was archived by the owner on Jan 16, 2023. It is now read-only.

Commit 3944266

Browse files
committed
DEV-50929 Add k8s deployments monitoring by Argus
2 parents 3d9407c + 0e9d209 commit 3944266

File tree

10 files changed

+226
-11
lines changed

10 files changed

+226
-11
lines changed

pkg/argus.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package argus
22

33
import (
4+
"github.com/logicmonitor/k8s-argus/pkg/watch/deployment"
45
"time"
56

67
"github.com/logicmonitor/k8s-argus/pkg/config"
@@ -110,6 +111,9 @@ func NewArgus(base *types.Base, client api.CollectorSetControllerClient) (*Argus
110111
&pod.Watcher{
111112
DeviceManager: deviceManager,
112113
},
114+
&deployment.Watcher{
115+
DeviceManager: deviceManager,
116+
},
113117
}
114118

115119
return argus, nil
@@ -140,9 +144,8 @@ func NewBase(config *config.Config) (*types.Base, error) {
140144

141145
// Watch watches the API for events.
142146
func (a *Argus) Watch() {
143-
getter := a.K8sClient.CoreV1().RESTClient()
144147
for _, w := range a.Watchers {
145-
watchlist := cache.NewListWatchFromClient(getter, w.Resource(), v1.NamespaceAll, fields.Everything())
148+
watchlist := cache.NewListWatchFromClient(getK8sRESTClient(a.K8sClient, w.ApiVersion()), w.Resource(), v1.NamespaceAll, fields.Everything())
146149
_, controller := cache.NewInformer(
147150
watchlist,
148151
w.ObjType(),
@@ -158,6 +161,18 @@ func (a *Argus) Watch() {
158161
}
159162
}
160163

164+
// get the K8s RESTClient by apiVersion, use the default V1 version if there is no match
165+
func getK8sRESTClient(clientset *kubernetes.Clientset, apiVersion string) rest.Interface {
166+
switch apiVersion {
167+
case constants.K8sApiVersion_v1:
168+
return clientset.CoreV1().RESTClient()
169+
case constants.K8sApiVersion_apps_v1beta2:
170+
return clientset.AppsV1beta2().RESTClient()
171+
default:
172+
return clientset.CoreV1().RESTClient()
173+
}
174+
}
175+
161176
// check the cluster group ID, if the group does not exist, just use the root group
162177
func checkAndUpdateClusterGroup(config *config.Config, lmClient *client.LMSdkGo) {
163178
// do not need to check the root group

pkg/constants/constants.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ const (
3737
PodDeviceGroupName = "Pods"
3838
// ServiceDeviceGroupName is the service device group name in the cluster device group.
3939
ServiceDeviceGroupName = "Services"
40+
// DeploymentDeviceGroupName is the deployment device group name in the cluster device group.
41+
DeploymentDeviceGroupName = "Deployments"
4042
)
4143

4244
const (
@@ -54,6 +56,10 @@ const (
5456
ServiceCategory = "KubernetesService"
5557
// ServiceDeletedCategory is the system.category used to identity a deleted Kubernetes Service resource type in LogicMonitor.
5658
ServiceDeletedCategory = "KubernetesServiceDeleted"
59+
// DeploymentCategory is the system.category used to identity a Kubernetes Service resource type in LogicMonitor.
60+
DeploymentCategory = "KubernetesDeployment"
61+
// DeploymentDeletedCategory is the system.category used to identity a deleted Kubernetes Service resource type in LogicMonitor.
62+
DeploymentDeletedCategory = "KubernetesDeploymentDeleted"
5763
// PodCategory is the system.category used to identity the Kubernetes Pod resource type in LogicMonitor.
5864
PodCategory = "KubernetesPod"
5965
// PodDeletedCategory is the system.category used to identity a deleted Kubernetes Pod resource type in LogicMonitor.
@@ -81,3 +87,12 @@ const (
8187
// K8sDeviceType is the type value of the k8s device
8288
K8sDeviceType = 8
8389
)
90+
91+
const (
92+
// K8sApiVersion_v1 is the version 'v1' of k8s api
93+
K8sApiVersion_v1 = "v1"
94+
// K8sApiVersion_apps_v1beta1 is the version 'apps/v1beta1' of k8s api
95+
K8sApiVersion_apps_v1beta1 = "apps/v1beta1"
96+
// K8sApiVersion_apps_v1beta2 is the version 'apps/v1beta2' of k8s api
97+
K8sApiVersion_apps_v1beta2 = "apps/v1beta2"
98+
)

pkg/sync/initsyncer.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/logicmonitor/k8s-argus/pkg/constants"
77
"github.com/logicmonitor/k8s-argus/pkg/device"
88
"github.com/logicmonitor/k8s-argus/pkg/devicegroup"
9+
"github.com/logicmonitor/k8s-argus/pkg/watch/deployment"
910
"github.com/logicmonitor/k8s-argus/pkg/watch/node"
1011
"github.com/logicmonitor/k8s-argus/pkg/watch/pod"
1112
"github.com/logicmonitor/k8s-argus/pkg/watch/service"
@@ -31,7 +32,7 @@ func (i *InitSyncer) InitSync() {
3132
return
3233
}
3334

34-
// get the node, pod, service info
35+
// get the node, pod, service, deployment info
3536
if rest.SubGroups != nil && len(rest.SubGroups) != 0 {
3637
wg := sync.WaitGroup{}
3738
wg.Add(len(rest.SubGroups))
@@ -46,15 +47,21 @@ func (i *InitSyncer) InitSync() {
4647
case constants.PodDeviceGroupName:
4748
go func() {
4849
defer wg.Done()
49-
i.initSyncPodsOrServices(constants.PodDeviceGroupName, rest.ID)
50+
i.initSyncPodsOrServicesOrDeploys(constants.PodDeviceGroupName, rest.ID)
5051
log.Infof("Finish syncing %v", constants.PodDeviceGroupName)
5152
}()
5253
case constants.ServiceDeviceGroupName:
5354
go func() {
5455
defer wg.Done()
55-
i.initSyncPodsOrServices(constants.ServiceDeviceGroupName, rest.ID)
56+
i.initSyncPodsOrServicesOrDeploys(constants.ServiceDeviceGroupName, rest.ID)
5657
log.Infof("Finish syncing %v", constants.ServiceDeviceGroupName)
5758
}()
59+
case constants.DeploymentDeviceGroupName:
60+
go func() {
61+
defer wg.Done()
62+
i.initSyncPodsOrServicesOrDeploys(constants.DeploymentDeviceGroupName, rest.ID)
63+
log.Infof("Finish syncing %v", constants.DeploymentDeviceGroupName)
64+
}()
5865
default:
5966
func() {
6067
defer wg.Done()
@@ -96,7 +103,7 @@ func (i *InitSyncer) intSyncNodes(parentGroupID int32) {
96103
}
97104
}
98105

99-
func (i *InitSyncer) initSyncPodsOrServices(deviceType string, parentGroupID int32) {
106+
func (i *InitSyncer) initSyncPodsOrServicesOrDeploys(deviceType string, parentGroupID int32) {
100107
rest, err := devicegroup.Find(parentGroupID, deviceType, i.DeviceManager.LMClient)
101108
if err != nil || rest == nil {
102109
log.Warnf("Failed to get the %s group", deviceType)
@@ -114,6 +121,8 @@ func (i *InitSyncer) initSyncPodsOrServices(deviceType string, parentGroupID int
114121
deviceMap, err = pod.GetPodsMap(i.DeviceManager.K8sClient, subGroup.Name)
115122
} else if deviceType == constants.ServiceDeviceGroupName {
116123
deviceMap, err = service.GetServicesMap(i.DeviceManager.K8sClient, subGroup.Name)
124+
} else if deviceType == constants.DeploymentDeviceGroupName {
125+
deviceMap, err = deployment.GetDeploymentsMap(i.DeviceManager.K8sClient, subGroup.Name)
117126
} else {
118127
return
119128
}

pkg/tree/tree.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ func (d *DeviceTree) buildOptsSlice() []*devicegroup.Options {
6464
DeleteDevices: d.Config.DeleteDevices,
6565
AppliesToDeletedGroup: devicegroup.NewAppliesToBuilder().HasCategory(constants.PodDeletedCategory).And().Auto("clustername").Equals(d.Config.ClusterName),
6666
},
67+
{
68+
Name: constants.DeploymentDeviceGroupName,
69+
DisableAlerting: true,
70+
AppliesTo: devicegroup.NewAppliesToBuilder(),
71+
Client: d.LMClient,
72+
DeleteDevices: d.Config.DeleteDevices,
73+
AppliesToDeletedGroup: devicegroup.NewAppliesToBuilder().HasCategory(constants.DeploymentDeletedCategory).And().Auto("clustername").Equals(d.Config.ClusterName),
74+
},
6775
}
6876
}
6977

pkg/types/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type Base struct {
1717

1818
// Watcher is the LogicMonitor Watcher interface.
1919
type Watcher interface {
20+
ApiVersion() string
2021
Resource() string
2122
ObjType() runtime.Object
2223
AddFunc() func(obj interface{})

pkg/watch/deployment/deployment.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Package deployment provides the logic for mapping a Kubernetes deployment to a
2+
// LogicMonitor w.
3+
package deployment
4+
5+
import (
6+
"fmt"
7+
8+
"github.com/logicmonitor/k8s-argus/pkg/constants"
9+
"github.com/logicmonitor/k8s-argus/pkg/types"
10+
"github.com/logicmonitor/k8s-argus/pkg/utilities"
11+
log "github.com/sirupsen/logrus"
12+
"k8s.io/api/apps/v1beta2"
13+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/runtime"
15+
"k8s.io/client-go/kubernetes"
16+
)
17+
18+
const (
19+
resource = "deployments"
20+
)
21+
22+
// Watcher represents a watcher type that watches deployments.
23+
type Watcher struct {
24+
types.DeviceManager
25+
}
26+
27+
// ApiVersion is a function that implements the Watcher interface.
28+
func (w *Watcher) ApiVersion() string {
29+
return constants.K8sApiVersion_apps_v1beta2
30+
}
31+
32+
// Resource is a function that implements the Watcher interface.
33+
func (w *Watcher) Resource() string {
34+
return resource
35+
}
36+
37+
// ObjType is a function that implements the Watcher interface.
38+
func (w *Watcher) ObjType() runtime.Object {
39+
return &v1beta2.Deployment{}
40+
}
41+
42+
// AddFunc is a function that implements the Watcher interface.
43+
func (w *Watcher) AddFunc() func(obj interface{}) {
44+
return func(obj interface{}) {
45+
deployment := obj.(*v1beta2.Deployment)
46+
47+
log.Infof("Handling add deployment event: %s", deployment.Name)
48+
49+
w.add(deployment)
50+
}
51+
}
52+
53+
// UpdateFunc is a function that implements the Watcher interface.
54+
func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) {
55+
return func(oldObj, newObj interface{}) {
56+
old := oldObj.(*v1beta2.Deployment)
57+
new := newObj.(*v1beta2.Deployment)
58+
59+
w.update(old, new)
60+
}
61+
}
62+
63+
// DeleteFunc is a function that implements the Watcher interface.
64+
func (w *Watcher) DeleteFunc() func(obj interface{}) {
65+
return func(obj interface{}) {
66+
deployment := obj.(*v1beta2.Deployment)
67+
68+
// Delete the deployment.
69+
if w.Config().DeleteDevices {
70+
if err := w.DeleteByDisplayName(fmtDeploymentDisplayName(deployment)); err != nil {
71+
log.Errorf("Failed to delete deployment: %v", err)
72+
return
73+
}
74+
log.Infof("Deleted deployment %s", deployment.Name)
75+
return
76+
}
77+
78+
// Move the deployment.
79+
w.move(deployment)
80+
}
81+
}
82+
83+
// nolint: dupl
84+
func (w *Watcher) add(deployment *v1beta2.Deployment) {
85+
if _, err := w.Add(
86+
w.args(deployment, constants.DeploymentCategory)...,
87+
); err != nil {
88+
log.Errorf("Failed to add deployment %q: %v", fmtDeploymentDisplayName(deployment), err)
89+
return
90+
}
91+
log.Infof("Added deployment %q", fmtDeploymentDisplayName(deployment))
92+
}
93+
94+
func (w *Watcher) update(old, new *v1beta2.Deployment) {
95+
if _, err := w.UpdateAndReplaceByDisplayName(
96+
fmtDeploymentDisplayName(old),
97+
w.args(new, constants.DeploymentCategory)...,
98+
); err != nil {
99+
log.Errorf("Failed to update deployment %q: %v", fmtDeploymentDisplayName(new), err)
100+
return
101+
}
102+
log.Infof("Updated deployment %q", fmtDeploymentDisplayName(old))
103+
}
104+
105+
func (w *Watcher) move(deployment *v1beta2.Deployment) {
106+
if _, err := w.UpdateAndReplaceFieldByDisplayName(fmtDeploymentDisplayName(deployment), constants.CustomPropertiesFieldName, w.args(deployment, constants.DeploymentDeletedCategory)...); err != nil {
107+
log.Errorf("Failed to move deployment %q: %v", fmtDeploymentDisplayName(deployment), err)
108+
return
109+
}
110+
log.Infof("Moved deployment %q", fmtDeploymentDisplayName(deployment))
111+
}
112+
113+
func (w *Watcher) args(deployment *v1beta2.Deployment, category string) []types.DeviceOption {
114+
categories := utilities.BuildSystemCategoriesFromLabels(category, deployment.Labels)
115+
return []types.DeviceOption{
116+
w.Name(deployment.Name),
117+
w.ResourceLabels(deployment.Labels),
118+
w.DisplayName(fmtDeploymentDisplayName(deployment)),
119+
w.SystemCategories(categories),
120+
w.Auto("name", deployment.Name),
121+
w.Auto("namespace", deployment.Namespace),
122+
w.Auto("selflink", deployment.SelfLink),
123+
w.Auto("uid", string(deployment.UID)),
124+
}
125+
}
126+
127+
// fmtDeploymentDisplayName implements the conversion for the deployment display name
128+
func fmtDeploymentDisplayName(deployment *v1beta2.Deployment) string {
129+
return fmt.Sprintf("%s.%s.deploy-%s", deployment.Name, deployment.Namespace, string(deployment.UID))
130+
}
131+
132+
// GetDeploymentsMap implements the getting deployments map info from k8s
133+
func GetDeploymentsMap(k8sClient *kubernetes.Clientset, namespace string) (map[string]string, error) {
134+
deploymentsMap := make(map[string]string)
135+
deploymentList, err := k8sClient.AppsV1beta2().Deployments(namespace).List(v1.ListOptions{})
136+
if err != nil || deploymentList == nil {
137+
log.Warnf("Failed to get the deployments from k8s")
138+
return nil, err
139+
}
140+
for _, deploymentInfo := range deploymentList.Items {
141+
deploymentsMap[fmtDeploymentDisplayName(&deploymentInfo)] = deploymentInfo.Name
142+
}
143+
144+
return deploymentsMap, nil
145+
}

pkg/watch/namespace/namespace.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,23 @@ type Watcher struct {
2020
DeviceGroups map[string]int32
2121
}
2222

23+
// ApiVersion is a function that implements the Watcher interface.
24+
func (w *Watcher) ApiVersion() string {
25+
return constants.K8sApiVersion_v1
26+
}
27+
2328
// Resource is a function that implements the Watcher interface.
24-
func (w Watcher) Resource() string {
29+
func (w *Watcher) Resource() string {
2530
return resource
2631
}
2732

2833
// ObjType is a function that implements the Watcher interface.
29-
func (w Watcher) ObjType() runtime.Object {
34+
func (w *Watcher) ObjType() runtime.Object {
3035
return &v1.Namespace{}
3136
}
3237

3338
// AddFunc is a function that implements the Watcher interface.
34-
func (w Watcher) AddFunc() func(obj interface{}) {
39+
func (w *Watcher) AddFunc() func(obj interface{}) {
3540
return func(obj interface{}) {
3641
namespace := obj.(*v1.Namespace)
3742
log.Debugf("Handling add namespace event: %s", namespace.Name)
@@ -43,6 +48,8 @@ func (w Watcher) AddFunc() func(obj interface{}) {
4348
appliesTo = devicegroup.NewAppliesToBuilder().HasCategory(constants.ServiceCategory).And().Auto("namespace").Equals(namespace.Name).And().Auto("clustername").Equals(w.Config.ClusterName)
4449
case constants.PodDeviceGroupName:
4550
appliesTo = devicegroup.NewAppliesToBuilder().HasCategory(constants.PodCategory).And().Auto("namespace").Equals(namespace.Name).And().Auto("clustername").Equals(w.Config.ClusterName)
51+
case constants.DeploymentDeviceGroupName:
52+
appliesTo = devicegroup.NewAppliesToBuilder().HasCategory(constants.DeploymentCategory).And().Auto("namespace").Equals(namespace.Name).And().Auto("clustername").Equals(w.Config.ClusterName)
4653
default:
4754
continue
4855
}
@@ -69,7 +76,7 @@ func (w Watcher) AddFunc() func(obj interface{}) {
6976
}
7077

7178
// UpdateFunc is a function that implements the Watcher interface.
72-
func (w Watcher) UpdateFunc() func(oldObj, newObj interface{}) {
79+
func (w *Watcher) UpdateFunc() func(oldObj, newObj interface{}) {
7380
return func(oldObj, newObj interface{}) {
7481
log.Debugf("Ignoring update namespace event")
7582
// oldNamespace := oldObj.(*v1.Namespace)
@@ -78,7 +85,7 @@ func (w Watcher) UpdateFunc() func(oldObj, newObj interface{}) {
7885
}
7986

8087
// DeleteFunc is a function that implements the Watcher interface.
81-
func (w Watcher) DeleteFunc() func(obj interface{}) {
88+
func (w *Watcher) DeleteFunc() func(obj interface{}) {
8289
return func(obj interface{}) {
8390
namespace := obj.(*v1.Namespace)
8491
log.Debugf("Handle deleting namespace event: %s", namespace.Name)

pkg/watch/node/node.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ type Watcher struct {
2828
LMClient *client.LMSdkGo
2929
}
3030

31+
// ApiVersion is a function that implements the Watcher interface.
32+
func (w *Watcher) ApiVersion() string {
33+
return constants.K8sApiVersion_v1
34+
}
35+
3136
// Resource is a function that implements the Watcher interface.
3237
func (w *Watcher) Resource() string {
3338
return resource

pkg/watch/pod/pod.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ type Watcher struct {
2222
types.DeviceManager
2323
}
2424

25+
// ApiVersion is a function that implements the Watcher interface.
26+
func (w *Watcher) ApiVersion() string {
27+
return constants.K8sApiVersion_v1
28+
}
29+
2530
// Resource is a function that implements the Watcher interface.
2631
func (w *Watcher) Resource() string {
2732
return resource

pkg/watch/service/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ type Watcher struct {
2424
types.DeviceManager
2525
}
2626

27+
// ApiVersion is a function that implements the Watcher interface.
28+
func (w *Watcher) ApiVersion() string {
29+
return constants.K8sApiVersion_v1
30+
}
31+
2732
// Resource is a function that implements the Watcher interface.
2833
func (w *Watcher) Resource() string {
2934
return resource

0 commit comments

Comments
 (0)