Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@
public static final long DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT = 1000000;
public static final String DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_KEY = "dfs.namenode.checkpoint.max-retries";
public static final int DFS_NAMENODE_CHECKPOINT_MAX_RETRIES_DEFAULT = 3;
public static final String DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY =
"dfs.namenode.checkpoint.parallel.upload.enabled";
public static final boolean DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT = false;
public static final String DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_KEY = "dfs.namenode.missing.checkpoint.periods.before.shutdown";
public static final int DFS_NAMENODE_MISSING_CHECKPOINT_PERIODS_BEFORE_SHUTDOWN_DEFAULT = 3;
public static final String DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY =
Expand Down Expand Up @@ -1629,9 +1632,9 @@
"dfs.namenode.top.num.users";
public static final int NNTOP_NUM_USERS_DEFAULT = 10;
// comma separated list of nntop reporting periods in minutes
public static final String NNTOP_WINDOWS_MINUTES_KEY =

Check failure on line 1635 in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java#L1635

javadoc: warning: no comment
"dfs.namenode.top.windows.minutes";
public static final String[] NNTOP_WINDOWS_MINUTES_DEFAULT = {"1", "5", "25"};

Check failure on line 1637 in hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java#L1637

javadoc: warning: no comment
public static final String DFS_PIPELINE_ECN_ENABLED = "dfs.pipeline.ecn";
public static final boolean DFS_PIPELINE_ECN_ENABLED_DEFAULT = false;
public static final String DFS_PIPELINE_SLOWNODE_ENABLED = "dfs.pipeline.slownode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public class CheckpointConf {
*/
private double quietMultiplier;

/**
* Whether enable the standby namenode to upload fsiamge to multiple other namenodes in
* parallel, in the cluster with observer namenodes.
*/
private final boolean parallelUploadEnabled;

public CheckpointConf(Configuration conf) {
checkpointCheckPeriod = conf.getTimeDuration(
DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY,
Expand All @@ -68,6 +74,9 @@ public CheckpointConf(Configuration conf) {
legacyOivImageDir = conf.get(DFS_NAMENODE_LEGACY_OIV_IMAGE_DIR_KEY);
quietMultiplier = conf.getDouble(DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_KEY,
DFS_NAMENODE_CHECKPOINT_QUIET_MULTIPLIER_DEFAULT);
parallelUploadEnabled = conf.getBoolean(
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY,
DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_DEFAULT);
warnForDeprecatedConfigs(conf);
}

Expand Down Expand Up @@ -106,4 +115,8 @@ public String getLegacyOivImageDir() {
public double getQuietPeriod() {
return this.checkpointPeriod * this.quietMultiplier;
}

public boolean isParallelUploadEnabled() {
return parallelUploadEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ private void doCheckpoint() throws InterruptedException, IOException {
// Do this in a separate thread to avoid blocking transition to active, but don't allow more
// than the expected number of tasks to run or queue up
// See HDFS-4816
ExecutorService executor = new ThreadPoolExecutor(0, activeNNAddresses.size(), 100,
int poolSize = checkpointConf.isParallelUploadEnabled() ? activeNNAddresses.size() : 0;
ExecutorService executor = new ThreadPoolExecutor(poolSize, activeNNAddresses.size(), 100,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(activeNNAddresses.size()),
uploadThreadFactory);
// for right now, just match the upload to the nn address by convention. There is no need to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,17 @@
</description>
</property>

<property>
<name>dfs.namenode.checkpoint.parallel.upload.enabled</name>
<value>false</value>
<description>
If true, the CheckpointNode will upload the checkpoint image to multiple other
NameNodes in parallel, in the cluster with observer namenodes. You should
make sure the network bandwidth is sufficient.
If false, the fsimage will be uploaded serially to multiple namenodes.
</description>
</property>

<property>
<name>dfs.namenode.checkpoint.check.quiet-multiplier</name>
<value>1.5</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,67 @@ public Boolean get() {
// Assert that former active did not accept the canceled checkpoint file.
assertEquals(0, nns[0].getFSImage().getMostRecentCheckpointTxId());
}


/**
* Test standby namenode upload fsiamge to multiple other namenodes in parallel, in the
* cluster with observer namenodes.
*/
@Test
@Timeout(value = 300)
public void testCheckpointParallelUpload() throws Exception {
// Set dfs.namenode.checkpoint.txns differently on the first NN to avoid it
// doing checkpoint when it becomes a standby
cluster.getConfiguration(0).setInt(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 1000);

// don't compress, we want a big image
for (int i = 0; i < NUM_NNS; i++) {
cluster.getConfiguration(i).setBoolean(
DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, false);
}

// Throttle SBN upload to make it hang during upload to ANN, and enable parallel upload fsimage.
for (int i = 1; i < NUM_NNS; i++) {
cluster.getConfiguration(i).setLong(
DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY, 100);
cluster.getConfiguration(i).setBoolean(
DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PARALLEL_UPLOAD_ENABLED_KEY, true);
}
for (int i = 0; i < NUM_NNS; i++) {
cluster.restartNameNode(i);
}

// update references to each of the nns
setNNs();

cluster.transitionToActive(0);

doEdits(0, 100);

for (int i = 1; i < NUM_NNS; i++) {
HATestUtil.waitForStandbyToCatchUp(nns[0], nns[i]);
HATestUtil.waitForCheckpoint(cluster, i, ImmutableList.of(104));
}
cluster.transitionToStandby(0);
cluster.transitionToActive(1);

GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
int transferThreadCount = 0;
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threads = threadBean.getThreadInfo(
threadBean.getAllThreadIds(), 1);
for (ThreadInfo thread: threads) {
if (thread.getThreadName().startsWith("TransferFsImageUpload")) {
transferThreadCount++;
}
}
return transferThreadCount == NUM_NNS - 1;
}
}, 1000, 30000);
}

/**
* Make sure that clients will receive StandbyExceptions even when a
* checkpoint is in progress on the SBN, and therefore the StandbyCheckpointer
Expand Down
Loading