diff --git a/pkg/agent/dummy_data_gatherer.go b/pkg/agent/dummy_data_gatherer.go index 73ef6a16..970a69e0 100644 --- a/pkg/agent/dummy_data_gatherer.go +++ b/pkg/agent/dummy_data_gatherer.go @@ -29,12 +29,12 @@ type dummyDataGatherer struct { FailedAttempts int } -func (g *dummyDataGatherer) Run(stopCh <-chan struct{}) error { +func (g *dummyDataGatherer) Run(ctx context.Context) error { // no async functionality, see Fetch return nil } -func (g *dummyDataGatherer) WaitForCacheSync(stopCh <-chan struct{}) error { +func (g *dummyDataGatherer) WaitForCacheSync(ctx context.Context) error { // no async functionality, see Fetch return nil } diff --git a/pkg/agent/run.go b/pkg/agent/run.go index ad9edfd6..bef72cbe 100644 --- a/pkg/agent/run.go +++ b/pkg/agent/run.go @@ -191,7 +191,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) { // blocks until the supplied channel is closed. // For this reason, we must allow these errgroup Go routines to exit // without cancelling the other Go routines in the group. - if err := newDg.Run(gctx.Done()); err != nil { + if err := newDg.Run(gctx); err != nil { return fmt.Errorf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err) } return nil @@ -220,7 +220,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) { // wait for the informer to complete an initial sync, we do this to // attempt to have an initial set of data for the first upload of // the run. - if err := dg.WaitForCacheSync(bootCtx.Done()); err != nil { + if err := dg.WaitForCacheSync(bootCtx); err != nil { // log sync failure, this might recover in future if errors.Is(err, k8s.ErrCacheSyncTimeout) { timedoutDGs = append(timedoutDGs, dgConfig.Name) diff --git a/pkg/datagatherer/datagatherer.go b/pkg/datagatherer/datagatherer.go index ec5b8b67..1a92c366 100644 --- a/pkg/datagatherer/datagatherer.go +++ b/pkg/datagatherer/datagatherer.go @@ -17,9 +17,9 @@ type DataGatherer interface { Fetch() (data interface{}, count int, err error) // Run starts the data gatherer's informers for resource collection. // Returns error if the data gatherer informer wasn't initialized - Run(stopCh <-chan struct{}) error + Run(ctx context.Context) error // WaitForCacheSync waits for the data gatherer's informers cache to sync. - WaitForCacheSync(stopCh <-chan struct{}) error + WaitForCacheSync(ctx context.Context) error // Delete, clear the cache of the DataGatherer if one is being used Delete() error } diff --git a/pkg/datagatherer/k8s/discovery.go b/pkg/datagatherer/k8s/discovery.go index 49ac5324..f072dcaf 100644 --- a/pkg/datagatherer/k8s/discovery.go +++ b/pkg/datagatherer/k8s/discovery.go @@ -47,12 +47,12 @@ type DataGathererDiscovery struct { cl *discovery.DiscoveryClient } -func (g *DataGathererDiscovery) Run(stopCh <-chan struct{}) error { +func (g *DataGathererDiscovery) Run(ctx context.Context) error { // no async functionality, see Fetch return nil } -func (g *DataGathererDiscovery) WaitForCacheSync(stopCh <-chan struct{}) error { +func (g *DataGathererDiscovery) WaitForCacheSync(ctx context.Context) error { // no async functionality, see Fetch return nil } diff --git a/pkg/datagatherer/k8s/dynamic.go b/pkg/datagatherer/k8s/dynamic.go index 810f9854..49b96278 100644 --- a/pkg/datagatherer/k8s/dynamic.go +++ b/pkg/datagatherer/k8s/dynamic.go @@ -181,7 +181,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami dgCache := cache.New(5*time.Minute, 30*time.Second) newDataGatherer := &DataGathererDynamic{ - ctx: ctx, groupVersionResource: c.GroupVersionResource, fieldSelector: fieldSelector.String(), namespaces: c.IncludeNamespaces, @@ -217,7 +216,7 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami newDataGatherer.informer = factory.ForResource(c.GroupVersionResource).Informer() } - registration, err := newDataGatherer.informer.AddEventHandler(k8scache.ResourceEventHandlerFuncs{ + registration, err := newDataGatherer.informer.AddEventHandlerWithOptions(k8scache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { onAdd(log, obj, dgCache) }, @@ -227,6 +226,8 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami DeleteFunc: func(obj interface{}) { onDelete(log, obj, dgCache) }, + }, k8scache.HandlerOptions{ + Logger: &log, }) if err != nil { return nil, err @@ -243,7 +244,6 @@ func (c *ConfigDynamic) newDataGathererWithClient(ctx context.Context, cl dynami // This is to allow us to support arbitrary CRDs and resources that Preflight // does not have registered as part of its `runtime.Scheme`. type DataGathererDynamic struct { - ctx context.Context // groupVersionResource is the name of the API group, version and resource // that should be fetched by this data gatherer. groupVersionResource schema.GroupVersionResource @@ -269,8 +269,8 @@ type DataGathererDynamic struct { // Run starts the dynamic data gatherer's informers for resource collection. // Returns error if the data gatherer informer wasn't initialized, Run blocks // until the stopCh is closed. -func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { - log := klog.FromContext(g.ctx) +func (g *DataGathererDynamic) Run(ctx context.Context) error { + log := klog.FromContext(ctx) if g.informer == nil { return fmt.Errorf("informer was not initialized, impossible to start") } @@ -288,7 +288,7 @@ func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error { } // start shared informer - g.informer.Run(stopCh) + g.informer.RunWithContext(ctx) return nil } @@ -298,8 +298,9 @@ var ErrCacheSyncTimeout = fmt.Errorf("timed out waiting for Kubernetes cache to // WaitForCacheSync waits for the data gatherer's informers cache to sync before // collecting the resources. Use errors.Is(err, ErrCacheSyncTimeout) to check if // the cache sync failed. -func (g *DataGathererDynamic) WaitForCacheSync(stopCh <-chan struct{}) error { - if !k8scache.WaitForCacheSync(stopCh, g.registration.HasSynced) { +func (g *DataGathererDynamic) WaitForCacheSync(ctx context.Context) error { + // Don't use WaitForNamedCacheSync, since we don't want to log extra messages. + if !k8scache.WaitForCacheSync(ctx.Done(), g.registration.HasSynced) { return ErrCacheSyncTimeout } diff --git a/pkg/datagatherer/k8s/dynamic_test.go b/pkg/datagatherer/k8s/dynamic_test.go index 58827a6a..5f954804 100644 --- a/pkg/datagatherer/k8s/dynamic_test.go +++ b/pkg/datagatherer/k8s/dynamic_test.go @@ -134,7 +134,6 @@ func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) { } expected := &DataGathererDynamic{ - ctx: ctx, groupVersionResource: config.GroupVersionResource, // it's important that the namespaces are set as the IncludeNamespaces // during initialization @@ -144,9 +143,6 @@ func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) { gatherer := dg.(*DataGathererDynamic) // test gatherer's fields - if !reflect.DeepEqual(gatherer.ctx, expected.ctx) { - t.Errorf("expected %v, got %v", expected, dg) - } if !reflect.DeepEqual(gatherer.groupVersionResource, expected.groupVersionResource) { t.Errorf("expected %v, got %v", expected, dg) } @@ -180,7 +176,6 @@ func TestNewDataGathererWithClientAndSharedIndexInformer(t *testing.T) { } expected := &DataGathererDynamic{ - ctx: ctx, groupVersionResource: config.GroupVersionResource, // it's important that the namespaces are set as the IncludeNamespaces // during initialization @@ -189,9 +184,6 @@ func TestNewDataGathererWithClientAndSharedIndexInformer(t *testing.T) { gatherer := dg.(*DataGathererDynamic) // test gatherer's fields - if !reflect.DeepEqual(gatherer.ctx, expected.ctx) { - t.Errorf("expected %v, got %v", expected, dg) - } if !reflect.DeepEqual(gatherer.groupVersionResource, expected.groupVersionResource) { t.Errorf("expected %v, got %v", expected, dg) } @@ -693,11 +685,11 @@ func TestDynamicGatherer_Fetch(t *testing.T) { // start data gatherer informer dynamiDg := dg go func() { - if err = dynamiDg.Run(ctx.Done()); err != nil { + if err = dynamiDg.Run(ctx); err != nil { t.Errorf("unexpected client error: %+v", err) } }() - err = dynamiDg.WaitForCacheSync(ctx.Done()) + err = dynamiDg.WaitForCacheSync(ctx) if err != nil { t.Fatalf("unexpected client error: %+v", err) } @@ -1010,11 +1002,11 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) { // start data gatherer informer dynamiDg := dg go func() { - if err = dynamiDg.Run(ctx.Done()); err != nil { + if err = dynamiDg.Run(ctx); err != nil { t.Errorf("unexpected client error: %+v", err) } }() - err = dynamiDg.WaitForCacheSync(ctx.Done()) + err = dynamiDg.WaitForCacheSync(ctx) if err != nil { t.Fatalf("unexpected client error: %+v", err) } diff --git a/pkg/datagatherer/local/local.go b/pkg/datagatherer/local/local.go index c8957efc..a74dc6af 100644 --- a/pkg/datagatherer/local/local.go +++ b/pkg/datagatherer/local/local.go @@ -38,7 +38,7 @@ func (c *Config) NewDataGatherer(ctx context.Context) (datagatherer.DataGatherer }, nil } -func (g *DataGatherer) Run(stopCh <-chan struct{}) error { +func (g *DataGatherer) Run(ctx context.Context) error { // no async functionality, see Fetch return nil } @@ -48,7 +48,7 @@ func (g *DataGatherer) Delete() error { return nil } -func (g *DataGatherer) WaitForCacheSync(stopCh <-chan struct{}) error { +func (g *DataGatherer) WaitForCacheSync(ctx context.Context) error { // no async functionality, see Fetch return nil }