-
Notifications
You must be signed in to change notification settings - Fork 25
[VC-35738] Use errgroup for all go routines #601
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -74,7 +74,16 @@ func Run(cmd *cobra.Command, args []string) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logs.Log.Fatalf("While evaluating configuration: %v", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
go func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
group, gctx := errgroup.WithContext(ctx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO: replace Fatalf log calls with Errorf and return the error | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cancel() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err := group.Wait(); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logs.Log.Fatalf("failed to wait for controller-runtime component to stop: %v", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
group.Go(func() error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
server := http.NewServeMux() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if Flags.Profiling { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -105,21 +114,25 @@ func Run(cmd *cobra.Command, args []string) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err := http.ListenAndServe(":8081", server) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil && !errors.Is(err, http.ErrServerClosed) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logs.Log.Fatalf("failed to run the health check server: %s", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return fmt.Errorf("failed to run the health check server: %s", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// The agent must stop if the management server stops | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cancel() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, when we return an error, the group context will be automatically canceled. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I agree, that an error should be returned. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In a future PR we should add a signal handler to catch sigterm (ctrl-c) and cancel the parent context, and add another Go routine which calls As it stands, sigterm will cause the process (all threads) to immediately exit without any graceful shutdown. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_, isVenConn := preflightClient.(*client.VenConnClient) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if isVenConn { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
go func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err := preflightClient.(manager.Runnable).Start(ctx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
group.Go(func() error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
err := preflightClient.(manager.Runnable).Start(gctx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logs.Log.Fatalf("failed to start a controller-runtime component: %v", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return fmt.Errorf("failed to start a controller-runtime component: %v", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// The agent must stop if the controller-runtime component stops. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cancel() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// To help users notice issues with the agent, we show the error messages in | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -130,15 +143,6 @@ func Run(cmd *cobra.Command, args []string) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
dataGatherers := map[string]datagatherer.DataGatherer{} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
group, gctx := errgroup.WithContext(ctx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO: replace Fatalf log calls with Errorf and return the error | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cancel() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err := group.Wait(); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
logs.Log.Fatalf("failed to wait for controller-runtime component to stop: %v", err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// load datagatherer config and boot each one | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for _, dgConfig := range config.DataGatherers { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -160,6 +164,8 @@ func Run(cmd *cobra.Command, args []string) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err := newDg.Run(gctx.Done()); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return fmt.Errorf("failed to start %q data gatherer %q: %v", kind, dgConfig.Name, err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// The agent must stop if any of the data gatherers stops | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cancel() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This caused a bug.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -192,15 +198,24 @@ func Run(cmd *cobra.Command, args []string) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// begin the datagathering loop, periodically sending data to the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// configured output using data in datagatherer caches or refreshing from | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// APIs each cycle depending on datagatherer implementation | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// APIs each cycle depending on datagatherer implementation. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// If any of the go routines exit (with nil or error) the main context will | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// be cancelled, which will cause this blocking loop to exit | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// instead of waiting for the time period. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// TODO(wallrj): Pass a context to gatherAndOutputData, so that we don't | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// have to wait for it to finish before exiting the process. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+205
to
+206
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be done in another PR. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
gatherAndOutputData(eventf, config, preflightClient, dataGatherers) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if config.OneShot { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
break | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
time.Sleep(config.Period) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case <-gctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
case <-time.After(config.Period): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If any of the errgroup go routines exit (with nil or error) the main context will be cancelled, which will cause this blocking loop to exit immediately instead of waiting for the time period. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code was cut and pasted earlier in the file so that the errgroup can be initialized and used for the management server (
/metrics
,/healthz
,/debug/pprof
) and for the VenafiConnection manager; both of which were previously run in unsupervised goroutines.