@@ -18,11 +18,13 @@ package syncmanager
1818
1919import (
2020 "context"
21+ "errors"
2122 "fmt"
23+ "sync"
2224
2325 "go.uber.org/zap"
2426
25- "github.com/kcp-dev/api-syncagent/internal/controller/sync"
27+ controllersync "github.com/kcp-dev/api-syncagent/internal/controller/sync"
2628 "github.com/kcp-dev/api-syncagent/internal/controllerutil"
2729 "github.com/kcp-dev/api-syncagent/internal/controllerutil/predicate"
2830 "github.com/kcp-dev/api-syncagent/internal/discovery"
@@ -91,12 +93,20 @@ type Reconciler struct {
9193
9294 // The provider based on the APIExport; like the vwManager, this is stopped and recreated
9395 // whenever the APIExport's URL changes.
94- vwProvider * apiexportprovider.Provider
96+ providerOnce sync.Once
97+ vwProvider * apiexportprovider.Provider
9598
99+ syncWorkersLock sync.RWMutex
96100 // A map of sync controllers, one for each PublishedResource, using their
97101 // UIDs and resourceVersion as the map keys; using the version ensures that
98102 // when a PR changes, the old controller is orphaned and will be shut down.
99103 syncWorkers map [string ]syncWorker
104+
105+ clustersLock sync.RWMutex
106+ // A map of clusters that have been engaged with the shim layer. Since this
107+ // reconciler dynamically starts and stops controllers, we need to keep track
108+ // of clusters and engage them with sync controllers started at a later point in time.
109+ clusters map [string ]engagedCluster
100110}
101111
102112type syncWorker struct {
@@ -135,7 +145,14 @@ func Add(
135145 prFilter : prFilter ,
136146 stateNamespace : stateNamespace ,
137147 agentName : agentName ,
148+
149+ providerOnce : sync.Once {},
150+
151+ syncWorkersLock : sync.RWMutex {},
138152 syncWorkers : map [string ]syncWorker {},
153+
154+ clustersLock : sync.RWMutex {},
155+ clusters : make (map [string ]engagedCluster ),
139156 }
140157
141158 bldr := builder .ControllerManagedBy (localManager ).
@@ -161,7 +178,7 @@ func Add(
161178 return err
162179}
163180
164- func (r * Reconciler ) Reconcile (ctx context.Context , _ reconcile.Request ) (reconcile.Result , error ) {
181+ func (r * Reconciler ) Reconcile (ctx context.Context , req reconcile.Request ) (reconcile.Result , error ) {
165182 log := r .log .Named (ControllerName )
166183 log .Debug ("Processing" )
167184
@@ -232,9 +249,11 @@ func (r *Reconciler) reconcile(ctx context.Context, log *zap.SugaredLogger, vwUR
232249}
233250
234251func (r * Reconciler ) ensureManager (log * zap.SugaredLogger , vwURL string ) error {
235- // Use the global app context so this provider is independent of the reconcile
236- // context, which might get cancelled right after Reconcile() is done.
237- r .vwManagerCtx , r .vwManagerCancel = context .WithCancel (r .ctx )
252+ if r .vwManagerCtx == nil {
253+ // Use the global app context so this provider is independent of the reconcile
254+ // context, which might get cancelled right after Reconcile() is done.
255+ r .vwManagerCtx , r .vwManagerCancel = context .WithCancel (r .ctx )
256+ }
238257
239258 vwConfig := rest .CopyConfig (r .kcpRestConfig )
240259 vwConfig .Host = vwURL
@@ -303,23 +322,47 @@ func (r *Reconciler) ensureManager(log *zap.SugaredLogger, vwURL string) error {
303322 r .vwManager = manager
304323 }
305324
306- // start the provider
307- go func () {
308- // Use the global app context so this provider is independent of the reconcile
309- // context, which might get cancelled right after Reconcile() is done.
310- if err := r .vwProvider .Run (r .vwManagerCtx , r .vwManager ); err != nil {
311- log .Fatalw ("Failed to start apiexport provider." , zap .Error (err ))
312- }
313- }()
325+ r .providerOnce .Do (func () {
326+ log .Debug ("Starting virtual workspace provider…" )
327+ // start the provider
328+ go func () {
329+ // Use the global app context so this provider is independent of the reconcile
330+ // context, which might get cancelled right after Reconcile() is done.
331+ if err := r .vwProvider .Run (r .vwManagerCtx , r .vwManager ); err != nil {
332+ log .Fatalw ("Failed to start apiexport provider" , zap .Error (err ))
333+ }
334+ }()
335+ })
314336
315337 return nil
316338}
317339
340+ type engagedCluster struct {
341+ ctx context.Context
342+ cl cluster.Cluster
343+ }
344+
318345type controllerShim struct {
319346 reconciler * Reconciler
320347}
321348
322349func (s * controllerShim ) Engage (ctx context.Context , clusterName string , cl cluster.Cluster ) error {
350+ if _ , ok := s .reconciler .clusters [clusterName ]; ! ok {
351+ s .reconciler .clustersLock .Lock ()
352+ s .reconciler .clusters [clusterName ] = engagedCluster {ctx : ctx , cl : cl }
353+ s .reconciler .clustersLock .Unlock ()
354+
355+ // start a goroutine to make sure we remove the cluster when the context is done
356+ go func () {
357+ <- ctx .Done ()
358+ s .reconciler .clustersLock .Lock ()
359+ delete (s .reconciler .clusters , clusterName )
360+ s .reconciler .clustersLock .Unlock ()
361+ }()
362+ }
363+
364+ s .reconciler .syncWorkersLock .RLock ()
365+ defer s .reconciler .syncWorkersLock .RUnlock ()
323366 for _ , worker := range s .reconciler .syncWorkers {
324367 if err := worker .controller .Engage (ctx , clusterName , cl ); err != nil {
325368 return err
@@ -348,10 +391,17 @@ func (r *Reconciler) shutdown(log *zap.SugaredLogger) {
348391 r .vwManagerCtx = nil
349392 r .vwManagerCancel = nil
350393 r .vwURL = ""
394+ r .providerOnce = sync.Once {}
395+
396+ r .clustersLock .Lock ()
397+ r .clusters = make (map [string ]engagedCluster )
398+ r .clustersLock .Unlock ()
351399
352400 // Free all workers; since their contexts are based on the manager's context,
353401 // they have also been cancelled already above.
354- r .syncWorkers = nil
402+ r .syncWorkersLock .Lock ()
403+ r .syncWorkers = make (map [string ]syncWorker )
404+ r .syncWorkersLock .Unlock ()
355405}
356406
357407func getPublishedResourceKey (pr * syncagentv1alpha1.PublishedResource ) string {
@@ -373,7 +423,10 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
373423 log .Infow ("Stopping sync controller…" , "key" , key )
374424
375425 worker .cancel ()
426+
427+ r .syncWorkersLock .Lock ()
376428 delete (r .syncWorkers , key )
429+ r .syncWorkersLock .Unlock ()
377430 }
378431
379432 // start missing controllers
@@ -386,13 +439,14 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
386439 continue
387440 }
388441
389- log .Infow ("Starting new sync controller…" , "key" , key )
390-
442+ prlog := log .With ("key" , key , "name" , pubRes .Name )
391443 ctrlCtx , ctrlCancel := context .WithCancel (r .vwManagerCtx )
392444
445+ prlog .Info ("Creating new sync controller…" )
446+
393447 // create the sync controller;
394448 // use the reconciler's log without any additional reconciling context
395- syncController , err := sync .Create (
449+ syncController , err := controllersync .Create (
396450 // This can be the reconciling context, as it's only used to find the target CRD during setup;
397451 // this context *must not* be stored in the sync controller!
398452 ctx ,
@@ -410,18 +464,33 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
410464 return fmt .Errorf ("failed to create sync controller: %w" , err )
411465 }
412466
413- log . Infof ( "storing worker at %s" , key )
467+ r . syncWorkersLock . Lock ( )
414468 r .syncWorkers [key ] = syncWorker {
415469 controller : syncController ,
416470 cancel : ctrlCancel ,
417471 }
472+ r .syncWorkersLock .Unlock ()
418473
419- // let 'er rip (remember to use the long-lived context here)
420- if err := syncController .Start (ctrlCtx ); err != nil {
421- ctrlCancel ()
422- log .Info ("deleting again" )
474+ go func () {
475+ log .Infow ("Starting sync controller…" , "key" , key )
476+ if err := syncController .Start (ctrlCtx ); err != nil && ! errors .Is (err , context .Canceled ) {
477+ ctrlCancel ()
478+ prlog .Errorw ("failed to start sync controller" , zap .Error (err ))
479+ }
480+
481+ prlog .Debug ("Stopped sync controller" )
482+
483+ r .syncWorkersLock .Lock ()
423484 delete (r .syncWorkers , key )
424- return fmt .Errorf ("failed to start sync controller: %w" , err )
485+ r .syncWorkersLock .Unlock ()
486+ }()
487+
488+ r .clustersLock .RLock ()
489+ defer r .clustersLock .RUnlock ()
490+ for name , ec := range r .clusters {
491+ if err := syncController .Engage (ec .ctx , name , ec .cl ); err != nil {
492+ prlog .Errorw ("failed to engage cluster" , zap .Error (err ), "cluster" , name )
493+ }
425494 }
426495 }
427496
0 commit comments