Skip to content

Commit 02e0897

Browse files
committed
change code logic for BlockReconstructionWork
1 parent c33d868 commit 02e0897

File tree

7 files changed

+98
-79
lines changed

7 files changed

+98
-79
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
2121
import static org.apache.hadoop.hdfs.protocol.BlockType.CONTIGUOUS;
2222
import static org.apache.hadoop.hdfs.protocol.BlockType.STRIPED;
23+
import static org.apache.hadoop.hdfs.server.blockmanagement.LowRedundancyBlocks.LEVEL;
2324
import static org.apache.hadoop.util.ExitUtil.terminate;
2425
import static org.apache.hadoop.util.Time.now;
2526

@@ -869,7 +870,7 @@ public void metaSave(PrintWriter out) {
869870
synchronized (neededReconstruction) {
870871
out.println("Metasave: Blocks waiting for reconstruction: "
871872
+ neededReconstruction.getLowRedundancyBlockCount());
872-
for (int i = 0; i < neededReconstruction.LEVEL; i++) {
873+
for (int i = 0; i < LEVEL; i++) {
873874
if (i != neededReconstruction.QUEUE_WITH_CORRUPT_BLOCKS) {
874875
for (Iterator<BlockInfo> it = neededReconstruction.iterator(i);
875876
it.hasNext();) {
@@ -969,7 +970,7 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
969970
// source node returned is not used
970971
chooseSourceDatanodes(blockInfo, containingNodes,
971972
containingLiveReplicasNodes, numReplicas, new ArrayList<Byte>(),
972-
new ArrayList<Byte>(), new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);
973+
new ArrayList<Byte>(), new ArrayList<Byte>(), LEVEL);
973974

974975
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
975976
// not included in the numReplicas.liveReplicas() count
@@ -2099,28 +2100,22 @@ int computeInvalidateWork(int nodesToProcess) {
20992100
* @return number of blocks scheduled for reconstruction during this
21002101
* iteration.
21012102
*/
2102-
int computeBlockReconstructionWork(int blocksToProcess) {
2103+
int scheduleBlockReconstructionWork(int blocksToProcess) {
21032104
List<List<BlockInfo>> blocksToReconstruct = null;
21042105
namesystem.writeLock();
2105-
try {
2106-
boolean reset = false;
2107-
if (replQueueResetToHeadThreshold > 0) {
2108-
if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
2109-
reset = true;
2110-
replQueueCallsSinceReset = 0;
2111-
} else {
2112-
replQueueCallsSinceReset++;
2113-
}
2106+
boolean reset = false;
2107+
if (replQueueResetToHeadThreshold > 0) {
2108+
if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
2109+
reset = true;
2110+
replQueueCallsSinceReset = 0;
2111+
} else {
2112+
replQueueCallsSinceReset++;
21142113
}
2115-
// Choose the blocks to be reconstructed
2116-
blocksToReconstruct = neededReconstruction
2117-
.chooseLowRedundancyBlocks(blocksToProcess, reset);
2118-
} finally {
2119-
namesystem.writeUnlock("computeBlockReconstructionWork");
21202114
}
2121-
return computeReconstructionWorkForBlocks(blocksToReconstruct);
2115+
return scheduleReconstructionWorkForBlocks(blocksToProcess, reset);
21222116
}
21232117

2118+
21242119
/**
21252120
* Reconstruct a set of blocks to full strength through replication or
21262121
* erasure coding
@@ -2129,29 +2124,39 @@ int computeBlockReconstructionWork(int blocksToProcess) {
21292124
* @return the number of blocks scheduled for replication
21302125
*/
21312126
@VisibleForTesting
2132-
int computeReconstructionWorkForBlocks(
2133-
List<List<BlockInfo>> blocksToReconstruct) {
2127+
int scheduleReconstructionWorkForBlocks(int blocksToProcess, boolean resetIterators) {
21342128
int scheduledWork = 0;
21352129
List<BlockReconstructionWork> reconWork = new ArrayList<>();
2136-
2137-
// Step 1: categorize at-risk blocks into replication and EC tasks
21382130
namesystem.writeLock();
2131+
int priority = 0;
2132+
// Step 1: categorize at-risk blocks into replication and EC tasks
21392133
try {
21402134
synchronized (neededReconstruction) {
2141-
for (int priority = 0; priority < blocksToReconstruct
2142-
.size(); priority++) {
2143-
for (BlockInfo block : blocksToReconstruct.get(priority)) {
2144-
BlockReconstructionWork rw = scheduleReconstruction(block,
2145-
priority);
2135+
for (; blocksToProcess > 0 && priority < LEVEL; priority++) {
2136+
List<BlockInfo> blocks = new ArrayList<>();
2137+
int processed = neededReconstruction.
2138+
chooseLowRedundancyBlocksForPriority(priority, blocksToProcess, blocks);
2139+
if(processed == 0)
2140+
break;
2141+
for (BlockInfo block : blocks) {
2142+
BlockReconstructionWork rw = generateReconstructionForBlock(block,
2143+
priority);
21462144
if (rw != null) {
21472145
reconWork.add(rw);
2146+
// if we constructed effective work, reduce the budget
2147+
blocksToProcess--;
21482148
}
21492149
}
21502150
}
21512151
}
21522152
} finally {
2153-
namesystem.writeUnlock("computeReconstructionWorkForBlocks");
2153+
namesystem.writeUnlock("generateReconstructionWorkForBlocks");
21542154
}
2155+
if (priority == LEVEL || resetIterators) {
2156+
// Reset all bookmarks because there were no recently added blocks.
2157+
neededReconstruction.resetIterators();
2158+
}
2159+
21552160

21562161
// Step 2: choose target nodes for each reconstruction task
21572162
for (BlockReconstructionWork rw : reconWork) {
@@ -2161,7 +2166,7 @@ int computeReconstructionWorkForBlocks(
21612166

21622167
// Exclude all nodes which already exists as targets for the block
21632168
List<DatanodeStorageInfo> targets =
2164-
pendingReconstruction.getTargets(rw.getBlock());
2169+
pendingReconstruction.getTargets(rw.getBlock());
21652170
if (targets != null) {
21662171
for (DatanodeStorageInfo dn : targets) {
21672172
excludedNodes.add(dn.getDatanodeDescriptor());
@@ -2170,7 +2175,7 @@ int computeReconstructionWorkForBlocks(
21702175

21712176
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
21722177
final BlockPlacementPolicy placementPolicy =
2173-
placementPolicies.getPolicy(rw.getBlock().getBlockType());
2178+
placementPolicies.getPolicy(rw.getBlock().getBlockType());
21742179
rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
21752180
}
21762181

@@ -2191,7 +2196,7 @@ int computeReconstructionWorkForBlocks(
21912196
}
21922197
}
21932198
} finally {
2194-
namesystem.writeUnlock("computeReconstructionWorkForBlocks");
2199+
namesystem.writeUnlock("scheduleReconstructionWorkForBlocks");
21952200
}
21962201

21972202
if (blockLog.isDebugEnabled()) {
@@ -2204,16 +2209,17 @@ int computeReconstructionWorkForBlocks(
22042209
targetList.append(' ').append(target.getDatanodeDescriptor());
22052210
}
22062211
blockLog.debug("BLOCK* ask {} to replicate {} to {}",
2207-
rw.getSrcNodes(), rw.getBlock(), targetList);
2212+
rw.getSrcNodes(), rw.getBlock(), targetList);
22082213
}
22092214
}
22102215
blockLog.debug("BLOCK* neededReconstruction = {} pendingReconstruction = {}",
2211-
neededReconstruction.size(), pendingReconstruction.size());
2216+
neededReconstruction.size(), pendingReconstruction.size());
22122217
}
22132218

22142219
return scheduledWork;
22152220
}
22162221

2222+
22172223
// Check if the number of live + pending replicas satisfies
22182224
// the expected redundancy.
22192225
boolean hasEnoughEffectiveReplicas(BlockInfo block,
@@ -2225,7 +2231,7 @@ boolean hasEnoughEffectiveReplicas(BlockInfo block,
22252231
}
22262232

22272233
@VisibleForTesting
2228-
BlockReconstructionWork scheduleReconstruction(BlockInfo block,
2234+
BlockReconstructionWork generateReconstructionForBlock(BlockInfo block,
22292235
int priority) {
22302236
// skip abandoned block or block reopened for append
22312237
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
@@ -2615,7 +2621,7 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
26152621
}
26162622

26172623
// for EC we need to make sure numReplicas reflects the correct state
2618-
// because in the scheduleReconstruction it need the numReplicas to check
2624+
// because generateReconstructionForBlock needs the numReplicas to check
26192625
// whether the EC internal block needs to be reconstructed
26202626
byte blockIndex = -1;
26212627
if (isStriped) {
@@ -4954,7 +4960,7 @@ public void removeBlock(BlockInfo block) {
49544960
DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
49554961
.toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
49564962
}
4957-
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
4963+
neededReconstruction.remove(block, LEVEL);
49584964
postponedMisreplicatedBlocks.remove(block);
49594965
}
49604966

@@ -5405,7 +5411,7 @@ int computeDatanodeWork() {
54055411
final int nodesToProcess = (int) Math.ceil(numlive
54065412
* this.blocksInvalidateWorkPct);
54075413

5408-
int workFound = this.computeBlockReconstructionWork(blocksToProcess);
5414+
int workFound = this.scheduleBlockReconstructionWork(blocksToProcess);
54095415

54105416
// Update counters
54115417
namesystem.writeLock();

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
/**
2929
* This class is used internally by
30-
* {@link BlockManager#computeReconstructionWorkForBlocks} to represent a
30+
* {@link BlockManager#scheduleReconstructionWorkForBlocks} to represent a
3131
* task to reconstruct a block through replication or erasure coding.
3232
* Reconstruction is done by transferring data from srcNodes to targets
3333
*/

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -520,44 +520,57 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
520520

521521
int count = 0;
522522
int priority = 0;
523-
HashSet<BlockInfo> toRemove = new HashSet<>();
524523
for (; count < blocksToProcess && priority < LEVEL; priority++) {
525-
// Go through all blocks that need reconstructions with current priority.
526-
// Set the iterator to the first unprocessed block at this priority level
527-
// We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
528-
// to look for deleted blocks if any.
524+
List<BlockInfo> blocks = new ArrayList<>();
525+
int processed = chooseLowRedundancyBlocksForPriority(priority, blocksToProcess, blocks);
529526
final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
530-
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
531-
final List<BlockInfo> blocks = new LinkedList<>();
532527
if (!inCorruptLevel) {
533528
blocksToReconstruct.add(blocks);
534529
}
535-
for(; count < blocksToProcess && i.hasNext(); count++) {
536-
BlockInfo block = i.next();
537-
if (block.isDeleted()) {
538-
toRemove.add(block);
539-
continue;
540-
}
541-
if (!inCorruptLevel) {
542-
blocks.add(block);
543-
}
544-
}
545-
for (BlockInfo bInfo : toRemove) {
546-
remove(bInfo, priority);
547-
}
548-
toRemove.clear();
530+
count += processed;
549531
}
550532

551533
if (priority == LEVEL || resetIterators) {
552534
// Reset all bookmarks because there were no recently added blocks.
553-
for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
554-
q.resetBookmark();
555-
}
535+
resetIterators();
556536
}
557-
558537
return blocksToReconstruct;
559538
}
560539

540+
synchronized void resetIterators(){
541+
for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
542+
q.resetBookmark();
543+
}
544+
}
545+
546+
synchronized int chooseLowRedundancyBlocksForPriority(
547+
int priority, int blocksToProcess, List<BlockInfo> blocks) {
548+
HashSet<BlockInfo> toRemove = new HashSet<>();
549+
int count = 0;
550+
// Go through all blocks that need reconstructions with current priority.
551+
// Set the iterator to the first unprocessed block at this priority level
552+
// We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
553+
// to look for deleted blocks if any.
554+
final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
555+
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
556+
for(; count < blocksToProcess && i.hasNext(); count++) {
557+
BlockInfo block = i.next();
558+
if (block.isDeleted()) {
559+
toRemove.add(block);
560+
continue;
561+
}
562+
if (!inCorruptLevel) {
563+
blocks.add(block);
564+
}
565+
}
566+
for (BlockInfo bInfo : toRemove) {
567+
remove(bInfo, priority);
568+
}
569+
toRemove.clear();
570+
return count;
571+
}
572+
573+
561574
/** Returns an iterator of all blocks in a given priority queue. */
562575
synchronized Iterator<BlockInfo> iterator(int level) {
563576
return priorityQueues.get(level).iterator();

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ public static void checkRedundancy(final BlockManager blockManager) {
235235
*/
236236
public static int computeAllPendingWork(BlockManager bm) {
237237
int work = computeInvalidationWork(bm);
238-
work += bm.computeBlockReconstructionWork(Integer.MAX_VALUE);
238+
work += bm.scheduleBlockReconstructionWork(Integer.MAX_VALUE);
239239
return work;
240240
}
241241

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -643,8 +643,8 @@ private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) {
643643
assertEquals("Block not initially pending reconstruction", 0,
644644
bm.pendingReconstruction.getNumReplicas(block));
645645
assertEquals(
646-
"computeBlockReconstructionWork should indicate reconstruction is needed",
647-
1, bm.computeReconstructionWorkForBlocks(list_all));
646+
"scheduleBlockReconstructionWork should indicate reconstruction is needed",
647+
1, bm.scheduleReconstructionWorkForBlocks(list_all));
648648
assertTrue("reconstruction is pending after work is computed",
649649
bm.pendingReconstruction.getNumReplicas(block) > 0);
650650

@@ -897,7 +897,7 @@ public void testSkipReconstructionWithManyBusyNodes() {
897897
aBlockInfoStriped.setBlockCollectionId(mockINodeId);
898898

899899
// reconstruction should be scheduled
900-
BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
900+
BlockReconstructionWork work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
901901
assertNotNull(work);
902902

903903
// simulate the 2 nodes reach maxReplicationStreams
@@ -907,7 +907,7 @@ public void testSkipReconstructionWithManyBusyNodes() {
907907
}
908908

909909
// reconstruction should be skipped since the number of non-busy nodes are not enough
910-
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
910+
work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
911911
assertNull(work);
912912
}
913913

@@ -941,7 +941,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
941941
aBlockInfoStriped.setBlockCollectionId(mockINodeId);
942942

943943
// reconstruction should be scheduled
944-
BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
944+
BlockReconstructionWork work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
945945
assertNotNull(work);
946946

947947
// simulate the 1 node reaches maxReplicationStreams
@@ -950,7 +950,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
950950
}
951951

952952
// reconstruction should still be scheduled since there are 2 source nodes to create 2 blocks
953-
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
953+
work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
954954
assertNotNull(work);
955955

956956
// simulate the 1 more node reaches maxReplicationStreams
@@ -959,7 +959,7 @@ public void testSkipReconstructionWithManyBusyNodes2() {
959959
}
960960

961961
// reconstruction should be skipped since the number of non-busy nodes are not enough
962-
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
962+
work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
963963
assertNull(work);
964964
}
965965

@@ -995,7 +995,7 @@ public void testSkipReconstructionWithManyBusyNodes3() {
995995
aBlockInfoStriped.setBlockCollectionId(mockINodeId);
996996

997997
// Reconstruction should be scheduled.
998-
BlockReconstructionWork work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
998+
BlockReconstructionWork work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
999999
assertNotNull(work);
10001000

10011001
ExtendedBlock dummyBlock = new ExtendedBlock("bpid", 1, 1, 1);
@@ -1011,7 +1011,7 @@ public void testSkipReconstructionWithManyBusyNodes3() {
10111011
}
10121012

10131013
// Reconstruction should be skipped since the number of non-busy nodes are not enough.
1014-
work = bm.scheduleReconstruction(aBlockInfoStriped, 3);
1014+
work = bm.generateReconstructionForBlock(aBlockInfoStriped, 3);
10151015
assertNull(work);
10161016
}
10171017

@@ -2062,7 +2062,7 @@ public void testValidateReconstructionWorkAndRacksNotEnough() {
20622062
assertFalse(status.isPlacementPolicySatisfied());
20632063
DatanodeStorageInfo newNode = DFSTestUtil.createDatanodeStorageInfo(
20642064
"storage8", "8.8.8.8", "/rackA", "host8");
2065-
BlockReconstructionWork work = bm.scheduleReconstruction(blockInfo, 3);
2065+
BlockReconstructionWork work = bm.generateReconstructionForBlock(blockInfo, 3);
20662066
assertNotNull(work);
20672067
assertEquals(1, work.getAdditionalReplRequired());
20682068
// the new targets in rack A.

0 commit comments

Comments
 (0)