
Commit 0ecb34f

HDFS-16413. Reconfig dfs usage parameters for datanode (#3863) (#4125)
1 parent cfca024 commit 0ecb34f

File tree

10 files changed: +211 additions, -5 deletions

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CachingGetSpaceUsed.java

Lines changed: 12 additions & 1 deletion
@@ -19,6 +19,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -154,10 +155,20 @@ boolean running() {
   /**
    * How long in between runs of the background refresh.
    */
-  long getRefreshInterval() {
+  @VisibleForTesting
+  public long getRefreshInterval() {
     return refreshInterval;
   }

+  /**
+   * Randomize the refresh interval timing by this amount, the actual interval will be chosen
+   * uniformly between {@code interval-jitter} and {@code interval+jitter}.
+   */
+  @VisibleForTesting
+  public long getJitter() {
+    return jitter;
+  }
+
   /**
    * Reset the current used data amount. This should be called
    * when the cached value is re-computed.
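
The new getJitter() accessor exposes the randomization window described in its Javadoc: the effective refresh interval is drawn uniformly from [interval - jitter, interval + jitter]. A minimal sketch of that computation (hypothetical helper class; the actual refresh loop inside CachingGetSpaceUsed is not part of this diff):

import java.util.concurrent.ThreadLocalRandom;

final class JitterSketch {
  // Pick the next refresh delay uniformly in [interval - jitter, interval + jitter],
  // matching the behavior the Javadoc above describes.
  static long nextRefreshDelay(long interval, long jitter) {
    if (jitter > 0) {
      // nextLong(origin, bound) excludes the bound, so add 1 to include +jitter.
      return interval + ThreadLocalRandom.current().nextLong(-jitter, jitter + 1);
    }
    return interval;
  }
}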

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 49 additions & 1 deletion
@@ -20,6 +20,10 @@
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;

@@ -144,6 +148,8 @@
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;

@@ -344,7 +350,9 @@ public class DataNode extends ReconfigurableBase
           DFS_DATANODE_FILEIO_PROFILING_SAMPLING_PERCENTAGE_KEY,
           DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY,
           DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY,
-          DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY));
+          DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY,
+          FS_DU_INTERVAL_KEY,
+          FS_GETSPACEUSED_JITTER_KEY));

   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");

@@ -668,6 +676,9 @@ public String reconfigurePropertyImpl(String property, String newVal)
     case DFS_DATANODE_MIN_OUTLIER_DETECTION_DISKS_KEY:
     case DFS_DATANODE_SLOWDISK_LOW_THRESHOLD_MS_KEY:
       return reconfSlowDiskParameters(property, newVal);
+    case FS_DU_INTERVAL_KEY:
+    case FS_GETSPACEUSED_JITTER_KEY:
+      return reconfDfsUsageParameters(property, newVal);
     default:
       break;
     }

@@ -849,6 +860,43 @@ private String reconfSlowDiskParameters(String property, String newVal)
     }
   }

+  private String reconfDfsUsageParameters(String property, String newVal)
+      throws ReconfigurationException {
+    String result = null;
+    try {
+      LOG.info("Reconfiguring {} to {}", property, newVal);
+      if (property.equals(FS_DU_INTERVAL_KEY)) {
+        Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
+        long interval = (newVal == null ? FS_DU_INTERVAL_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(interval);
+        List<FsVolumeImpl> volumeList = data.getVolumeList();
+        for (FsVolumeImpl fsVolume : volumeList) {
+          Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+          for (BlockPoolSlice value : blockPoolSlices.values()) {
+            value.updateDfsUsageConfig(interval, null);
+          }
+        }
+      } else if (property.equals(FS_GETSPACEUSED_JITTER_KEY)) {
+        Preconditions.checkNotNull(data, "FsDatasetSpi has not been initialized.");
+        long jitter = (newVal == null ? FS_GETSPACEUSED_JITTER_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(jitter);
+        List<FsVolumeImpl> volumeList = data.getVolumeList();
+        for (FsVolumeImpl fsVolume : volumeList) {
+          Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+          for (BlockPoolSlice value : blockPoolSlices.values()) {
+            value.updateDfsUsageConfig(null, jitter);
+          }
+        }
+      }
+      LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
+      return result;
+    } catch (IllegalArgumentException | IOException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
+
   /**
    * Get a list of the keys of the re-configurable properties in configuration.
    */
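
With FS_DU_INTERVAL_KEY and FS_GETSPACEUSED_JITTER_KEY registered in RECONFIGURABLE_PROPERTIES, both keys can now be changed on a live DataNode; an operator would typically edit the configuration and run hdfs dfsadmin -reconfig datanode <host:ipc_port> start. A sketch of the same thing through the Java API, mirroring the new test further down (values are illustrative, not from this commit):

// Assumes a running DataNode `dn`, e.g. from a MiniDFSCluster in a test.
// Both keys are in milliseconds; reconfigureProperty throws ReconfigurationException.
dn.reconfigureProperty("fs.du.interval", "600000");         // refresh du every 10 min
dn.reconfigureProperty("fs.getspaceused.jitter", "60000");  // stagger refreshes by +/- 1 min
dn.reconfigureProperty("fs.du.interval", null);             // null reverts to the default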

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java

Lines changed: 6 additions & 0 deletions
@@ -36,6 +36,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;

@@ -679,4 +680,9 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block,
    * @throws IOException
    */
   Set<? extends Replica> deepCopyReplica(String bpid) throws IOException;
+
+  /**
+   * Get the volume list.
+   */
+  List<FsVolumeImpl> getVolumeList();
 }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java

Lines changed: 39 additions & 2 deletions
@@ -47,6 +47,7 @@
 import java.util.concurrent.TimeUnit;

 import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed;
+import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;

@@ -78,14 +79,17 @@

 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
+
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.
  * Taken together, all BlockPoolSlices sharing a block pool ID across a
  * cluster represent a single block pool.
  *
  * This class is synchronized by {@link FsVolumeImpl}.
  */
-class BlockPoolSlice {
+public class BlockPoolSlice {
   static final Logger LOG = LoggerFactory.getLogger(BlockPoolSlice.class);

   private final String bpid;

@@ -111,6 +115,8 @@ class BlockPoolSlice {
   private final Timer timer;
   private final int maxDataLength;
   private final FileIoProvider fileIoProvider;
+  private final Configuration config;
+  private final File bpDir;

   private static ForkJoinPool addReplicaThreadPool = null;
   private static final int VOLUMES_REPLICA_ADD_THREADPOOL_SIZE = Runtime

@@ -124,7 +130,7 @@ public int compare(File f1, File f2) {
   };

   // TODO:FEDERATION scalability issue - a thread per DU is needed
-  private final GetSpaceUsed dfsUsage;
+  private volatile GetSpaceUsed dfsUsage;

   /**
    * Create a blook pool slice

@@ -137,6 +143,8 @@ public int compare(File f1, File f2) {
    */
   BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
       Configuration conf, Timer timer) throws IOException {
+    this.config = conf;
+    this.bpDir = bpDir;
     this.bpid = bpid;
     this.volume = volume;
     this.fileIoProvider = volume.getFileIoProvider();

@@ -228,6 +236,35 @@ public void run() {
         SHUTDOWN_HOOK_PRIORITY);
   }

+  public void updateDfsUsageConfig(Long interval, Long jitter) throws IOException {
+    // Close the old dfsUsage if it is CachingGetSpaceUsed.
+    if (dfsUsage instanceof CachingGetSpaceUsed) {
+      ((CachingGetSpaceUsed) dfsUsage).close();
+    }
+    if (interval != null) {
+      Preconditions.checkArgument(interval > 0,
+          FS_DU_INTERVAL_KEY + " should be larger than 0");
+      config.setLong(FS_DU_INTERVAL_KEY, interval);
+    }
+    if (jitter != null) {
+      Preconditions.checkArgument(jitter >= 0,
+          FS_GETSPACEUSED_JITTER_KEY + " should be larger than or equal to 0");
+      config.setLong(FS_GETSPACEUSED_JITTER_KEY, jitter);
+    }
+    // Start new dfsUsage.
+    this.dfsUsage = new FSCachingGetSpaceUsed.Builder().setBpid(bpid)
+        .setVolume(volume)
+        .setPath(bpDir)
+        .setConf(config)
+        .setInitialUsed(loadDfsUsed())
+        .build();
+  }
+
+  @VisibleForTesting
+  public GetSpaceUsed getDfsUsage() {
+    return dfsUsage;
+  }
+
   private synchronized static void initializeAddReplicaPool(Configuration conf,
       FsDatasetImpl dataset) {
     if (addReplicaThreadPool == null) {
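
updateDfsUsageConfig() above follows a close-then-rebuild pattern: stop the old CachingGetSpaceUsed refresh thread, validate and store the new value in the slice's Configuration, then rebuild dfsUsage via the FSCachingGetSpaceUsed builder, re-seeded from loadDfsUsed(); the field is now volatile rather than final so concurrent readers see the swapped instance without locking. A stripped-down sketch of the same pattern against the generic GetSpaceUsed.Builder (hypothetical class, not part of the commit):

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CachingGetSpaceUsed;
import org.apache.hadoop.fs.GetSpaceUsed;

final class RefreshableUsageSketch {
  private final File dir;
  private volatile GetSpaceUsed usage;  // volatile: readers observe the swap without locks

  RefreshableUsageSketch(File dir, Configuration conf) throws IOException {
    this.dir = dir;
    this.usage = new GetSpaceUsed.Builder().setPath(dir).setConf(conf).build();
  }

  // Rebuild with updated conf values, stopping the old refresh thread first.
  void rebuild(Configuration conf) throws IOException {
    if (usage instanceof CachingGetSpaceUsed) {
      ((CachingGetSpaceUsed) usage).close();
    }
    usage = new GetSpaceUsed.Builder().setPath(dir).setConf(conf).build();
  }

  long getUsed() throws IOException {
    return usage.getUsed();
  }
}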

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

Lines changed: 5 additions & 0 deletions
@@ -3431,5 +3431,10 @@ void stopAllDataxceiverThreads(FsVolumeImpl volume) {
       }
     }
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return volumes.getVolumes();
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

Lines changed: 4 additions & 0 deletions
@@ -493,6 +493,10 @@ long getRecentReserved() {
     return recentReserved;
   }

+  public Map<String, BlockPoolSlice> getBlockPoolSlices() {
+    return bpSlices;
+  }
+
   long getReserved(){
     return reserved != null ? reserved.getReserved() : 0;
   }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Lines changed: 6 additions & 0 deletions
@@ -40,6 +40,7 @@
 import javax.management.ObjectName;
 import javax.management.StandardMBean;

+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.thirdparty.com.google.common.math.LongMath;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;

@@ -1595,5 +1596,10 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
     }
     return Collections.unmodifiableSet(replicas);
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
 }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java

Lines changed: 83 additions & 0 deletions
@@ -19,6 +19,10 @@
 package org.apache.hadoop.hdfs.server.datanode;

 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DU_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_GETSPACEUSED_JITTER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;

@@ -49,15 +53,21 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.CachingGetSpaceUsed;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.GetSpaceUsed;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Assert;

@@ -673,4 +683,77 @@ public void testSlowDiskParameters() throws ReconfigurationException, IOException
           dn.getDiskMetrics().getSlowDiskDetector().getLowThresholdMs());
     }
   }
+
+  @Test
+  public void testDfsUsageParameters() throws ReconfigurationException {
+    String[] dfsUsageParameters = {
+        FS_DU_INTERVAL_KEY,
+        FS_GETSPACEUSED_JITTER_KEY};
+
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+
+      // Try invalid values.
+      for (String parameter : dfsUsageParameters) {
+        try {
+          dn.reconfigureProperty(parameter, "text");
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting NumberFormatException",
+              expected.getCause() instanceof NumberFormatException);
+        }
+
+        try {
+          dn.reconfigureProperty(parameter, String.valueOf(-1));
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting IllegalArgumentException",
+              expected.getCause() instanceof IllegalArgumentException);
+        }
+      }
+
+      // Change and verify properties.
+      for (String parameter : dfsUsageParameters) {
+        dn.reconfigureProperty(parameter, "99");
+      }
+      List<FsVolumeImpl> volumeList = dn.data.getVolumeList();
+      for (FsVolumeImpl fsVolume : volumeList) {
+        Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+        for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
+          GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
+          if (dfsUsage instanceof CachingGetSpaceUsed) {
+            assertEquals(99,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
+            assertEquals(99,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
+          }
+        }
+      }
+
+      // Revert to default and verify.
+      for (String parameter : dfsUsageParameters) {
+        dn.reconfigureProperty(parameter, null);
+      }
+      for (FsVolumeImpl fsVolume : volumeList) {
+        Map<String, BlockPoolSlice> blockPoolSlices = fsVolume.getBlockPoolSlices();
+        for (Map.Entry<String, BlockPoolSlice> entry : blockPoolSlices.entrySet()) {
+          GetSpaceUsed dfsUsage = entry.getValue().getDfsUsage();
+          if (dfsUsage instanceof CachingGetSpaceUsed) {
+            assertEquals(String.format("expect %s is not configured",
+                FS_DU_INTERVAL_KEY), FS_DU_INTERVAL_DEFAULT,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getRefreshInterval());
+            assertEquals(String.format("expect %s is not configured",
+                FS_GETSPACEUSED_JITTER_KEY), FS_GETSPACEUSED_JITTER_DEFAULT,
+                ((CachingGetSpaceUsed) entry.getValue().getDfsUsage()).getJitter());
+          }
+          assertEquals(String.format("expect %s is not configured",
+              FS_DU_INTERVAL_KEY), null,
+              dn.getConf().get(FS_DU_INTERVAL_KEY));
+          assertEquals(String.format("expect %s is not configured",
+              FS_GETSPACEUSED_JITTER_KEY), null,
+              dn.getConf().get(FS_GETSPACEUSED_JITTER_KEY));
+        }
+      }
+    }
+  }
 }
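
The test drives runtime reconfiguration; the same two keys can also be set statically before the DataNode starts. A small sketch (values illustrative; "fs.du.interval" and "fs.getspaceused.jitter" are the string forms of the keys used throughout this commit, both in milliseconds):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

final class DfsUsageConfSketch {
  static Configuration build() {
    Configuration conf = new HdfsConfiguration();
    // Refresh the cached du value every 5 minutes, staggered by up to +/- 30s
    // so that block pool slices do not all rescan their storage at once.
    conf.setLong("fs.du.interval", 300_000L);
    conf.setLong("fs.getspaceused.jitter", 30_000L);
    return conf;
  }
}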

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java

Lines changed: 6 additions & 0 deletions
@@ -24,6 +24,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;

@@ -465,4 +466,9 @@ public Set<? extends Replica> deepCopyReplica(String bpid)
       throws IOException {
     return Collections.EMPTY_SET;
   }
+
+  @Override
+  public List<FsVolumeImpl> getVolumeList() {
+    return null;
+  }
 }
