3535import java .util .concurrent .PriorityBlockingQueue ;
3636import java .util .concurrent .TimeUnit ;
3737import java .util .concurrent .TimeoutException ;
38+ import java .util .concurrent .atomic .AtomicBoolean ;
3839import java .util .concurrent .atomic .AtomicLong ;
3940import java .util .function .Predicate ;
41+
4042import org .apache .commons .lang3 .StringUtils ;
43+ import org .apache .commons .lang3 .mutable .MutableBoolean ;
4144import org .apache .hadoop .conf .Configuration ;
4245import org .apache .hadoop .fs .FileSystem ;
4346import org .apache .hadoop .fs .Path ;
@@ -120,6 +123,14 @@ public class ReplicationSource implements ReplicationSourceInterface {
120123 // ReplicationEndpoint which will handle the actual replication
121124 private volatile ReplicationEndpoint replicationEndpoint ;
122125
126+ private boolean abortOnError ;
127+ //This is needed for the startup loop to identify when there's already
128+ //an initialization happening (but not finished yet),
129+ //so that it doesn't try submit another initialize thread.
130+ //NOTE: this should only be set to false at the end of initialize method, prior to return.
131+ private AtomicBoolean startupOngoing = new AtomicBoolean (false );
132+
133+
123134 /**
124135 * A filter (or a chain of filters) for WAL entries; filters out edits.
125136 */
@@ -217,6 +228,10 @@ public void init(Configuration conf, FileSystem fs, ReplicationSourceManager man
217228 this .throttler = new ReplicationThrottler ((double ) currentBandwidth / 10.0 );
218229 this .totalBufferUsed = manager .getTotalBufferUsed ();
219230 this .walFileLengthProvider = walFileLengthProvider ;
231+
232+ this .abortOnError = this .conf .getBoolean ("replication.source.regionserver.abort" ,
233+ true );
234+
220235 LOG .info ("queueId={}, ReplicationSource: {}, currentBandwidth={}" , queueId ,
221236 replicationPeer .getId (), this .currentBandwidth );
222237 }
@@ -372,10 +387,10 @@ private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> q
372387 createNewWALReader (walGroupId , queue , worker .getStartPosition ());
373388 Threads .setDaemonThreadRunning (
374389 walReader , Thread .currentThread ().getName ()
375- + ".replicationSource.wal-reader." + walGroupId + "," + queueId ,
376- this :: uncaughtException );
390+ + ".replicationSource.wal-reader." + walGroupId + "," + queueId ,
391+ ( t , e ) -> this . uncaughtException ( t , e , this . manager , this . getPeerId ()) );
377392 worker .setWALReader (walReader );
378- worker .startup (this :: uncaughtException );
393+ worker .startup (( t , e ) -> this . uncaughtException ( t , e , this . manager , this . getPeerId ()) );
379394 return worker ;
380395 }
381396 });
@@ -450,11 +465,28 @@ WALEntryFilter getWalEntryFilter() {
450465 return walEntryFilter ;
451466 }
452467
453- protected final void uncaughtException (Thread t , Throwable e ) {
468+ protected final void uncaughtException (Thread t , Throwable e ,
469+ ReplicationSourceManager manager , String peerId ) {
454470 RSRpcServices .exitIfOOME (e );
455471 LOG .error ("Unexpected exception in {} currentPath={}" ,
456472 t .getName (), getCurrentPath (), e );
457- server .abort ("Unexpected exception in " + t .getName (), e );
473+ if (abortOnError ){
474+ server .abort ("Unexpected exception in " + t .getName (), e );
475+ }
476+ if (manager != null ){
477+ while (true ) {
478+ try {
479+ LOG .info ("Refreshing replication sources now due to previous error on thread: {}" ,
480+ t .getName ());
481+ manager .refreshSources (peerId );
482+ break ;
483+ } catch (IOException e1 ) {
484+ LOG .error ("Replication sources refresh failed." , e1 );
485+ sleepForRetries ("Sleeping before try refreshing sources again" ,
486+ maxRetriesMultiplier );
487+ }
488+ }
489+ }
458490 }
459491
460492 @ Override
@@ -544,12 +576,16 @@ private void initialize() {
544576 replicationEndpoint .stop ();
545577 if (sleepForRetries ("Error starting ReplicationEndpoint" , sleepMultiplier )) {
546578 sleepMultiplier ++;
579+ } else {
580+ this .startupOngoing .set (false );
581+ throw new RuntimeException ("Exhausted retries to start replication endpoint." );
547582 }
548583 }
549584 }
550585
551586 if (!this .isSourceActive ()) {
552- return ;
587+ this .startupOngoing .set (false );
588+ throw new IllegalStateException ("Source should be active." );
553589 }
554590
555591 sleepMultiplier = 1 ;
@@ -571,7 +607,8 @@ private void initialize() {
571607 }
572608
573609 if (!this .isSourceActive ()) {
574- return ;
610+ this .startupOngoing .set (false );
611+ throw new IllegalStateException ("Source should be active." );
575612 }
576613 LOG .info ("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};" ,
577614 logPeerId (), this .replicationQueueInfo .getQueueId (), clusterId , peerClusterId );
@@ -583,16 +620,30 @@ private void initialize() {
583620 PriorityBlockingQueue <Path > queue = entry .getValue ();
584621 tryStartNewShipper (walGroupId , queue );
585622 }
623+ this .startupOngoing .set (false );
586624 }
587625
588626 @ Override
589627 public void startup () {
590- // mark we are running now
628+ //Flag that signalizes uncaught error happening while starting up the source
629+ // and a retry should be attempted
630+ MutableBoolean retryStartup = new MutableBoolean (true );
591631 this .sourceRunning = true ;
592- initThread = new Thread (this ::initialize );
593- Threads .setDaemonThreadRunning (initThread ,
594- Thread .currentThread ().getName () + ".replicationSource," + this .queueId ,
595- this ::uncaughtException );
632+ do {
633+ if (retryStartup .booleanValue ()) {
634+ retryStartup .setValue (false );
635+ startupOngoing .set (true );
636+ // mark we are running now
637+ initThread = new Thread (this ::initialize );
638+ Threads .setDaemonThreadRunning (initThread ,
639+ Thread .currentThread ().getName () + ".replicationSource," + this .queueId ,
640+ (t ,e ) -> {
641+ sourceRunning = false ;
642+ uncaughtException (t , e , null , null );
643+ retryStartup .setValue (!this .abortOnError );
644+ });
645+ }
646+ } while (this .startupOngoing .get () && !this .abortOnError );
596647 }
597648
598649 @ Override
0 commit comments