Skip to content

Commit 73ea43f

Browse files
authored
HBASE-27984 NPE in MigrateReplicationQueueFromZkToTableProcedure recovery (#5329)
Signed-off-by: GeorryHuang <[email protected]>
1 parent 2c92e6f commit 73ea43f

File tree

10 files changed

+40
-23
lines changed

10 files changed

+40
-23
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@
229229
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
230230
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
231231
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
232+
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
232233
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
233234
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
234235
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp.ReplicationSyncUpToolInfo;
@@ -364,6 +365,9 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
364365

365366
private RSGroupInfoManager rsGroupInfoManager;
366367

368+
private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
369+
new ReplicationLogCleanerBarrier();
370+
367371
// manager of replication
368372
private ReplicationPeerManager replicationPeerManager;
369373

@@ -4106,6 +4110,11 @@ public ReplicationPeerManager getReplicationPeerManager() {
41064110
return replicationPeerManager;
41074111
}
41084112

4113+
@Override
4114+
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
4115+
return replicationLogCleanerBarrier;
4116+
}
4117+
41094118
public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
41104119
getReplicationLoad(ServerName[] serverNames) {
41114120
List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);

hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
5252
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
5353
import org.apache.hadoop.hbase.replication.SyncReplicationState;
54+
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
5455
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
5556
import org.apache.hadoop.hbase.security.access.AccessChecker;
5657
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -361,6 +362,12 @@ ReplicationPeerConfig getReplicationPeerConfig(String peerId)
361362
*/
362363
ReplicationPeerManager getReplicationPeerManager();
363364

