@@ -46,8 +46,9 @@ import (
4646
4747const (
4848 // We have set a buffer in order to reduce times of context switches.
49- incomingBufSize = 100
50- outgoingBufSize = 100
49+ incomingBufSize = 100
50+ outgoingBufSize = 100
51+ processEventConcurrency = 10
5152)
5253
5354// defaultWatcherMaxLimit is used to facilitate construction tests
@@ -230,8 +231,7 @@ func (wc *watchChan) run(initialEventsEndBookmarkRequired, forceInitialEvents bo
230231 go wc .startWatching (watchClosedCh , initialEventsEndBookmarkRequired , forceInitialEvents )
231232
232233 var resultChanWG sync.WaitGroup
233- resultChanWG .Add (1 )
234- go wc .processEvent (& resultChanWG )
234+ wc .processEvents (& resultChanWG )
235235
236236 select {
237237 case err := <- wc .errChan :
@@ -424,18 +424,25 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}, initialEventsEnd
424424 close (watchClosedCh )
425425}
426426
427- // processEvent processes events from etcd watcher and sends results to resultChan.
428- func (wc * watchChan ) processEvent (wg * sync.WaitGroup ) {
427+ // processEvents processes events from etcd watcher and sends results to resultChan.
428+ func (wc * watchChan ) processEvents (wg * sync.WaitGroup ) {
429+ if utilfeature .DefaultFeatureGate .Enabled (features .ConcurrentWatchObjectDecode ) {
430+ wc .concurrentProcessEvents (wg )
431+ } else {
432+ wg .Add (1 )
433+ go wc .serialProcessEvents (wg )
434+ }
435+ }
436+ func (wc * watchChan ) serialProcessEvents (wg * sync.WaitGroup ) {
429437 defer wg .Done ()
430-
431438 for {
432439 select {
433440 case e := <- wc .incomingEventChan :
434441 res := wc .transform (e )
435442 if res == nil {
436443 continue
437444 }
438- if len (wc .resultChan ) == outgoingBufSize {
445+ if len (wc .resultChan ) == cap ( wc . resultChan ) {
439446 klog .V (3 ).InfoS ("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers" , "outgoingEvents" , outgoingBufSize , "objectType" , wc .watcher .objectType , "groupResource" , wc .watcher .groupResource )
440447 }
441448 // If user couldn't receive results fast enough, we also block incoming events from watcher.
@@ -452,6 +459,95 @@ func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
452459 }
453460}
454461
462+ func (wc * watchChan ) concurrentProcessEvents (wg * sync.WaitGroup ) {
463+ p := concurrentOrderedEventProcessing {
464+ input : wc .incomingEventChan ,
465+ processFunc : wc .transform ,
466+ output : wc .resultChan ,
467+ processingQueue : make (chan chan * watch.Event , processEventConcurrency - 1 ),
468+
469+ objectType : wc .watcher .objectType ,
470+ groupResource : wc .watcher .groupResource ,
471+ }
472+ wg .Add (1 )
473+ go func () {
474+ defer wg .Done ()
475+ p .scheduleEventProcessing (wc .ctx , wg )
476+ }()
477+ wg .Add (1 )
478+ go func () {
479+ defer wg .Done ()
480+ p .collectEventProcessing (wc .ctx )
481+ }()
482+ }
483+
484+ type concurrentOrderedEventProcessing struct {
485+ input chan * event
486+ processFunc func (* event ) * watch.Event
487+ output chan watch.Event
488+
489+ processingQueue chan chan * watch.Event
490+ // Metadata for logging
491+ objectType string
492+ groupResource schema.GroupResource
493+ }
494+
495+ func (p * concurrentOrderedEventProcessing ) scheduleEventProcessing (ctx context.Context , wg * sync.WaitGroup ) {
496+ var e * event
497+ for {
498+ select {
499+ case <- ctx .Done ():
500+ return
501+ case e = <- p .input :
502+ }
503+ processingResponse := make (chan * watch.Event , 1 )
504+ select {
505+ case <- ctx .Done ():
506+ return
507+ case p .processingQueue <- processingResponse :
508+ }
509+ wg .Add (1 )
510+ go func (e * event , response chan <- * watch.Event ) {
511+ defer wg .Done ()
512+ select {
513+ case <- ctx .Done ():
514+ case response <- p .processFunc (e ):
515+ }
516+ }(e , processingResponse )
517+ }
518+ }
519+
520+ func (p * concurrentOrderedEventProcessing ) collectEventProcessing (ctx context.Context ) {
521+ var processingResponse chan * watch.Event
522+ var e * watch.Event
523+ for {
524+ select {
525+ case <- ctx .Done ():
526+ return
527+ case processingResponse = <- p .processingQueue :
528+ }
529+ select {
530+ case <- ctx .Done ():
531+ return
532+ case e = <- processingResponse :
533+ }
534+ if e == nil {
535+ continue
536+ }
537+ if len (p .output ) == cap (p .output ) {
538+ klog .V (3 ).InfoS ("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers" , "outgoingEvents" , outgoingBufSize , "objectType" , p .objectType , "groupResource" , p .groupResource )
539+ }
540+ // If user couldn't receive results fast enough, we also block incoming events from watcher.
541+ // Because storing events in local will cause more memory usage.
542+ // The worst case would be closing the fast watcher.
543+ select {
544+ case <- ctx .Done ():
545+ return
546+ case p .output <- * e :
547+ }
548+ }
549+ }
550+
455551func (wc * watchChan ) filter (obj runtime.Object ) bool {
456552 if wc .internalPred .Empty () {
457553 return true
0 commit comments