Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -721,11 +721,13 @@ message AssignReplicationQueuesStateData {
}

enum MigrateReplicationQueueFromZkToTableState {
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER = 1;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 2;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 3;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 4;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 5;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 6;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER = 7;
}

message MigrateReplicationQueueFromZkToTableStateData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.master.replication;

import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
Expand Down Expand Up @@ -111,6 +113,26 @@ private void shutdownExecutorService() {
}
}

private void disableReplicationLogCleaner(MasterProcedureEnv env)
throws ProcedureSuspendedException {
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
// it is not likely that we can reach here as we will schedule this procedure immediately
// after master restarting, where ReplicationLogCleaner should have not started its first run
// yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
// there will be no data in the new replication queue storage before we execute this procedure
// so ReplicationLogCleaner will quit immediately without doing anything.
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.info(
"Can not disable replication log cleaner, sleep {} secs and retry later",
backoff / 1000));
}
resetRetry();
}

private void enableReplicationLogCleaner(MasterProcedureEnv env) {
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
}

private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
long peerProcCount;
try {
Expand All @@ -136,6 +158,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
MigrateReplicationQueueFromZkToTableState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER:
disableReplicationLogCleaner(env);
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE);
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
waitUntilNoPeerProcedure(env);
List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
Expand All @@ -152,7 +178,8 @@ protected Flow executeFromState(MasterProcedureEnv env,
"failed to delete old replication queue data, sleep {} secs and retry later",
backoff / 1000, e));
}
return Flow.NO_MORE_STATE;
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
return Flow.HAS_MORE_STATE;
}
// here we do not care the peers which have already been disabled, as later we do not need
// to enable them
Expand Down Expand Up @@ -232,6 +259,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
for (String peerId : disabledPeerIds) {
addChildProcedure(new EnablePeerProcedure(peerId));
}
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER:
enableReplicationLogCleaner(env);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
Expand Down Expand Up @@ -263,7 +294,19 @@ protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {

@Override
protected MigrateReplicationQueueFromZkToTableState getInitialState() {
return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
}

@Override
protected void afterReplay(MasterProcedureEnv env) {
if (getCurrentState() == getInitialState()) {
// do not need to disable log cleaner or acquire lock if we are in the initial state, later
// when executing the procedure we will try to disable and acquire.
return;
}
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
throw new IllegalStateException("can not disable log cleaner, this should not happen");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hbase.master.replication;

import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -48,6 +51,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
Expand Down Expand Up @@ -102,6 +106,8 @@ public Map<ServerName, ServerMetrics> getOnlineServers() {

@BeforeClass
public static void setupCluster() throws Exception {
// one hour, to make sure it will not run during the test
UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 60 * 1000);
UTIL.startMiniCluster(
StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
}
Expand Down Expand Up @@ -193,8 +199,10 @@ public void testWaitUntilNoPeerProcedure() throws Exception {
UTIL.waitFor(30000, () -> proc.isSuccess());
}

// make sure we will disable replication peers while migrating
// and also tests disable/enable replication log cleaner and wait for region server upgrading
@Test
public void testDisablePeerAndWaitUpgrading() throws Exception {
public void testDisablePeerAndWaitStates() throws Exception {
String peerId = "2";
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
Expand All @@ -206,21 +214,40 @@ public void testDisablePeerAndWaitUpgrading() throws Exception {
EXTRA_REGION_SERVERS
.put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);

ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster()
.getReplicationPeerManager().getReplicationLogCleanerBarrier();
assertTrue(barrier.start());

ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();

MigrateReplicationQueueFromZkToTableProcedure proc =
new MigrateReplicationQueueFromZkToTableProcedure();
procExec.submitProcedure(proc);

Thread.sleep(5000);
// make sure we are still waiting for replication log cleaner quit
assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(),
proc.getCurrentStateId());
barrier.stop();

// wait until we reach the wait upgrading state
UTIL.waitFor(30000,
() -> proc.getCurrentStateId()
== MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber()
&& proc.getState() == ProcedureState.WAITING_TIMEOUT);
// make sure the peer is disabled for migrating
assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
// make sure the replication log cleaner is disabled
assertFalse(barrier.start());

// the procedure should finish successfully
EXTRA_REGION_SERVERS.clear();
UTIL.waitFor(30000, () -> proc.isSuccess());

// make sure the peer is enabled again
assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
// make sure the replication log cleaner is enabled again
assertTrue(barrier.start());
barrier.stop();
}
}