365+
/**
366+
* Returns the {@link ReplicationLogCleanerBarrier}. It will be used at multiple places so we put
367+
* it in MasterServices directly.
368+
*/
369+
ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier();
370+
364371
/**
365372
* Returns the {@link SyncReplicationReplayWALManager}.
366373
*/

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AddPeerProcedure.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ protected ReplicationPeerConfig getNewPeerConfig() {
8686
@Override
8787
protected void releaseLatch(MasterProcedureEnv env) {
8888
if (cleanerDisabled) {
89-
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
89+
env.getMasterServices().getReplicationLogCleanerBarrier().enable();
9090
}
9191
if (peerConfig.isSyncReplication()) {
9292
env.getReplicationPeerManager().releaseSyncReplicationPeerLock();
@@ -97,7 +97,7 @@ protected void releaseLatch(MasterProcedureEnv env) {
9797
@Override
9898
protected void prePeerModification(MasterProcedureEnv env)
9999
throws IOException, ReplicationException, ProcedureSuspendedException {
100-
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
100+
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
101101
throw suspend(env.getMasterConfiguration(),
102102
backoff -> LOG.warn("LogCleaner is run at the same time when adding peer {}, sleep {} secs",
103103
peerId, backoff / 1000));
@@ -142,7 +142,7 @@ protected void afterReplay(MasterProcedureEnv env) {
142142
// when executing the procedure we will try to disable and acquire.
143143
return;
144144
}
145-
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
145+
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
146146
throw new IllegalStateException("can not disable log cleaner, this should not happen");
147147
}
148148
cleanerDisabled = true;

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ private void shutdownExecutorService() {
115115

116116
private void disableReplicationLogCleaner(MasterProcedureEnv env)
117117
throws ProcedureSuspendedException {
118-
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
118+
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
119119
// it is not likely that we can reach here as we will schedule this procedure immediately
120120
// after master restarting, where ReplicationLogCleaner should have not started its first run
121121
// yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
@@ -130,7 +130,7 @@ private void disableReplicationLogCleaner(MasterProcedureEnv env)
130130
}
131131

132132
private void enableReplicationLogCleaner(MasterProcedureEnv env) {
133-
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
133+
env.getMasterServices().getReplicationLogCleanerBarrier().enable();
134134
}
135135

136136
private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
@@ -224,7 +224,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
224224
lockEntry = procLock.getLockEntry(getProcId());
225225
} catch (IOException ioe) {
226226
LOG.error("Error while acquiring execution lock for procedure {}"
227-
+ " when trying to wake it up, aborting...", ioe);
227+
+ " when trying to wake it up, aborting...", this, ioe);
228228
env.getMasterServices().abort("Can not acquire procedure execution lock", e);
229229
return;
230230
}
@@ -304,7 +304,7 @@ protected void afterReplay(MasterProcedureEnv env) {
304304
// when executing the procedure we will try to disable and acquire.
305305
return;
306306
}
307-
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
307+
if (!env.getMasterServices().getReplicationLogCleanerBarrier().disable()) {
308308
throw new IllegalStateException("can not disable log cleaner, this should not happen");
309309
}
310310
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.MigrationIterator;
7171
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
7272
import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
73-
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
7473
import org.apache.hadoop.hbase.util.FutureUtils;
7574
import org.apache.hadoop.hbase.util.Pair;
7675
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -115,9 +114,6 @@ public class ReplicationPeerManager implements ConfigurationObserver {
115114
// Only allow to add one sync replication peer concurrently
116115
private final Semaphore syncReplicationPeerLock = new Semaphore(1);
117116

118-
private final ReplicationLogCleanerBarrier replicationLogCleanerBarrier =
119-
new ReplicationLogCleanerBarrier();
120-
121117
private final String clusterId;
122118

123119
private volatile Configuration conf;
@@ -725,10 +721,6 @@ public void releaseSyncReplicationPeerLock() {
725721
syncReplicationPeerLock.release();
726722
}
727723

728-
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
729-
return replicationLogCleanerBarrier;
730-
}
731-
732724
@Override
733725
public void onConfigurationChange(Configuration conf) {
734726
this.conf = conf;

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
6565
// queue for a given peer, that why we can use a String peerId as key instead of
6666
// ReplicationQueueId.
6767
private Map<ServerName, Map<String, Map<String, ReplicationGroupOffset>>> replicationOffsets;
68+
private ReplicationLogCleanerBarrier barrier;
6869
private ReplicationPeerManager rpm;
6970
private Supplier<Set<ServerName>> getNotFullyDeadServers;
7071

@@ -84,7 +85,7 @@ public void preClean() {
8485
LOG.error("Error occurred while executing queueStorage.hasData()", e);
8586
return;
8687
}
87-
canFilter = rpm.getReplicationLogCleanerBarrier().start();
88+
canFilter = barrier.start();
8889
if (canFilter) {
8990
notFullyDeadServers = getNotFullyDeadServers.get();
9091
peerIds = rpm.listPeers(null).stream().map(ReplicationPeerDescription::getPeerId)
@@ -98,7 +99,7 @@ public void preClean() {
9899
allQueueData = rpm.getQueueStorage().listAllQueues();
99100
} catch (ReplicationException e) {
100101
LOG.error("Can not list all replication queues, give up cleaning", e);
101-
rpm.getReplicationLogCleanerBarrier().stop();
102+
barrier.stop();
102103
canFilter = false;
103104
notFullyDeadServers = null;
104105
peerIds = null;
@@ -122,7 +123,7 @@ public void preClean() {
122123
@Override
123124
public void postClean() {
124125
if (canFilter) {
125-
rpm.getReplicationLogCleanerBarrier().stop();
126+
barrier.stop();
126127
canFilter = false;
127128
// release memory
128129
notFullyDeadServers = null;
@@ -244,6 +245,7 @@ public void init(Map<String, Object> params) {
244245
Object master = params.get(HMaster.MASTER);
245246
if (master != null && master instanceof MasterServices) {
246247
MasterServices m = (MasterServices) master;
248+
barrier = m.getReplicationLogCleanerBarrier();
247249
rpm = m.getReplicationPeerManager();
248250
getNotFullyDeadServers = () -> getNotFullyDeadServers(m);
249251
return;

hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
5757
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
5858
import org.apache.hadoop.hbase.replication.SyncReplicationState;
59+
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
5960
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
6061
import org.apache.hadoop.hbase.security.access.AccessChecker;
6162
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -524,4 +525,9 @@ public boolean replicationPeerModificationSwitch(boolean on) throws IOException
524525
public boolean isReplicationPeerModificationEnabled() {
525526
return false;
526527
}
528+
529+
@Override
530+
public ReplicationLogCleanerBarrier getReplicationLogCleanerBarrier() {
531+
return null;
532+
}
527533
}

hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,11 @@ public void beforeTest() throws Exception {
132132

133133
masterServices = mock(MasterServices.class);
134134
when(masterServices.getConnection()).thenReturn(TEST_UTIL.getConnection());
135+
when(masterServices.getReplicationLogCleanerBarrier())
136+
.thenReturn(new ReplicationLogCleanerBarrier());
135137
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
136138
when(masterServices.getReplicationPeerManager()).thenReturn(rpm);
137139
when(rpm.getQueueStorage()).thenReturn(queueStorage);
138-
when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
139140
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
140141
ServerManager sm = mock(ServerManager.class);
141142
when(masterServices.getServerManager()).thenReturn(sm);

hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestMigrateReplicationQueueFromZkToTableProcedure.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,8 @@ public void testDisablePeerAndWaitStates() throws Exception {
214214
EXTRA_REGION_SERVERS
215215
.put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);
216216

217-
ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster()
218-
.getReplicationPeerManager().getReplicationLogCleanerBarrier();
217+
ReplicationLogCleanerBarrier barrier =
218+
UTIL.getHBaseCluster().getMaster().getReplicationLogCleanerBarrier();
219219
assertTrue(barrier.start());
220220

221221
ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestReplicationLogCleaner.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ public class TestReplicationLogCleaner {
8080
@Before
8181
public void setUp() throws ReplicationException {
8282
services = mock(MasterServices.class);
83+
when(services.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
8384
ReplicationPeerManager rpm = mock(ReplicationPeerManager.class);
84-
when(rpm.getReplicationLogCleanerBarrier()).thenReturn(new ReplicationLogCleanerBarrier());
8585
when(services.getReplicationPeerManager()).thenReturn(rpm);
8686
when(rpm.listPeers(null)).thenReturn(new ArrayList<>());
8787
ReplicationQueueStorage rqs = mock(ReplicationQueueStorage.class);
@@ -157,7 +157,7 @@ public void testNoConf() {
157157

158158
@Test
159159
public void testCanNotFilter() {
160-
assertTrue(services.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable());
160+
assertTrue(services.getReplicationLogCleanerBarrier().disable());
161161
List<FileStatus> files = Arrays.asList(new FileStatus());
162162
assertSame(Collections.emptyList(), runCleaner(cleaner, files));
163163
}

0 commit comments

Comments
 (0)