Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions pkg/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,16 @@ func Run(cmd *cobra.Command, args []string) {
logs.Log.Fatalf("While evaluating configuration: %v", err)
}

go func() {
group, gctx := errgroup.WithContext(ctx)
defer func() {
// TODO: replace Fatalf log calls with Errorf and return the error
cancel()
if err := group.Wait(); err != nil {
logs.Log.Fatalf("failed to wait for controller-runtime component to stop: %v", err)
}
}()
Comment on lines +77 to +84
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code was cut and pasted earlier in the file so that the errgroup can be initialized and used for the management server (/metrics, /healthz, /debug/pprof) and for the VenafiConnection manager; both of which were previously run in unsupervised goroutines.


group.Go(func() error {
server := http.NewServeMux()

if Flags.Profiling {
Expand Down Expand Up @@ -105,21 +114,25 @@ func Run(cmd *cobra.Command, args []string) {

err := http.ListenAndServe(":8081", server)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
logs.Log.Fatalf("failed to run the health check server: %s", err)
return fmt.Errorf("failed to run the health check server: %s", err)
}
}()
// The agent must stop if the management server stops
cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of calling cancel, can we return an error when the server is stopped unexpectedly early?
Normally when the context is not canceled, the server should keep running, and when the context is canceled; we do not have to call cancel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, when we return an error, the group context will be automatically canceled.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree, that an error should be returned.
Something like fmt.Errorf("the API server stopped unexpectedly").
But I'll come back to that in another PR.
This cancel will work for now.

return nil
})
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a future PR we should add a signal handler to catch sigterm (ctrl-c) and cancel the parent context, and add another Go routine which calls server.Stop after the context is cancelled.

As it stands, sigterm will cause the process (all threads) to immediately exit without any graceful shutdown.


_, isVenConn := preflightClient.(*client.VenConnClient)
if isVenConn {
go func() {
err := preflightClient.(manager.Runnable).Start(ctx)
group.Go(func() error {
err := preflightClient.(manager.Runnable).Start(gctx)
if err != nil {
logs.Log.Fatalf("failed to start a controller-runtime component: %v", err)
return fmt.Errorf("failed to start a controller-runtime component: %v", err)
}

// The agent must stop if the controller-runtime component stops.
cancel()
}()
return nil
})
}

// To help users notice issues with the agent, we show the error messages in
Expand All @@ -130,15 +143,6 @@ func Run(cmd *cobra.Command, args []string) {
}

dataGatherers := map[string]datagatherer.DataGatherer{}
group, gctx := errgroup.WithContext(ctx)

defer func() {
// TODO: replace Fatalf log calls with Errorf and return the error
cancel()
if err := group.Wait(); err != nil {
logs.Log.Fatalf("failed to wait for controller-runtime component to stop: %v", err)
}
}()

// load datagatherer config and boot each one
for _, dgConfig := range config.DataGatherers {
Expand All @@ -160,6 +164,8 @@ func Run(cmd *cobra.Command, args []string) {
if err := newDg.Run(gctx.Done()); err != nil {
return fmt.Errorf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err)
}
// The agent must stop if any of the data gatherers stops
cancel()
Copy link
Member Author

@wallrj wallrj Nov 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This caused a bug.
Most of the DataGatherer.Run implementations return immediately.
But one (Dynamic) runs an informer which blocks until the supplied channel is closed.
By calling cancel here, I stop all the errgroup go routines including the VenafiConnection helper which is needed later when the data gets posted to Venafi.
As a result, the POST hangs and the agent gets stuck.

  • func (g *DataGatherer) Run(stopCh <-chan struct{}) error {
    // no async functionality, see Fetch
    return nil
    }
  • func (g *DataGathererDiscovery) Run(stopCh <-chan struct{}) error {
    // no async functionality, see Fetch
    return nil
    }
  • // Run starts the dynamic data gatherer's informers for resource collection.
    // Returns error if the data gatherer informer wasn't initialized, Run blocks
    // until the stopCh is closed.
    func (g *DataGathererDynamic) Run(stopCh <-chan struct{}) error {
    if g.informer == nil {
    return fmt.Errorf("informer was not initialized, impossible to start")
    }
    // attach WatchErrorHandler, it needs to be set before starting an informer
    err := g.informer.SetWatchErrorHandler(func(r *k8scache.Reflector, err error) {
    if strings.Contains(fmt.Sprintf("%s", err), "the server could not find the requested resource") {
    logs.Log.Printf("server missing resource for datagatherer of %q ", g.groupVersionResource)
    } else {
    logs.Log.Printf("datagatherer informer for %q has failed and is backing off due to error: %s", g.groupVersionResource, err)
    }
    })
    if err != nil {
    return fmt.Errorf("failed to SetWatchErrorHandler on informer: %s", err)
    }
    // start shared informer
    g.informer.Run(stopCh)
    return nil
    }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return nil
})

Expand Down Expand Up @@ -192,15 +198,24 @@ func Run(cmd *cobra.Command, args []string) {

// begin the datagathering loop, periodically sending data to the
// configured output using data in datagatherer caches or refreshing from
// APIs each cycle depending on datagatherer implementation
// APIs each cycle depending on datagatherer implementation.
// If any of the go routines exit (with nil or error) the main context will
// be cancelled, which will cause this blocking loop to exit
// instead of waiting for the time period.
// TODO(wallrj): Pass a context to gatherAndOutputData, so that we don't
// have to wait for it to finish before exiting the process.
Comment on lines +205 to +206
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be done in another PR.

for {
gatherAndOutputData(eventf, config, preflightClient, dataGatherers)

if config.OneShot {
break
}

time.Sleep(config.Period)
select {
case <-gctx.Done():
return
case <-time.After(config.Period):
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If any of the errgroup go routines exit (with nil or error) the main context will be cancelled, which will cause this blocking loop to exit immediately instead of waiting for the time period.

}
}

Expand Down