9191 * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
9292 * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
9393 * operations.</li>
94- * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
95- * {@link #addPeer(String)}, {@link #removePeer(String)},
96- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
97- * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
98- * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
99- * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
100- * is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
101- * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
102- * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
103- * case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
104- * {@link #preLogRoll(Path)}.</li>
105- * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
106- * modify it, {@link #removePeer(String)} ,
107- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
108- * {@link ReplicationSourceManager.NodeFailoverWorker#run()}.
109- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
110- * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
111- * {@link ReplicationSourceInterface} firstly, then remove the wals from
112- * {@link #walsByIdRecoveredQueues}. And {@link ReplicationSourceManager.NodeFailoverWorker#run()}
113- * will add the wals to {@link #walsByIdRecoveredQueues} firstly, then start up a
114- * {@link ReplicationSourceInterface}. So there is no race here. For
115- * {@link ReplicationSourceManager.NodeFailoverWorker#run()} and {@link #removePeer(String)}, there
116- * is already synchronized on {@link #oldsources}. So no need synchronized on
117- * {@link #walsByIdRecoveredQueues}.</li>
11894 * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
11995 * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
12096 * to-be-removed peer.</li>
@@ -135,15 +111,6 @@ public class ReplicationSourceManager implements ReplicationListener {
135111 // All about stopping
136112 private final Server server ;
137113
138- // All logs we are currently tracking
139- // Index structure of the map is: queue_id->logPrefix/logGroup->logs
140- // For normal replication source, the peer id is same with the queue id
141- private final ConcurrentMap <String , Map <String , NavigableSet <String >>> walsById ;
142- // Logs for recovered sources we are currently tracking
143- // the map is: queue_id->logPrefix/logGroup->logs
144- // For recovered source, the queue id's format is peer_id-servername-*
145- private final ConcurrentMap <String , Map <String , NavigableSet <String >>> walsByIdRecoveredQueues ;
146-
147114 private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager ;
148115
149116 private final Configuration conf ;
@@ -199,8 +166,6 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
199166 this .replicationPeers = replicationPeers ;
200167 this .replicationTracker = replicationTracker ;
201168 this .server = server ;
202- this .walsById = new ConcurrentHashMap <>();
203- this .walsByIdRecoveredQueues = new ConcurrentHashMap <>();
204169 this .oldsources = new ArrayList <>();
205170 this .conf = conf ;
206171 this .fs = fs ;
@@ -338,7 +303,6 @@ public void removePeer(String peerId) {
338303 // Delete queue from storage and memory and queue id is same with peer id for normal
339304 // source
340305 deleteQueue (peerId );
341- this .walsById .remove (peerId );
342306 }
343307 ReplicationPeerConfig peerConfig = peer .getPeerConfig ();
344308 if (peerConfig .isSyncReplication ()) {
@@ -379,15 +343,10 @@ ReplicationSourceInterface addSource(String peerId) throws IOException {
379343 // synchronized on latestPaths to avoid missing the new log
380344 synchronized (this .latestPaths ) {
381345 this .sources .put (peerId , src );
382- Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
383- this .walsById .put (peerId , walsByGroup );
384346 // Add the latest wal to that source's queue
385347 if (!latestPaths .isEmpty ()) {
386348 for (Map .Entry <String , Path > walPrefixAndPath : latestPaths .entrySet ()) {
387349 Path walPath = walPrefixAndPath .getValue ();
388- NavigableSet <String > wals = new TreeSet <>();
389- wals .add (walPath .getName ());
390- walsByGroup .put (walPrefixAndPath .getKey (), wals );
391350 // Abort RS and throw exception to make add peer failed
392351 abortAndThrowIOExceptionWhenFail (
393352 () -> this .queueStorage .addWAL (server .getServerName (), peerId , walPath .getName ()));
@@ -441,7 +400,10 @@ public void drainSources(String peerId) throws IOException, ReplicationException
441400 // map from walsById since later we may fail to delete them from the replication queue
442401 // storage, and when we retry next time, we can not know the wal files that need to be deleted
443402 // from the replication queue storage.
444- walsById .get (peerId ).forEach ((k , v ) -> wals .put (k , new TreeSet <>(v )));
403+ this .queueStorage .getWALsInQueue (this .server .getServerName (), peerId ).forEach (wal -> {
404+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
405+ wals .computeIfAbsent (walPrefix , p -> new TreeSet <>()).add (wal );
406+ });
445407 }
446408 LOG .info ("Startup replication source for " + src .getPeerId ());
447409 src .startup ();
@@ -450,15 +412,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
450412 queueStorage .removeWAL (server .getServerName (), peerId , wal );
451413 }
452414 }
453- synchronized (walsById ) {
454- Map <String , NavigableSet <String >> oldWals = walsById .get (peerId );
455- wals .forEach ((k , v ) -> {
456- NavigableSet <String > walsByGroup = oldWals .get (k );
457- if (walsByGroup != null ) {
458- walsByGroup .removeAll (v );
459- }
460- });
461- }
462415 // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
463416 // a background task, we will delete the file from replication queue storage under the lock to
464417 // simplify the logic.
@@ -470,7 +423,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
470423 oldSource .terminate (terminateMessage );
471424 oldSource .getSourceMetrics ().clear ();
472425 queueStorage .removeQueue (server .getServerName (), queueId );
473- walsByIdRecoveredQueues .remove (queueId );
474426 iter .remove ();
475427 }
476428 }
@@ -483,7 +435,7 @@ public void drainSources(String peerId) throws IOException, ReplicationException
483435 * replication queue storage and only to enqueue all logs to the new replication source
484436 * @param peerId the id of the replication peer
485437 */
486- public void refreshSources (String peerId ) throws IOException {
438+ public void refreshSources (String peerId ) throws ReplicationException , IOException {
487439 String terminateMessage = "Peer " + peerId +
488440 " state or config changed. Will close the previous replication source and open a new one" ;
489441 ReplicationPeer peer = replicationPeers .getPeer (peerId );
@@ -496,9 +448,8 @@ public void refreshSources(String peerId) throws IOException {
496448 // Do not clear metrics
497449 toRemove .terminate (terminateMessage , null , false );
498450 }
499- for (NavigableSet <String > walsByGroup : walsById .get (peerId ).values ()) {
500- walsByGroup .forEach (wal -> src .enqueueLog (new Path (this .logDir , wal )));
501- }
451+ this .queueStorage .getWALsInQueue (this .server .getServerName (), peerId )
452+ .forEach (wal -> src .enqueueLog (new Path (this .logDir , wal )));
502453 }
503454 LOG .info ("Startup replication source for " + src .getPeerId ());
504455 src .startup ();
@@ -519,9 +470,8 @@ public void refreshSources(String peerId) throws IOException {
519470 for (String queueId : previousQueueIds ) {
520471 ReplicationSourceInterface recoveredReplicationSource = createSource (queueId , peer );
521472 this .oldsources .add (recoveredReplicationSource );
522- for (SortedSet <String > walsByGroup : walsByIdRecoveredQueues .get (queueId ).values ()) {
523- walsByGroup .forEach (wal -> recoveredReplicationSource .enqueueLog (new Path (wal )));
524- }
473+ this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId )
474+ .forEach (wal -> recoveredReplicationSource .enqueueLog (new Path (wal )));
525475 toStartup .add (recoveredReplicationSource );
526476 }
527477 }
@@ -541,7 +491,6 @@ private boolean removeRecoveredSource(ReplicationSourceInterface src) {
541491 LOG .info ("Done with the recovered queue {}" , src .getQueueId ());
542492 // Delete queue from storage and memory
543493 deleteQueue (src .getQueueId ());
544- this .walsByIdRecoveredQueues .remove (src .getQueueId ());
545494 return true ;
546495 }
547496
@@ -564,8 +513,6 @@ void removeSource(ReplicationSourceInterface src) {
564513 this .sources .remove (src .getPeerId ());
565514 // Delete queue from storage and memory
566515 deleteQueue (src .getQueueId ());
567- this .walsById .remove (src .getQueueId ());
568-
569516 }
570517
571518 /**
@@ -651,42 +598,19 @@ public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
651598 * @param source the replication source
652599 */
653600 @ VisibleForTesting
654- void cleanOldLogs (String log , boolean inclusive , ReplicationSourceInterface source ) {
655- String logPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (log );
656- if (source .isRecovered ()) {
657- NavigableSet <String > wals = walsByIdRecoveredQueues .get (source .getQueueId ()).get (logPrefix );
658- if (wals != null ) {
659- NavigableSet <String > walsToRemove = wals .headSet (log , inclusive );
660- if (walsToRemove .isEmpty ()) {
661- return ;
662- }
663- cleanOldLogs (walsToRemove , source );
664- walsToRemove .clear ();
665- }
666- } else {
667- NavigableSet <String > wals ;
668- NavigableSet <String > walsToRemove ;
669- // synchronized on walsById to avoid race with preLogRoll
670- synchronized (this .walsById ) {
671- wals = walsById .get (source .getQueueId ()).get (logPrefix );
672- if (wals == null ) {
673- return ;
674- }
675- walsToRemove = wals .headSet (log , inclusive );
676- if (walsToRemove .isEmpty ()) {
677- return ;
678- }
679- walsToRemove = new TreeSet <>(walsToRemove );
680- }
681- // cleanOldLogs may spend some time, especially for sync replication where we may want to
682- // remove remote wals as the remote cluster may have already been down, so we do it outside
683- // the lock to avoid block preLogRoll
684- cleanOldLogs (walsToRemove , source );
685- // now let's remove the files in the set
686- synchronized (this .walsById ) {
687- wals .removeAll (walsToRemove );
688- }
601+ void cleanOldLogs (String log , boolean inclusive ,
602+ ReplicationSourceInterface source ) {
603+ NavigableSet <String > walsToRemove ;
604+ synchronized (this .latestPaths ) {
605+ walsToRemove = getWalsToRemove (source .getQueueId (), log , inclusive );
689606 }
607+ if (walsToRemove .isEmpty ()) {
608+ return ;
609+ }
610+ // cleanOldLogs may spend some time, especially for sync replication where we may want to
611+ // remove remote wals as the remote cluster may have already been down, so we do it outside
612+ // the lock to avoid block preLogRoll
613+ cleanOldLogs (walsToRemove , source );
690614 }
691615
692616 private void removeRemoteWALs (String peerId , String remoteWALDir , Collection <String > wals )
@@ -767,37 +691,6 @@ public void preLogRoll(Path newLog) throws IOException {
767691 abortAndThrowIOExceptionWhenFail (
768692 () -> this .queueStorage .addWAL (server .getServerName (), source .getQueueId (), logName ));
769693 }
770-
771- // synchronized on walsById to avoid race with cleanOldLogs
772- synchronized (this .walsById ) {
773- // Update walsById map
774- for (Map .Entry <String , Map <String , NavigableSet <String >>> entry : this .walsById
775- .entrySet ()) {
776- String peerId = entry .getKey ();
777- Map <String , NavigableSet <String >> walsByPrefix = entry .getValue ();
778- boolean existingPrefix = false ;
779- for (Map .Entry <String , NavigableSet <String >> walsEntry : walsByPrefix .entrySet ()) {
780- SortedSet <String > wals = walsEntry .getValue ();
781- if (this .sources .isEmpty ()) {
782- // If there's no slaves, don't need to keep the old wals since
783- // we only consider the last one when a new slave comes in
784- wals .clear ();
785- }
786- if (logPrefix .equals (walsEntry .getKey ())) {
787- wals .add (logName );
788- existingPrefix = true ;
789- }
790- }
791- if (!existingPrefix ) {
792- // The new log belongs to a new group, add it into this peer
793- LOG .debug ("Start tracking logs for wal group {} for peer {}" , logPrefix , peerId );
794- NavigableSet <String > wals = new TreeSet <>();
795- wals .add (logName );
796- walsByPrefix .put (logPrefix , wals );
797- }
798- }
799- }
800-
801694 // Add to latestPaths
802695 latestPaths .put (logPrefix , newLog );
803696 }
@@ -969,18 +862,6 @@ public void run() {
969862 continue ;
970863 }
971864 }
972- // track sources in walsByIdRecoveredQueues
973- Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
974- walsByIdRecoveredQueues .put (queueId , walsByGroup );
975- for (String wal : walsSet ) {
976- String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
977- NavigableSet <String > wals = walsByGroup .get (walPrefix );
978- if (wals == null ) {
979- wals = new TreeSet <>();
980- walsByGroup .put (walPrefix , wals );
981- }
982- wals .add (wal );
983- }
984865 oldsources .add (src );
985866 LOG .trace ("Added source for recovered queue: " + src .getQueueId ());
986867 for (String wal : walsSet ) {
@@ -1012,7 +893,18 @@ public void join() {
1012893 * @return a sorted set of wal names
1013894 */
1014895 @ VisibleForTesting
1015- public Map <String , Map <String , NavigableSet <String >>> getWALs () {
896+ public Map <String , Map <String , NavigableSet <String >>> getWALs ()
897+ throws ReplicationException {
898+ Map <String , Map <String , NavigableSet <String >>> walsById = new HashMap <>();
899+ for (ReplicationSourceInterface source : sources .values ()) {
900+ String queueId = source .getQueueId ();
901+ Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
902+ walsById .put (queueId , walsByGroup );
903+ for (String wal : this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId )) {
904+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
905+ walsByGroup .computeIfAbsent (walPrefix , p -> new TreeSet <>()).add (wal );
906+ }
907+ }
1016908 return Collections .unmodifiableMap (walsById );
1017909 }
1018910
@@ -1021,7 +913,18 @@ public Map<String, Map<String, NavigableSet<String>>> getWALs() {
1021913 * @return a sorted set of wal names
1022914 */
1023915 @ VisibleForTesting
1024- Map <String , Map <String , NavigableSet <String >>> getWalsByIdRecoveredQueues () {
916+ Map <String , Map <String , NavigableSet <String >>> getWalsByIdRecoveredQueues ()
917+ throws ReplicationException {
918+ Map <String , Map <String , NavigableSet <String >>> walsByIdRecoveredQueues = new HashMap <>();
919+ for (ReplicationSourceInterface source : oldsources ) {
920+ String queueId = source .getQueueId ();
921+ Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
922+ walsByIdRecoveredQueues .put (queueId , walsByGroup );
923+ for (String wal : this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId )) {
924+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
925+ walsByGroup .computeIfAbsent (walPrefix , p -> new TreeSet <>()).add (wal );
926+ }
927+ }
1025928 return Collections .unmodifiableMap (walsByIdRecoveredQueues );
1026929 }
1027930
@@ -1200,4 +1103,21 @@ int activeFailoverTaskCount() {
12001103 MetricsReplicationGlobalSourceSource getGlobalMetrics () {
12011104 return this .globalMetrics ;
12021105 }
1106+
1107+ private NavigableSet <String > getWalsToRemove (String queueId , String log , boolean inclusive ) {
1108+ NavigableSet <String > walsToRemove = new TreeSet <>();
1109+ String logPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (log );
1110+ try {
1111+ this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId ).forEach (wal -> {
1112+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
1113+ if (walPrefix .equals (logPrefix )) {
1114+ walsToRemove .add (wal );
1115+ }
1116+ });
1117+ } catch (ReplicationException e ) {
1118+ // Just log the exception here, as the recovered replication source will try to cleanup again.
1119+ LOG .warn ("Failed to read wals in queue {}" , queueId , e );
1120+ }
1121+ return walsToRemove .headSet (log , inclusive );
1122+ }
12031123}
0 commit comments