3232
3333import  static  io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getName ;
3434
35- public  class  EventProcessor <R  extends  HasMetadata > implements  EventHandler , LifecycleAware  {
35+ public  class  EventProcessor <P  extends  HasMetadata > implements  EventHandler , LifecycleAware  {
3636
3737  private  static  final  Logger  log  = LoggerFactory .getLogger (EventProcessor .class );
3838  private  static  final  long  MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION  = 50 ;
3939
4040  private  volatile  boolean  running ;
4141  private  final  ControllerConfiguration <?> controllerConfiguration ;
42-   private  final  ReconciliationDispatcher <R > reconciliationDispatcher ;
42+   private  final  ReconciliationDispatcher <P > reconciliationDispatcher ;
4343  private  final  Retry  retry ;
4444  private  final  ExecutorService  executor ;
4545  private  final  Metrics  metrics ;
46-   private  final  Cache <R > cache ;
47-   private  final  EventSourceManager <R > eventSourceManager ;
46+   private  final  Cache <P > cache ;
47+   private  final  EventSourceManager <P > eventSourceManager ;
4848  private  final  RateLimiter <? extends  RateLimitState > rateLimiter ;
4949  private  final  ResourceStateManager  resourceStateManager  = new  ResourceStateManager ();
5050  private  final  Map <String , Object > metricsMetadata ;
5151
5252
53-   public  EventProcessor (EventSourceManager <R > eventSourceManager ) {
53+   public  EventProcessor (EventSourceManager <P > eventSourceManager ) {
5454    this (
5555        eventSourceManager .getController ().getConfiguration (),
5656        eventSourceManager .getControllerResourceEventSource (),
@@ -63,8 +63,8 @@ public EventProcessor(EventSourceManager<R> eventSourceManager) {
6363  @ SuppressWarnings ("rawtypes" )
6464  EventProcessor (
6565      ControllerConfiguration  controllerConfiguration ,
66-       ReconciliationDispatcher <R > reconciliationDispatcher ,
67-       EventSourceManager <R > eventSourceManager ,
66+       ReconciliationDispatcher <P > reconciliationDispatcher ,
67+       EventSourceManager <P > eventSourceManager ,
6868      Metrics  metrics ) {
6969    this (
7070        controllerConfiguration ,
@@ -78,11 +78,11 @@ public EventProcessor(EventSourceManager<R> eventSourceManager) {
7878  @ SuppressWarnings ({"rawtypes" , "unchecked" })
7979  private  EventProcessor (
8080      ControllerConfiguration  controllerConfiguration ,
81-       Cache <R > cache ,
81+       Cache <P > cache ,
8282      ExecutorService  executor ,
83-       ReconciliationDispatcher <R > reconciliationDispatcher ,
83+       ReconciliationDispatcher <P > reconciliationDispatcher ,
8484      Metrics  metrics ,
85-       EventSourceManager <R > eventSourceManager ) {
85+       EventSourceManager <P > eventSourceManager ) {
8686    this .controllerConfiguration  = controllerConfiguration ;
8787    this .running  = false ;
8888    this .executor  =
@@ -136,7 +136,7 @@ private void submitReconciliationExecution(ResourceState state) {
136136    try  {
137137      boolean  controllerUnderExecution  = isControllerUnderExecution (state );
138138      final  var  resourceID  = state .getId ();
139-       Optional <R > maybeLatest  = cache .get (resourceID );
139+       Optional <P > maybeLatest  = cache .get (resourceID );
140140      maybeLatest .ifPresent (MDCUtils ::addResourceInfo );
141141      if  (!controllerUnderExecution  && maybeLatest .isPresent ()) {
142142        var  rateLimit  = state .getRateLimit ();
@@ -151,9 +151,9 @@ private void submitReconciliationExecution(ResourceState state) {
151151        }
152152        state .setUnderProcessing (true );
153153        final  var  latest  = maybeLatest .get ();
154-         ExecutionScope <R > executionScope  = new  ExecutionScope <>(latest , state .getRetry ());
154+         ExecutionScope <P > executionScope  = new  ExecutionScope <>(latest , state .getRetry ());
155155        state .unMarkEventReceived ();
156-         metrics .reconcileCustomResource (resourceID , state .getRetry (), metricsMetadata );
156+         metrics .reconcileCustomResource (latest , state .getRetry (), metricsMetadata );
157157        log .debug ("Executing events for custom resource. Scope: {}" , executionScope );
158158        executor .execute (new  ReconcilerExecutor (executionScope ));
159159      } else  {
@@ -221,7 +221,7 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal
221221  }
222222
223223  synchronized  void  eventProcessingFinished (
224-       ExecutionScope <R > executionScope , PostExecutionControl <R > postExecutionControl ) {
224+       ExecutionScope <P > executionScope , PostExecutionControl <P > postExecutionControl ) {
225225    if  (!running ) {
226226      return ;
227227    }
@@ -244,7 +244,7 @@ synchronized void eventProcessingFinished(
244244      return ;
245245    }
246246    cleanupOnSuccessfulExecution (executionScope );
247-     metrics .finishedReconciliation (resourceID , metricsMetadata );
247+     metrics .finishedReconciliation (executionScope . getResource () , metricsMetadata );
248248    if  (state .deleteEventPresent ()) {
249249      cleanupForDeletedEvent (executionScope .getResourceID ());
250250    } else  if  (postExecutionControl .isFinalizerRemoved ()) {
@@ -253,12 +253,12 @@ synchronized void eventProcessingFinished(
253253      postExecutionControl 
254254          .getUpdatedCustomResource ()
255255          .ifPresent (
256-               r  -> {
256+               p  -> {
257257                if  (!postExecutionControl .updateIsStatusPatch ()) {
258258                  eventSourceManager 
259259                      .getControllerResourceEventSource ()
260260                      .handleRecentResourceUpdate (
261-                           ResourceID .fromResource (r ), r , executionScope .getResource ());
261+                           ResourceID .fromResource (p ), p , executionScope .getResource ());
262262                }
263263              });
264264      if  (state .eventPresent ()) {
@@ -270,7 +270,7 @@ synchronized void eventProcessingFinished(
270270  }
271271
272272  private  void  reScheduleExecutionIfInstructed (
273-       PostExecutionControl <R > postExecutionControl , R  customResource ) {
273+       PostExecutionControl <P > postExecutionControl , P  customResource ) {
274274
275275    postExecutionControl 
276276        .getReScheduleDelay ()
@@ -281,7 +281,7 @@ private void reScheduleExecutionIfInstructed(
281281        }, () -> scheduleExecutionForMaxReconciliationInterval (customResource ));
282282  }
283283
284-   private  void  scheduleExecutionForMaxReconciliationInterval (R  customResource ) {
284+   private  void  scheduleExecutionForMaxReconciliationInterval (P  customResource ) {
285285    this .controllerConfiguration 
286286        .maxReconciliationInterval ()
287287        .ifPresent (m  -> {
@@ -294,7 +294,7 @@ private void scheduleExecutionForMaxReconciliationInterval(R customResource) {
294294        });
295295  }
296296
297-   TimerEventSource <R > retryEventSource () {
297+   TimerEventSource <P > retryEventSource () {
298298    return  eventSourceManager .retryEventSource ();
299299  }
300300
@@ -304,7 +304,7 @@ TimerEventSource<R> retryEventSource() {
304304   * according to the retry timing if there was an exception. 
305305   */ 
306306  private  void  handleRetryOnException (
307-       ExecutionScope <R > executionScope , Exception  exception ) {
307+       ExecutionScope <P > executionScope , Exception  exception ) {
308308    final  var  state  = getOrInitRetryExecution (executionScope );
309309    var  resourceID  = state .getId ();
310310    boolean  eventPresent  = state .eventPresent ();
@@ -323,7 +323,7 @@ private void handleRetryOnException(
323323              "Scheduling timer event for retry with delay:{} for resource: {}" ,
324324              delay ,
325325              resourceID );
326-           metrics .failedReconciliation (resourceID , exception , metricsMetadata );
326+           metrics .failedReconciliation (executionScope . getResource () , exception , metricsMetadata );
327327          retryEventSource ().scheduleOnce (resourceID , delay );
328328        },
329329        () -> {
@@ -332,7 +332,7 @@ private void handleRetryOnException(
332332        });
333333  }
334334
335-   private  void  cleanupOnSuccessfulExecution (ExecutionScope <R > executionScope ) {
335+   private  void  cleanupOnSuccessfulExecution (ExecutionScope <P > executionScope ) {
336336    log .debug (
337337        "Cleanup for successful execution for resource: {}" , getName (executionScope .getResource ()));
338338    if  (isRetryConfigured ()) {
@@ -341,7 +341,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
341341    retryEventSource ().cancelOnceSchedule (executionScope .getResourceID ());
342342  }
343343
344-   private  ResourceState  getOrInitRetryExecution (ExecutionScope <R > executionScope ) {
344+   private  ResourceState  getOrInitRetryExecution (ExecutionScope <P > executionScope ) {
345345    final  var  state  = resourceStateManager .getOrCreate (executionScope .getResourceID ());
346346    RetryExecution  retryExecution  = state .getRetry ();
347347    if  (retryExecution  == null ) {
@@ -387,9 +387,9 @@ private void handleAlreadyMarkedEvents() {
387387  }
388388
389389  private  class  ReconcilerExecutor  implements  Runnable  {
390-     private  final  ExecutionScope <R > executionScope ;
390+     private  final  ExecutionScope <P > executionScope ;
391391
392-     private  ReconcilerExecutor (ExecutionScope <R > executionScope ) {
392+     private  ReconcilerExecutor (ExecutionScope <P > executionScope ) {
393393      this .executionScope  = executionScope ;
394394    }
395395
@@ -401,7 +401,7 @@ public void run() {
401401      try  {
402402        MDCUtils .addResourceInfo (executionScope .getResource ());
403403        thread .setName ("ReconcilerExecutor-"  + controllerName () + "-"  + thread .getId ());
404-         PostExecutionControl <R > postExecutionControl  =
404+         PostExecutionControl <P > postExecutionControl  =
405405            reconciliationDispatcher .handleExecution (executionScope );
406406        eventProcessingFinished (executionScope , postExecutionControl );
407407      } finally  {
0 commit comments