diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 41845152514fe..2b5c03eac35a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2096,29 +2096,41 @@ int computeInvalidateWork(int nodesToProcess) {
    * The number of process blocks equals either twice the number of live
    * data-nodes or the number of low redundancy blocks whichever is less.
    *
+   * If the found low-redundancy blocks cannot all be scheduled for
+   * reconstruction, more low-redundancy blocks are fetched immediately so
+   * this tick is not wasted.
+   *
    * @return number of blocks scheduled for reconstruction during this
    *         iteration.
    */
   int computeBlockReconstructionWork(int blocksToProcess) {
-    List<List<BlockInfo>> blocksToReconstruct = null;
-    namesystem.writeLock();
-    try {
-      boolean reset = false;
-      if (replQueueResetToHeadThreshold > 0) {
-        if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
-          reset = true;
-          replQueueCallsSinceReset = 0;
-        } else {
-          replQueueCallsSinceReset++;
-        }
-      }
-      // Choose the blocks to be reconstructed
-      blocksToReconstruct = neededReconstruction
-          .chooseLowRedundancyBlocks(blocksToProcess, reset);
-    } finally {
-      namesystem.writeUnlock("computeBlockReconstructionWork");
-    }
-    return computeReconstructionWorkForBlocks(blocksToReconstruct);
+    List<List<BlockInfo>> blocksToReconstruct =
+        new ArrayList<>(LowRedundancyBlocks.LEVEL);
+    int remaining = blocksToProcess;
+    int maxAttemptInATick = 100;
+    int lowRedundancyBlocksCount =
+        neededReconstruction.getLowRedundancyBlockCount();
+    do {
+      namesystem.writeLock();
+      try {
+        boolean reset = false;
+        if (replQueueResetToHeadThreshold > 0) {
+          if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
+            reset = true;
+            replQueueCallsSinceReset = 0;
+          } else {
+            replQueueCallsSinceReset++;
+          }
+        }
+        // Choose the blocks to be reconstructed. Some candidates may not
+        // actually be used to construct a BlockReconstructionWork; in that
+        // case we immediately try another round so the tick is not wasted.
+        lowRedundancyBlocksCount -= neededReconstruction
+            .chooseLowRedundancyBlocks(remaining, reset, blocksToReconstruct);
+      } finally {
+        namesystem.writeUnlock("computeBlockReconstructionWork");
+      }
+      remaining -= computeReconstructionWorkForBlocks(blocksToReconstruct);
+      // If the found blocks were skipped (src or target unavailable) for
+      // BlockReconstructionWork, find more blocks immediately in this tick.
+    } while (remaining > 0 && lowRedundancyBlocksCount > 0
+        && --maxAttemptInATick > 0);
+    // Return the number of blocks actually scheduled for reconstruction.
+    return blocksToProcess - remaining;
   }

   /**
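Note: the do-while above is the core of the change; candidate selection now retries inside a single scheduling tick instead of giving up when the chosen blocks cannot be scheduled. Below is a minimal, self-contained sketch of that retry pattern. The integer queue, the even/odd "schedulability" test, and the names computeWork/attemptsLeft are hypothetical stand-ins for neededReconstruction.chooseLowRedundancyBlocks() and computeReconstructionWorkForBlocks(), not HDFS APIs.

import java.util.ArrayDeque;
import java.util.Queue;

public class RetryLoopSketch {
  // Keep fetching candidates until the per-tick quota is met, the queue is
  // drained, or the attempt cap (cf. maxAttemptInATick) is hit.
  static int computeWork(Queue<Integer> lowRedundancyQueue, int blocksToProcess) {
    int remaining = blocksToProcess;
    int attemptsLeft = 100;
    int queued = lowRedundancyQueue.size();
    do {
      int batch = Math.min(remaining, queued);
      int scheduled = 0;
      for (int i = 0; i < batch; i++) {
        Integer block = lowRedundancyQueue.poll();
        queued--;
        // Pretend odd-numbered blocks have no usable source or target.
        if (block != null && block % 2 == 0) {
          scheduled++;
        }
      }
      remaining -= scheduled;
      // If candidates were skipped, loop again within the same tick rather
      // than waiting for the next scheduling round.
    } while (remaining > 0 && queued > 0 && --attemptsLeft > 0);
    return blocksToProcess - remaining; // work actually scheduled this tick
  }

  public static void main(String[] args) {
    Queue<Integer> queue = new ArrayDeque<>();
    for (int i = 0; i < 20; i++) {
      queue.add(i);
    }
    System.out.println(computeWork(queue, 4)); // prints 4
  }
}

The attempt cap bounds how many passes a single tick can burn when the queue is dominated by unschedulable blocks, which is why the loop cannot simply run until remaining reaches zero.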
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
index d1c3b727e6f44..d2eb98936a52d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
@@ -517,7 +517,22 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
   synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
       int blocksToProcess, boolean resetIterators) {
     final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
+    chooseLowRedundancyBlocks(blocksToProcess, resetIterators,
+        blocksToReconstruct);
+    return blocksToReconstruct;
+  }
+
+  /**
+   * @param blocksToProcess - number of blocks to fetch from low redundancy
+   *          blocks.
+   * @param resetIterators - After gathering the list of blocks reset the
+   *          position of all queue iterators to the head of the queue so
+   *          subsequent calls will begin at the head of the queue
+   * @param blocksToReconstruct - the found candidate blocks that need to be
+   *          reconstructed are appended to this list
+   * @return the number of low-redundancy blocks found (they are put in
+   *         blocksToReconstruct)
+   */
+  synchronized int chooseLowRedundancyBlocks(
+      int blocksToProcess, boolean resetIterators,
+      List<List<BlockInfo>> blocksToReconstruct) {
+    int foundBlocks = 0;
     int count = 0;
     int priority = 0;
     HashSet<BlockInfo> toRemove = new HashSet<>();
@@ -539,6 +554,7 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
           continue;
         }
         if (!inCorruptLevel) {
+          foundBlocks++;
           blocks.add(block);
         }
       }
@@ -555,7 +571,7 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
       }
     }

-    return blocksToReconstruct;
+    return foundBlocks;
   }

   /** Returns an iterator of all blocks in a given priority queue. */
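Note: the refactor above turns the existing chooseLowRedundancyBlocks() into a thin wrapper over a new overload that appends into a caller-supplied list and returns how many blocks it found; the returned count is what lets BlockManager decrement its running lowRedundancyBlocksCount across repeated calls. A minimal sketch of this out-parameter pattern follows, with illustrative names only, not the HDFS signatures.

import java.util.ArrayList;
import java.util.List;

public class OutParamSketch {
  // Old-style convenience API: builds and returns a fresh list.
  static List<String> choose(int limit) {
    List<String> out = new ArrayList<>();
    choose(limit, out);
    return out;
  }

  // New-style API: appends into 'out' and reports the number found, so a
  // caller can accumulate results across repeated calls.
  static int choose(int limit, List<String> out) {
    int found = 0;
    for (int i = 0; i < limit; i++) {
      out.add("block-" + i);
      found++;
    }
    return found;
  }

  public static void main(String[] args) {
    List<String> acc = new ArrayList<>();
    int total = choose(2, acc) + choose(3, acc); // accumulates across calls
    System.out.println(total + " found, " + acc.size() + " accumulated"); // 5, 5
  }
}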
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index eeec1bb728825..882e15eb4f152 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -303,7 +303,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
-    volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
+    volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated(); // The value is 0
     Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
     List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
@@ -312,7 +312,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
     volsConfigured = datanode.getDnConf().getVolsConfigured();
     int volsFailed = volumeFailureInfos.size();

-    if (volFailuresTolerated < DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT
+    if (volFailuresTolerated < DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT // The value is -1
         || volFailuresTolerated >= volsConfigured) {
       throw new HadoopIllegalArgumentException("Invalid value configured for "
           + "dfs.datanode.failed.volumes.tolerated - " + volFailuresTolerated
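Note: the two comments added to FsDatasetImpl only annotate values observed while debugging: getVolFailuresTolerated() returns the default 0 here, and DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT is -1, the special setting under which the DataNode keeps serving as long as at least one volume remains valid. A standalone sketch of the accepted range check, assuming that semantics and substituting IllegalArgumentException for HadoopIllegalArgumentException to stay dependency-free:

public class VolFailuresCheckSketch {
  static final int MAX_VOLUME_FAILURE_TOLERATED_LIMIT = -1;

  // dfs.datanode.failed.volumes.tolerated must fall in [-1, volsConfigured).
  static void validate(int volFailuresTolerated, int volsConfigured) {
    if (volFailuresTolerated < MAX_VOLUME_FAILURE_TOLERATED_LIMIT
        || volFailuresTolerated >= volsConfigured) {
      throw new IllegalArgumentException(
          "Invalid value configured for dfs.datanode.failed.volumes.tolerated - "
              + volFailuresTolerated);
    }
  }

  public static void main(String[] args) {
    validate(0, 3);   // default: tolerate no failed volumes
    validate(-1, 3);  // special value: require only one valid volume
    try {
      validate(3, 3); // cannot tolerate losing every configured volume
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}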
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index d9d236b66468d..5fd768442fe2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -2169,7 +2169,7 @@ public void testBlockReportSetNoAckBlockToInvalidate() throws Exception {
     cluster.setDataNodeDead(datanode.getDatanodeId());
     assertFalse(blockManager.containsInvalidateBlock(loc[0],
         lb.getBlock().getLocalBlock()));
     // Wait for re-registration and heartbeat.
     datanode.setHeartbeatsDisabledForTests(false);
     final DatanodeDescriptor dn1Desc = cluster.getNamesystem(0)
         .getBlockManager().getDatanodeManager()
@@ -2329,4 +2329,49 @@ public void delayDeleteReplica() {
       DataNodeFaultInjector.set(oldInjector);
     }
   }
+
+  @Test(timeout = 360000)
+  public void testReplicationWorkConstructionWhenMostSrcUnavailable() {
+    LOG.info("Starting testReplicationWorkConstructionWhenMostSrcUnavailable.");
+    NetworkTopology clusterMap = bm.getDatanodeManager().getNetworkTopology();
+    addNodes(nodes);
+    int blkIndex = 0;
+    // Bind block_0 ~ block_9 to nodes[0].
+    for (; blkIndex < 10; blkIndex++) {
+      // These low-redundancy blocks are all located on nodes[0];
+      // curReplicas is set to 1 so they are queued for reconstruction.
+      addBlockOnNodes(blkIndex, getNodes(0));
+      assertTrue("Should add successfully to neededReconstruction",
+          bm.neededReconstruction.add(bm.getStoredBlock(new Block(blkIndex)),
+              1, 0, 0, 3));
+    }
+    // Bind block_10 to nodes[0] and nodes[1].
+    addBlockOnNodes(blkIndex, getNodes(0, 1));
+    assertTrue("Should add successfully to neededReconstruction",
+        bm.neededReconstruction.add(bm.getStoredBlock(new Block(blkIndex)),
+            1, 0, 0, 3));
+
+    // Saturate nodes[0] up to replicationStreamsHardLimit so that only
+    // nodes[1] can act as the source node for reconstructing blk_10.
+    for (int i = 0; i < bm.getReplicationStreamsHardLimit(); i++) {
+      nodes.get(0).incrementPendingReplicationWithoutTargets();
+    }
+
+    assertEquals("There should exist 11 low-redundancy blocks", 11,
+        bm.neededReconstruction.getLowRedundancyBlockCount());
+
+    // Schedule reconstruction. blk_0 ~ blk_9 cannot be scheduled because
+    // their only source node has reached replicationStreamsHardLimit, but
+    // computeBlockReconstructionWork() now fast-forwards and schedules
+    // blk_10 inside this tick instead of deferring it to a later tick.
+    int scheduledReconstruction = bm.computeBlockReconstructionWork(4);
+    assertEquals("The scheduled BlockReconstructionWork should include the "
+        + "block that is able to be reconstructed", 1, scheduledReconstruction);
+  }
 }
\ No newline at end of file
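Note: as a sanity check on the asserted value, with blocksToProcess = 4, eleven queued blocks, and only blk_10 schedulable, the loop keeps consuming skipped candidates until it reaches blk_10, so exactly one unit of work comes out of the tick. The simulation below reproduces that arithmetic under the simplifying assumption that each pass fetches up to 'remaining' candidates in queue order; it is a model of the loop, not the HDFS code.

public class TickSimulationSketch {
  public static void main(String[] args) {
    boolean[] schedulable = new boolean[11];
    schedulable[10] = true; // only blk_10 has an unsaturated source (nodes[1])

    int blocksToProcess = 4;
    int remaining = blocksToProcess;
    int attemptsLeft = 100;
    int next = 0; // queue cursor

    do {
      int scheduled = 0;
      for (int i = 0; i < remaining && next < schedulable.length; i++, next++) {
        if (schedulable[next]) {
          scheduled++;
        }
      }
      remaining -= scheduled;
    } while (remaining > 0 && next < schedulable.length && --attemptsLeft > 0);

    // Passes 1 and 2 skip blk_0..blk_7; the third pass reaches blk_10.
    System.out.println(blocksToProcess - remaining); // prints 1
  }
}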