Skip to content

Commit 5d88e72

Browse files
author
eddy.cao
committed
Add config to enable parallel upload fsimage to multiple other namenodes
1 parent 3fb45b7 commit 5d88e72

File tree

4 files changed

+80
-20
lines changed

4 files changed

+80
-20
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
259259
public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 1000000;
260260
public static final String DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY = "dfs.namenode.checkpoint.max-retries";
261261
public static final int DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT = 3;
262+
public static final String DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY =
263+
"dfs.namenode.checkpoint.parallel.upload.enabled";
264+
public static final boolean DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT = false;
262265
public static final String DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_KEY = "dfs.namenode.missing.checkpoint.periods.before.shutdown";
263266
public static final int DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_DEFAULT = 3;
264267
public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY =

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointConf.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ public class CheckpointConf {
5454
*/
5555
private double quietMultiplier;
5656

57+
/**
58+
* Whether enable the standby namenode to upload fsiamge to multiple other namenodes in
59+
* parallel, in the cluster with observer namenodes.
60+
*/
61+
private final boolean parallelUploadEnabled;
62+
5763
public CheckpointConf(Configuration conf) {
5864
checkpointCheckPeriod = conf.getTimeDuration(
5965
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
@@ -68,6 +74,9 @@ public CheckpointConf(Configuration conf) {
6874
legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
6975
quietMultiplier = conf.getDouble(DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY,
7076
DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT);
77+
parallelUploadEnabled = conf.getBoolean(
78+
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY,
79+
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT);
7180
warnForDeprecatedConfigs(conf);
7281
}
7382

@@ -106,4 +115,8 @@ public String getLegacyOivImageDir() {
106115
public double getQuietPeriod() {
107116
return this.checkpointPeriod * this.quietMultiplier;
108117
}
118+
119+
public boolean isParallelUploadEnabled() {
120+
return parallelUploadEnabled;
121+
}
109122
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,10 @@ private void doCheckpoint() throws InterruptedException, IOException {
248248
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
249249
// than the expected number of tasks to run or queue up
250250
// See HDFS-4816
251-
ExecutorService executor =
252-
new ThreadPoolExecutor(activeNNAddresses.size(), activeNNAddresses.size(), 100,
253-
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
254-
uploadThreadFactory);
251+
int poolSize = checkpointConf.isParallelUploadEnabled() ? activeNNAddresses.size() : 0;
252+
ExecutorService executor = new ThreadPoolExecutor(poolSize, activeNNAddresses.size(), 100,
253+
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
254+
uploadThreadFactory);
255255
// for right now, just match the upload to the nn address by convention. There is no need to
256256
// directly tie them together by adding a pair class.
257257
HashMap<String, Future<TransferFsImage.TransferResult>> uploads =

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -458,21 +458,6 @@ public void testCheckpointCancellationDuringUpload() throws Exception {
458458
cluster.transitionToStandby(0);
459459
cluster.transitionToActive(1);
460460

461-
GenericTestUtils.waitFor(new Supplier<Boolean>() {
462-
@Override
463-
public Boolean get() {
464-
int transferThreadCount = 0;
465-
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
466-
ThreadInfo[] threads = threadBean.getThreadInfo(
467-
threadBean.getAllThreadIds(), 1);
468-
for (ThreadInfo thread: threads) {
469-
if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
470-
transferThreadCount++;
471-
}
472-
}
473-
return transferThreadCount == NUM_NNS - 1;
474-
}
475-
}, 1000, 30000);
476461

477462
// Wait to make sure background TransferFsImageUpload thread was cancelled.
478463
// This needs to be done before the next test in the suite starts, so that a
@@ -498,7 +483,66 @@ public Boolean get() {
498483
// Assert that former active did not accept the canceled checkpoint file.
499484
assertEquals(0, nns[0].getFSImage().getMostRecentCheckpointTxId());
500485
}
501-
486+
487+
/**
488+
* Test standby namenode upload fsiamge to multiple other namenodes in parallel, in the
489+
* cluster with observer namenodes.
490+
*/
491+
@Test(timeout=60000)
492+
public void testCheckpointParallelUpload() throws Exception {
493+
// Set dfs.namenode.checkpoint.txns differently on the first NN to avoid it
494+
// doing checkpoint when it becomes a standby
495+
cluster.getConfiguration(0).setInt(
496+
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1000);
497+
498+
// don't compress, we want a big image
499+
for (int i = 0; i < NUM_NNS; i++) {
500+
cluster.getConfiguration(i).setBoolean(
501+
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
502+
}
503+
504+
// Throttle SBN upload to make it hang during upload to ANN, and enable parallel upload fsimage.
505+
for (int i = 1; i < NUM_NNS; i++) {
506+
cluster.getConfiguration(i).setLong(
507+
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
508+
cluster.getConfiguration(i).setBoolean(
509+
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY, true);
510+
}
511+
for (int i = 0; i < NUM_NNS; i++) {
512+
cluster.restartNameNode(i);
513+
}
514+
515+
// update references to each of the nns
516+
setNNs();
517+
518+
cluster.transitionToActive(0);
519+
520+
doEdits(0, 100);
521+
522+
for (int i = 1; i < NUM_NNS; i++) {
523+
HATestUtil.waitForStandbyToCatchUp(nns[0], nns[i]);
524+
HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(104));
525+
}
526+
cluster.transitionToStandby(0);
527+
cluster.transitionToActive(1);
528+
529+
GenericTestUtils.waitFor(new Supplier<Boolean>() {
530+
@Override
531+
public Boolean get() {
532+
int transferThreadCount = 0;
533+
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
534+
ThreadInfo[] threads = threadBean.getThreadInfo(
535+
threadBean.getAllThreadIds(), 1);
536+
for (ThreadInfo thread: threads) {
537+
if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
538+
transferThreadCount++;
539+
}
540+
}
541+
return transferThreadCount == NUM_NNS - 1;
542+
}
543+
}, 1000, 30000);
544+
}
545+
502546
/**
503547
* Make sure that clients will receive StandbyExceptions even when a
504548
* checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer

0 commit comments

Comments
 (0)