
Commit 9a4389a

HBASE-27429 Add exponential retry backoff support for MigrateReplicationQueueFromZkToTableProcedure
Signed-off-by: Liangjun He <[email protected]>
1 parent 8ae42f9 · commit 9a4389a

4 files changed: +125 −70 lines

hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java

Lines changed: 7 additions & 3 deletions
@@ -78,9 +78,13 @@ public void add(InlineChore chore) {
   }
 
   public void add(Procedure<TEnvironment> procedure) {
-    LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
-      procedure.getTimeoutTimestamp());
-    queue.add(new DelayedProcedure<>(procedure));
+    if (procedure.getTimeout() > 0) {
+      LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, procedure.getTimeout(),
+        procedure.getTimeoutTimestamp());
+      queue.add(new DelayedProcedure<>(procedure));
+    } else {
+      LOG.info("Got negative timeout {} for {}, skip adding", procedure.getTimeout(), procedure);
+    }
   }
 
   public boolean remove(Procedure<TEnvironment> procedure) {
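
Note on the guard above: the timeout executor hands waiting procedures to a delay queue (the queue.add(new DelayedProcedure<>(procedure)) call), and a delay-queue element whose remaining delay is already non-positive becomes available for polling immediately. A procedure that sets its timeout to -1, as MigrateReplicationQueueFromZkToTableProcedure does in the next file while it waits to be woken by a listener, must therefore never be enqueued here. The following is a minimal standalone sketch of that behavior using plain java.util.concurrent; DelayGuardSketch and DelayedTask are made-up names for illustration, not HBase classes.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Minimal sketch (not HBase code): why a non-positive timeout must be filtered out.
// An element whose delay is already <= 0 can be taken from a DelayQueue right away,
// so enqueueing a "timeout = -1" procedure would wake it immediately instead of
// leaving it suspended until an external event completes it.
public class DelayGuardSketch {

  static final class DelayedTask implements Delayed {
    private final String name;
    private final long deadlineMillis;

    DelayedTask(String name, long timeoutMillis) {
      this.name = name;
      this.deadlineMillis = System.currentTimeMillis() + timeoutMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(deadlineMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
      return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
      return name;
    }
  }

  public static void main(String[] args) {
    DelayQueue<DelayedTask> queue = new DelayQueue<>();
    long timeout = -1; // stand-in for procedure.getTimeout()
    if (timeout > 0) {
      queue.add(new DelayedTask("proc-1", timeout));
    } else {
      System.out.println("Got non-positive timeout " + timeout + ", skip adding");
    }
    System.out.println("queued elements: " + queue.size()); // prints 0
  }
}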

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/MigrateReplicationQueueFromZkToTableProcedure.java

Lines changed: 87 additions & 44 deletions
@@ -25,19 +25,25 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.master.procedure.GlobalProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
 import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
 import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
@@ -65,18 +71,34 @@ public class MigrateReplicationQueueFromZkToTableProcedure
 
   private List<String> disabledPeerIds;
 
-  private List<Future<?>> futures;
+  private CompletableFuture<?> future;
 
   private ExecutorService executor;
 
+  private RetryCounter retryCounter;
+
   @Override
   public String getGlobalId() {
     return getClass().getSimpleName();
   }
 
+  private ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
+    throws ProcedureSuspendedException {
+    if (retryCounter == null) {
+      retryCounter = ProcedureUtil.createRetryCounter(conf);
+    }
+    long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+    backoffConsumer.accept(backoff);
+    throw suspend(Math.toIntExact(backoff), true);
+  }
+
+  private void resetRetry() {
+    retryCounter = null;
+  }
+
   private ExecutorService getExecutorService() {
     if (executor == null) {
-      executor = Executors.newFixedThreadPool(3, new ThreadFactoryBuilder()
+      executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
         .setNameFormat(getClass().getSimpleName() + "-%d").setDaemon(true).build());
     }
     return executor;
@@ -95,14 +117,17 @@ private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSu
       peerProcCount = env.getMasterServices().getProcedures().stream()
         .filter(p -> p instanceof PeerProcedureInterface).filter(p -> !p.isFinished()).count();
     } catch (IOException e) {
-      LOG.warn("failed to check peer procedure status", e);
-      throw suspend(5000, true);
+      throw suspend(env.getMasterConfiguration(),
+        backoff -> LOG.warn("failed to check peer procedure status, sleep {} secs and retry later",
+          backoff / 1000, e));
     }
     if (peerProcCount > 0) {
-      LOG.info("There are still {} pending peer procedures, will sleep and check later",
-        peerProcCount);
-      throw suspend(10_000, true);
+      throw suspend(env.getMasterConfiguration(),
+        backoff -> LOG.info(
+          "There are still {} pending peer procedures, sleep {} secs and retry later",
+          peerProcCount, backoff / 1000));
     }
+    resetRetry();
     LOG.info("No pending peer procedures found, continue...");
   }
 
@@ -122,8 +147,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
           try {
             oldStorage.deleteAllData();
           } catch (KeeperException e) {
-            LOG.warn("failed to delete old replication queue data, sleep and retry later", e);
-            suspend(10_000, true);
+            throw suspend(env.getMasterConfiguration(),
+              backoff -> LOG.warn(
+                "failed to delete old replication queue data, sleep {} secs and retry later",
+                backoff / 1000, e));
          }
          return Flow.NO_MORE_STATE;
        }
@@ -132,6 +159,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
        disabledPeerIds = peers.stream().filter(ReplicationPeerDescription::isEnabled)
          .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER);
+        resetRetry();
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER:
        for (String peerId : disabledPeerIds) {
@@ -140,39 +168,52 @@ protected Flow executeFromState(MasterProcedureEnv env,
        setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE);
        return Flow.HAS_MORE_STATE;
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE:
-        if (futures != null) {
-          // wait until all futures done
-          long notDone = futures.stream().filter(f -> !f.isDone()).count();
-          if (notDone == 0) {
-            boolean succ = true;
-            for (Future<?> future : futures) {
-              try {
-                future.get();
-              } catch (Exception e) {
-                succ = false;
-                LOG.warn("Failed to migrate", e);
-              }
-            }
-            if (succ) {
-              shutdownExecutorService();
-              setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
-              return Flow.HAS_MORE_STATE;
-            }
-            // reschedule to retry migration again
-            futures = null;
-          } else {
-            LOG.info("There still {} pending migration tasks, will sleep and check later", notDone);
-            throw suspend(10_000, true);
+        if (future != null) {
+          // should have finished when we arrive here
+          assert future.isDone();
+          try {
+            future.get();
+          } catch (Exception e) {
+            future = null;
+            throw suspend(env.getMasterConfiguration(),
+              backoff -> LOG.warn("failed to migrate queue data, sleep {} secs and retry later",
+                backoff / 1000, e));
          }
+          shutdownExecutorService();
+          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING);
+          resetRetry();
+          return Flow.HAS_MORE_STATE;
        }
-        try {
-          futures = env.getReplicationPeerManager()
-            .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
-        } catch (IOException e) {
-          LOG.warn("failed to submit migration tasks", e);
-          throw suspend(10_000, true);
-        }
-        throw suspend(10_000, true);
+        future = env.getReplicationPeerManager()
+          .migrateQueuesFromZk(env.getMasterServices().getZooKeeper(), getExecutorService());
+        FutureUtils.addListener(future, (r, e) -> {
+          // should acquire procedure execution lock to make sure that the procedure executor has
+          // finished putting this procedure to the WAITING_TIMEOUT state, otherwise there could be
+          // race and cause unexpected result
+          IdLock procLock =
+            env.getMasterServices().getMasterProcedureExecutor().getProcExecutionLock();
+          IdLock.Entry lockEntry;
+          try {
+            lockEntry = procLock.getLockEntry(getProcId());
+          } catch (IOException ioe) {
+            LOG.error("Error while acquiring execution lock for procedure {}"
+              + " when trying to wake it up, aborting...", ioe);
+            env.getMasterServices().abort("Can not acquire procedure execution lock", e);
+            return;
+          }
+          try {
+            setTimeoutFailure(env);
+          } finally {
+            procLock.releaseLockEntry(lockEntry);
+          }
+        });
+        // here we set timeout to -1 so the ProcedureExecutor will not schedule a Timer for us
+        setTimeout(-1);
+        setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+        // skip persistence is a must now since when restarting, if the procedure is in
+        // WAITING_TIMEOUT state and has -1 as timeout, it will block there forever...
+        skipPersistence();
+        throw new ProcedureSuspendedException();
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING:
        long rsWithLowerVersion =
          env.getMasterServices().getServerManager().getOnlineServers().values().stream()
@@ -181,9 +222,11 @@ protected Flow executeFromState(MasterProcedureEnv env,
          setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER);
          return Flow.HAS_MORE_STATE;
        } else {
-          LOG.info("There are still {} region servers which have a major version less than {}, "
-            + "will sleep and check later", rsWithLowerVersion, MIN_MAJOR_VERSION);
-          throw suspend(10_000, true);
+          throw suspend(env.getMasterConfiguration(),
+            backoff -> LOG.warn(
+              "There are still {} region servers which have a major version"
+                + " less than {}, sleep {} secs and check later",
+              rsWithLowerVersion, MIN_MAJOR_VERSION, backoff / 1000));
        }
      case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER:
        for (String peerId : disabledPeerIds) {
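
The new suspend(Configuration, LongConsumer) helper replaces the fixed suspend(5000, true) / suspend(10_000, true) sleeps with an exponential backoff taken from a RetryCounter built by ProcedureUtil.createRetryCounter(env.getMasterConfiguration()), and resetRetry() drops the counter once the guarded step succeeds so the next failure starts from a small delay again. The sketch below approximates that retry shape in self-contained Java; BackoffSketch, its constructor arguments, and the doubling-plus-jitter formula are illustrative assumptions, not the actual RetryCounter implementation.

import java.util.concurrent.ThreadLocalRandom;

// Standalone approximation of the exponential-backoff pattern used above:
// each failed attempt roughly doubles the sleep time up to a cap, and reset()
// starts the sequence over once a step succeeds.
public class BackoffSketch {
  private int attempts = 0;
  private final long baseSleepMillis;
  private final long maxSleepMillis;

  BackoffSketch(long baseSleepMillis, long maxSleepMillis) {
    this.baseSleepMillis = baseSleepMillis;
    this.maxSleepMillis = maxSleepMillis;
  }

  /** Analogous to RetryCounter.getBackoffTimeAndIncrementAttempts(). */
  long nextBackoffMillis() {
    long backoff = baseSleepMillis * (1L << Math.min(attempts, 30)); // exponential growth
    attempts++;
    // a little jitter so retrying procedures do not wake up in lockstep
    long jitter = baseSleepMillis > 0 ? ThreadLocalRandom.current().nextLong(baseSleepMillis) : 0;
    return Math.min(backoff + jitter, maxSleepMillis);
  }

  /** Analogous to resetRetry() in the procedure: forget history after a successful step. */
  void reset() {
    attempts = 0;
  }

  public static void main(String[] args) {
    BackoffSketch retry = new BackoffSketch(1000, 60_000);
    for (int i = 0; i < 8; i++) {
      System.out.println("attempt " + i + " -> sleep ~" + retry.nextBackoffMillis() + " ms");
    }
  }
}

The part worth copying from the real change is the pairing: increase the backoff on every suspension, and reset it explicitly at each state transition that indicates progress.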

hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java

Lines changed: 29 additions & 16 deletions
@@ -21,18 +21,17 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -70,6 +69,7 @@
 import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkLastPushedSeqId;
 import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorageForMigration.ZkReplicationQueueData;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -778,25 +778,38 @@ private void migrateHFileRefs(ZKReplicationQueueStorageForMigration oldQueueStor
     }
   }
 
+  private interface ExceptionalRunnable {
+    void run() throws Exception;
+  }
+
+  private CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
+    CompletableFuture<?> future = new CompletableFuture<>();
+    executor.execute(() -> {
+      try {
+        task.run();
+        future.complete(null);
+      } catch (Exception e) {
+        future.completeExceptionally(e);
+      }
+    });
+    return future;
+  }
+
   /**
-   * Submit the migration tasks to the given {@code executor} and return the futures.
+   * Submit the migration tasks to the given {@code executor}.
    */
-  List<Future<?>> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor)
-    throws IOException {
+  CompletableFuture<?> migrateQueuesFromZk(ZKWatcher zookeeper, ExecutorService executor) {
     // the replication queue table creation is asynchronous and will be triggered by addPeer, so
     // here we need to manually initialize it since we will not call addPeer.
-    initializeQueueStorage();
+    try {
+      initializeQueueStorage();
+    } catch (IOException e) {
+      return FutureUtils.failedFuture(e);
+    }
     ZKReplicationQueueStorageForMigration oldStorage =
       new ZKReplicationQueueStorageForMigration(zookeeper, conf);
-    return Arrays.asList(executor.submit(() -> {
-      migrateQueues(oldStorage);
-      return null;
-    }), executor.submit(() -> {
-      migrateLastPushedSeqIds(oldStorage);
-      return null;
-    }), executor.submit(() -> {
-      migrateHFileRefs(oldStorage);
-      return null;
-    }));
+    return CompletableFuture.allOf(runAsync(() -> migrateQueues(oldStorage), executor),
+      runAsync(() -> migrateLastPushedSeqIds(oldStorage), executor),
+      runAsync(() -> migrateHFileRefs(oldStorage), executor));
   }
 }
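
migrateQueuesFromZk now returns a single CompletableFuture instead of a list of Futures, so the procedure can attach a completion listener (FutureUtils.addListener above) rather than polling, and a failure in initializeQueueStorage is reported through the same future via FutureUtils.failedFuture. A minimal self-contained sketch of the composition pattern follows; the helper names mirror the private ExceptionalRunnable/runAsync introduced in this diff, while RunAsyncSketch and the printed task bodies are placeholders.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Wrap checked-exception tasks in CompletableFutures and combine them with allOf,
// so the caller gets one future that completes when every task is done and fails
// if any task throws.
public class RunAsyncSketch {

  private interface ExceptionalRunnable {
    void run() throws Exception;
  }

  private static CompletableFuture<?> runAsync(ExceptionalRunnable task, ExecutorService executor) {
    CompletableFuture<Void> future = new CompletableFuture<>();
    executor.execute(() -> {
      try {
        task.run();
        future.complete(null);
      } catch (Exception e) {
        // checked exceptions surface to the caller through the future
        future.completeExceptionally(e);
      }
    });
    return future;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newCachedThreadPool();
    CompletableFuture<?> all = CompletableFuture.allOf(
      runAsync(() -> System.out.println("migrate queues"), executor),
      runAsync(() -> System.out.println("migrate last pushed seq ids"), executor),
      runAsync(() -> System.out.println("migrate hfile refs"), executor));
    all.get(); // throws ExecutionException if any task failed
    executor.shutdown();
  }
}

Usage note: CompletableFuture.allOf completes only when every task future completes, and it completes exceptionally if any of them did, which is what lets the procedure treat "all three migrations finished" as a single event.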

hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestReplicationPeerManagerMigrateQueuesFromZk.java

Lines changed: 2 additions & 7 deletions
@@ -34,7 +34,6 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -146,9 +145,7 @@ private Map<String, Set<String>> prepareData() throws Exception {
   @Test
   public void testNoPeers() throws Exception {
     prepareData();
-    for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
-      future.get(1, TimeUnit.MINUTES);
-    }
+    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
     // should have called initializer
     verify(queueStorageInitializer).initialize();
     // should have not migrated any data since there is no peer
@@ -165,9 +162,7 @@ public void testMigrate() throws Exception {
       // value is not used in this test, so just add a mock
      peers.put("peer_" + i, mock(ReplicationPeerDescription.class));
    }
-    for (Future<?> future : manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR)) {
-      future.get(1, TimeUnit.MINUTES);
-    }
+    manager.migrateQueuesFromZk(UTIL.getZooKeeperWatcher(), EXECUTOR).get(1, TimeUnit.MINUTES);
     // should have called initializer
     verify(queueStorageInitializer).initialize();
     List<ReplicationQueueData> queueDatas = queueStorage.listAllQueues();
