Skip to content
This repository was archived by the owner on Jan 16, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
08e2f92
Stash to GitHub (#92)
JeremyTangCD Dec 21, 2018
68b1557
Dev-40217 support to set log levels and improve (#94)
JeremyTangCD Jan 2, 2019
2582900
bump version
Jan 2, 2019
73338c7
Remove beta tag (#97)
woz5999 Jan 7, 2019
0bd4050
Dev 42060 improve the initSync and the branch of the lm sdk (#98)
JeremyTangCD Jan 17, 2019
8f7491d
Merge branch 'master' into develop
woz5999 Jan 17, 2019
62c7a05
Dev 49046 fix the internal ip cannot found (#102)
JeremyTangCD Feb 18, 2019
261d044
Merge branch 'master' into develop
woz5999 Feb 21, 2019
da40fa7
DEV-48974 Upgrade the go version of the argus from 1.9 to 1.11 (#104)
JeremyTangCD Feb 28, 2019
2d6af21
Merge branch 'master' into develop
woz5999 Mar 28, 2019
46b3ac7
Develop (#108)
DzXiaoLMCD May 20, 2019
5c21dbd
Merge branch 'master' into develop
woz5999 May 22, 2019
34b0bf1
Add deployment and device uptime support (#110)
DzXiaoLMCD Jun 19, 2019
654d92c
Merge branch 'master' into develop
woz5999 Jun 26, 2019
cedf4e8
Develop (#112)
DzXiaoLMCD Jul 3, 2019
8db30a4
Merge branch 'master' into develop
Jul 11, 2019
0af387d
Develop (#114)
DzXiaoLMCD Jul 29, 2019
648acbc
Dev 55114 argus shouldn't overwrite system.categories values (#116)
JeremyTangCD Dec 13, 2019
d65944a
Merge branch 'master' into develop
Dec 13, 2019
72fc2d2
Adding support for k8s 1.167+ (#124)
vkumbhar94 Apr 21, 2020
0a19c59
Merge branch 'master' into develop
vkumbhar94 Apr 21, 2020
f27fb33
Milestone 3.0.1 Changes (#135)
vkumbhar94 Jun 4, 2020
a1946a8
Merge branch 'master' into develop
vkumbhar94 Jun 4, 2020
a1f384a
Gosec fixes (#138)
vkumbhar94 Jun 30, 2020
7165c1e
Merge branch 'master' into develop
vkumbhar94 Jul 1, 2020
c7721ca
Argus 3.1.0 dev (#140)
vkumbhar94 Aug 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 11 additions & 75 deletions cmd/watch.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
package cmd

import (
"context"
"fmt"
"net/http"
"os"
"time"

argus "github.com/logicmonitor/k8s-argus/pkg"
"github.com/logicmonitor/k8s-argus/pkg/config"
"github.com/logicmonitor/k8s-argus/pkg/connection"
"github.com/logicmonitor/k8s-argus/pkg/constants"
"github.com/logicmonitor/k8s-argus/pkg/healthz"
lmlog "github.com/logicmonitor/k8s-argus/pkg/log"
"github.com/logicmonitor/k8s-argus/pkg/permission"
"github.com/logicmonitor/k8s-collectorset-controller/api"
collectorsetconstants "github.com/logicmonitor/k8s-collectorset-controller/pkg/constants"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// watchCmd represents the watch command
Expand All @@ -43,6 +38,10 @@ var watchCmd = &cobra.Command{
log.SetLevel(log.DebugLevel)
}

// Add hook to log pod id in log context
hook := &lmlog.DefaultFieldHook{}
log.AddHook(hook)

// Instantiate the base struct.
base, err := argus.NewBase(config)
if err != nil {
Expand All @@ -52,19 +51,13 @@ var watchCmd = &cobra.Command{
// Init the permission component
permission.Init(base.K8sClient)

// Set up a gRPC connection to the collectorset controller.
conn, err := grpc.Dial(config.Address, grpc.WithInsecure())
if err != nil {
log.Fatal(err.Error())
}
defer conn.Close() // nolint: errcheck
client, err := waitForCollectorSetClient(conn)
if err != nil {
log.Fatal(err.Error())
}
// Set up a gRPC connection and CSC Client.
connection.Initialize(config)

connection.CreateConnectionHandler()

// Instantiate the application and add watchers.
argus, err := argus.NewArgus(base, client)
argus, err := argus.NewArgus(base)
if err != nil {
log.Fatal(err.Error())
}
Expand All @@ -79,63 +72,6 @@ var watchCmd = &cobra.Command{
},
}

// waitForCollectorSetClient blocks until the gRPC connection is Ready and the
// collectorset controller reports available collectors, then returns a client
// bound to that connection.
//
// Fixed: the original called log.Fatal/log.Fatalf internally, so the error
// return value was dead code and the caller's own error handling (which also
// calls log.Fatal) was bypassed. Errors are now returned to the caller.
func waitForCollectorSetClient(conn *grpc.ClientConn) (api.CollectorSetControllerClient, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()

	// Wait for the connection to reach Ready, or for the 10s context to expire.
	state := conn.GetState()
	for ; state != connectivity.Ready && conn.WaitForStateChange(ctx, state); state = conn.GetState() {
		log.Infof("Waiting for gRPC")
	}
	if state != connectivity.Ready {
		return nil, fmt.Errorf("failed waiting for gRPC to be ready, state is %q", state)
	}

	log.Infof("State of gRPC is %q", state)

	client := api.NewCollectorSetControllerClient(conn)

	ready, err := pollCollectorSetStatus(conn)
	if err != nil {
		return nil, err
	}

	if !ready {
		return nil, fmt.Errorf("the collectorset controller does not have any ready collectors")
	}
	log.Infof("The collectorset controller has available collectors")

	return client, nil
}

// pollCollectorSetStatus polls the collectorset controller's health endpoint
// every 10 seconds until it reports SERVING, giving up after 10 minutes.
// It returns true once the collectors are ready.
func pollCollectorSetStatus(conn *grpc.ClientConn) (bool, error) {
	timeout := time.After(10 * time.Minute)
	ticker := time.NewTicker(10 * time.Second)
	// Stop the ticker so it does not outlive this function.
	defer ticker.Stop()
	hc := healthpb.NewHealthClient(conn)
	for {
		select {
		case <-timeout:
			return false, fmt.Errorf("timeout waiting for collectors to become available")
		case <-ticker.C:
			log.Debugf("Checking collectors status")
			if isCollectorSetServing(hc) {
				return true, nil
			}
		}
	}
}

// isCollectorSetServing performs a single health check and reports whether the
// controller is SERVING. Extracted from the polling loop so the per-call
// context is cancelled at the end of each attempt; the original deferred
// cancel() inside the loop, leaking one live context per iteration until the
// enclosing function returned.
func isCollectorSetServing(hc healthpb.HealthClient) bool {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
	defer cancel()
	req := &healthpb.HealthCheckRequest{
		Service: collectorsetconstants.HealthServerServiceName,
	}
	healthCheckResponse, err := hc.Check(ctx, req)
	if err != nil {
		log.Errorf("Failed to get health check: %v", err)
	}
	if healthCheckResponse.GetStatus() == healthpb.HealthCheckResponse_SERVING {
		return true
	}
	log.Debugf("The collectors are not ready: %d", healthCheckResponse.GetStatus())
	return false
}

// init registers the watch command with the root command so it is available
// on the CLI.
func init() {
	RootCmd.AddCommand(watchCmd)
}
11 changes: 7 additions & 4 deletions pkg/argus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/logicmonitor/k8s-argus/pkg/config"
"github.com/logicmonitor/k8s-argus/pkg/constants"
"github.com/logicmonitor/k8s-argus/pkg/device"
"github.com/logicmonitor/k8s-argus/pkg/devicecache"
"github.com/logicmonitor/k8s-argus/pkg/devicegroup"
"github.com/logicmonitor/k8s-argus/pkg/etcd"
"github.com/logicmonitor/k8s-argus/pkg/sync"
Expand All @@ -20,7 +21,6 @@ import (
"github.com/logicmonitor/k8s-argus/pkg/watch/node"
"github.com/logicmonitor/k8s-argus/pkg/watch/pod"
"github.com/logicmonitor/k8s-argus/pkg/watch/service"
"github.com/logicmonitor/k8s-collectorset-controller/api"
"github.com/logicmonitor/lm-sdk-go/client"
"github.com/logicmonitor/lm-sdk-go/client/lm"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -93,14 +93,17 @@ func newK8sClient() (*kubernetes.Clientset, error) {
}

// NewArgus instantiates and returns argus.
func NewArgus(base *types.Base, client api.CollectorSetControllerClient) (*Argus, error) {
func NewArgus(base *types.Base) (*Argus, error) {
argus := &Argus{
Base: base,
}

dcache := devicecache.NewDeviceCache(base, 5)
dcache.Run()

deviceManager := &device.Manager{
Base: base,
ControllerClient: client,
Base: base,
DC: dcache,
}

deviceTree := &tree.DeviceTree{
Expand Down
145 changes: 145 additions & 0 deletions pkg/connection/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package connection

import (
"context"
"fmt"
"sync"
"time"

"github.com/logicmonitor/k8s-argus/pkg/config"
"github.com/logicmonitor/k8s-collectorset-controller/api"
collectorsetconstants "github.com/logicmonitor/k8s-collectorset-controller/pkg/constants"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

var (
	// grpcConn is the shared gRPC connection to the collectorset controller.
	grpcConn *grpc.ClientConn
	// cscClient is the collectorset-controller client built on top of grpcConn.
	cscClient api.CollectorSetControllerClient
	// connLock guards grpcConn and cscClient against concurrent access.
	connLock sync.RWMutex
	// appConfig is captured by Initialize; its Address field is used when
	// (re)dialing the gRPC connection.
	appConfig *config.Config
)

// Initialize stores the application config and makes the first attempt at
// establishing the gRPC connection and the collectorset-controller client.
func Initialize(config *config.Config) {
	appConfig = config
	log.Info("Initializing gRPC connection & CSC Client.")
	createConnection()
}

// createConnection dials the gRPC endpoint and builds the CSC client on top
// of it, publishing both through the lock-guarded package variables. On any
// failure it logs and returns, leaving the previous values in place so a
// later retry (checkGRPCState) can attempt again.
func createConnection() {
	conn, grpcErr := createGRPCConnection()
	if grpcErr != nil {
		log.Errorf("Error while creating gRPC connection. Error: %v", grpcErr.Error())
		return
	}
	setGRPCConn(conn)

	client, cscErr := createCSCClient()
	if cscErr != nil {
		// Fixed copy-pasted message: this failure concerns the CSC client,
		// not the gRPC connection.
		log.Errorf("Error while creating CSC client. Error: %v", cscErr.Error())
		return
	}
	setCSCClient(client)
}

// setGRPCConn replaces the shared gRPC connection under the write lock.
func setGRPCConn(conn *grpc.ClientConn) {
	connLock.Lock()
	grpcConn = conn
	connLock.Unlock()
}

// getGRPCConn returns the shared gRPC connection under the read lock.
func getGRPCConn() *grpc.ClientConn {
	connLock.RLock()
	conn := grpcConn
	connLock.RUnlock()
	return conn
}

// setCSCClient replaces the shared collectorset-controller client under the
// write lock.
func setCSCClient(csc api.CollectorSetControllerClient) {
	connLock.Lock()
	cscClient = csc
	connLock.Unlock()
}

// GetCSCClient returns the shared collectorset-controller client under the
// read lock.
func GetCSCClient() api.CollectorSetControllerClient {
	connLock.RLock()
	client := cscClient
	connLock.RUnlock()
	return client
}

// createGRPCConnection repeatedly tries to dial the collectorset controller
// at appConfig.Address, retrying every 10 seconds for up to 10 minutes.
func createGRPCConnection() (*grpc.ClientConn, error) {
	timeout := time.After(10 * time.Minute)
	ticker := time.NewTicker(10 * time.Second)
	// Stop the ticker so it does not outlive this function.
	defer ticker.Stop()
	for {
		select {
		case <-timeout:
			return nil, fmt.Errorf("timeout waiting for gRPC connection")
		case <-ticker.C:
			conn, err := dialGRPC()
			if err != nil {
				log.Errorf("Error while creating gRPC connection. Error: %v", err.Error())
				continue
			}
			return conn, nil
		}
	}
}

// dialGRPC makes one blocking dial attempt bounded by a 10-second deadline.
// Extracted from the retry loop so the context is cancelled after every
// attempt; the original deferred cancel() inside the loop, accumulating one
// live context per failed attempt for up to 10 minutes.
func dialGRPC() (*grpc.ClientConn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return grpc.DialContext(ctx, appConfig.Address, grpc.WithBlock(), grpc.WithInsecure())
}

// createCSCClient wraps the current gRPC connection in a collectorset
// controller client and waits until the controller's health endpoint reports
// SERVING, polling every 10 seconds for up to 10 minutes. On timeout the
// (not-yet-serving) client is returned together with an error, matching the
// original contract.
func createCSCClient() (api.CollectorSetControllerClient, error) {
	conn := getGRPCConn()
	client := api.NewCollectorSetControllerClient(conn)

	timeout := time.After(10 * time.Minute)
	ticker := time.NewTicker(10 * time.Second)
	// Stop the ticker so it does not outlive this function (the original
	// never stopped it).
	defer ticker.Stop()
	hc := healthpb.NewHealthClient(conn)
	for {
		select {
		case <-timeout:
			return client, fmt.Errorf("timeout waiting for collectors to become available")
		case <-ticker.C:
			healthCheckResponse := getCSCHealth(hc)
			if healthCheckResponse.GetStatus() == healthpb.HealthCheckResponse_SERVING {
				return client, nil
			}
			log.Debugf("The collectors are not ready: %v", healthCheckResponse.GetStatus().String())
		}
	}
}

// getCSCHealth issues one health-check RPC against the collectorset
// controller's health service, bounded by a 500ms deadline. If the RPC fails,
// the error is logged and a nil response is returned; callers use GetStatus,
// which tolerates a nil receiver.
func getCSCHealth(hc healthpb.HealthClient) *healthpb.HealthCheckResponse {
	log.Debug("Checking collectors status")
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
	defer cancel()
	resp, err := hc.Check(ctx, &healthpb.HealthCheckRequest{
		Service: collectorsetconstants.HealthServerServiceName,
	})
	if err != nil {
		log.Errorf("Failed to get health check: %v", err)
	}
	return resp
}

// CreateConnectionHandler starts a background goroutine that checks the gRPC
// connection state every 10 seconds and recreates the connection when needed.
// NOTE(review): the goroutine has no stop mechanism and runs for the process
// lifetime — presumably intentional for a daemon; confirm.
func CreateConnectionHandler() {
	go monitorGRPCState()
}

// monitorGRPCState loops forever, sleeping 10 seconds between state checks.
func monitorGRPCState() {
	for {
		time.Sleep(10 * time.Second)
		checkGRPCState()
	}
}

// checkGRPCState inspects the shared gRPC connection and recreates the
// connection plus CSC client when it has shut down. It also repairs the case
// where Initialize's createConnection never succeeded and the connection is
// still nil — the original called GetState on a nil *grpc.ClientConn, which
// panics inside the background handler goroutine.
func checkGRPCState() {
	conn := getGRPCConn()
	if conn == nil {
		log.Info("gRPC connection is not established. Creating gRPC connection & CSC client.")
		createConnection()
		return
	}
	state := conn.GetState()
	if state == connectivity.Shutdown {
		log.Infof("gRPC is in \"%v\" state. Creating new gRPC connection & CSC client.", state.String())
		createConnection()
	}
}
Loading