Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2096,29 +2096,41 @@ int computeInvalidateWork(int nodesToProcess) {
* The number of process blocks equals either twice the number of live
* data-nodes or the number of low redundancy blocks whichever is less.
*
* If the low-redundancy blocks that were found cannot be scheduled for reconstruction,
* more low-redundancy blocks are fetched immediately so that this tick is not wasted.
* @return number of blocks scheduled for reconstruction during this
* iteration.
*/
int computeBlockReconstructionWork(int blocksToProcess) {
// NOTE(review): this span is a diff view in which the removed and the added
// versions of the method body are interleaved; read it as two variants.
List<List<BlockInfo>> blocksToReconstruct = null;
namesystem.writeLock();
try {
boolean reset = false;
if (replQueueResetToHeadThreshold > 0) {
if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
reset = true;
replQueueCallsSinceReset = 0;
} else {
replQueueCallsSinceReset++;
List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LowRedundancyBlocks.LEVEL);
// Part of the blocksToProcess budget that has not been turned into work yet.
int remaining = blocksToProcess;
// Caps the do-while loop below at 100 rounds per call.
int maxAttemptInATick = 100;
int lowRedundancyBlocksCount = neededReconstruction.getLowRedundancyBlockCount();
do{
namesystem.writeLock();
try {
// A positive threshold enables the periodic reset: once the call counter
// reaches it, reset is requested and the counter restarts from zero.
// NOTE(review): the counter is advanced once per loop round here, not once
// per call of this method; confirm that this is intended.
boolean reset = false;
if (replQueueResetToHeadThreshold > 0) {
if (replQueueCallsSinceReset >= replQueueResetToHeadThreshold) {
reset = true;
replQueueCallsSinceReset = 0;
} else {
replQueueCallsSinceReset++;
}
}
}
// Choose the blocks to be reconstructed
blocksToReconstruct = neededReconstruction
.chooseLowRedundancyBlocks(blocksToProcess, reset);
} finally {
namesystem.writeUnlock("computeBlockReconstructionWork");
}
return computeReconstructionWorkForBlocks(blocksToReconstruct);
// Some candidates may not actually be turned into BlockReconstructionWork;
// in that case another round is tried immediately so the tick is not wasted.
lowRedundancyBlocksCount -= neededReconstruction.chooseLowRedundancyBlocks(remaining, reset, blocksToReconstruct);
} finally {
namesystem.writeUnlock("computeBlockReconstructionWork");
}
// NOTE(review): blocksToReconstruct is allocated once before the loop and is
// passed to every round without being cleared here; confirm that candidates
// from earlier rounds are not processed a second time.
remaining -= computeReconstructionWorkForBlocks(blocksToReconstruct);
// When many of the found blocks were skipped (source or target unavailable),
// look for more blocks to reconstruct within this same tick.
}while (remaining > 0 && lowRedundancyBlocksCount > 0 && --maxAttemptInATick > 0);
// Return the number of BlockReconstructionWork items actually scheduled.
return blocksToProcess - remaining;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,22 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
    int blocksToProcess, boolean resetIterators) {
  // Convenience overload: allocate the result container (initial capacity
  // LEVEL) and let the three-argument overload fill it in. The count that
  // overload returns is not needed here and is discarded.
  final List<List<BlockInfo>> chosen = new ArrayList<>(LEVEL);
  chooseLowRedundancyBlocks(blocksToProcess, resetIterators, chosen);
  return chosen;
}

/**
* @param blocksToProcess - number of blocks to fetch from low redundancy
* blocks.
* @param resetIterators - After gathering the list of blocks reset the
* position of all queue iterators to the head of the queue so
* subsequent calls will begin at the head of the queue
* @param blocksToReconstruct - output list that receives the candidate blocks found for reconstruction
* @return the number of low-redundancy blocks found (they are added to blocksToReconstruct)
*/
synchronized int chooseLowRedundancyBlocks(
int blocksToProcess, boolean resetIterators, List<List<BlockInfo>> blocksToReconstruct) {
int foundBlocks = 0;
int count = 0;
int priority = 0;
HashSet<BlockInfo> toRemove = new HashSet<>();
Expand All @@ -539,6 +554,7 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
continue;
}
if (!inCorruptLevel) {
foundBlocks++;
blocks.add(block);
}
}
Expand All @@ -555,7 +571,7 @@ synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
}
}

return blocksToReconstruct;
return foundBlocks;
}

/** Returns an iterator of all blocks in a given priority queue. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)

// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated(); // The value is 0

Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
List<VolumeFailureInfo> volumeFailureInfos = getInitialVolumeFailureInfos(
Expand All @@ -312,7 +312,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
volsConfigured = datanode.getDnConf().getVolsConfigured();
int volsFailed = volumeFailureInfos.size();

if (volFailuresTolerated < DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT
if (volFailuresTolerated < DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT // The value is -1
|| volFailuresTolerated >= volsConfigured) {
throw new HadoopIllegalArgumentException("Invalid value configured for "
+ "dfs.datanode.failed.volumes.tolerated - " + volFailuresTolerated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2169,7 +2169,7 @@ public void testBlockReportSetNoAckBlockToInvalidate() throws Exception {
cluster.setDataNodeDead(datanode.getDatanodeId());
assertFalse(blockManager.containsInvalidateBlock(loc[0], lb.getBlock().getLocalBlock()));

// Wait for re-registration and heartbeat.
// Wait for re-registration and heartbeat.
datanode.setHeartbeatsDisabledForTests(false);
final DatanodeDescriptor dn1Desc = cluster.getNamesystem(0)
.getBlockManager().getDatanodeManager()
Expand Down Expand Up @@ -2329,4 +2329,49 @@ public void delayDeleteReplica() {
DataNodeFaultInjector.set(oldInjector);
}
}

/**
 * Checks that a single call to {@code computeBlockReconstructionWork} still
 * schedules work when most queued blocks are stored only on a node that has
 * been loaded up to the replication-streams hard limit.
 *
 * Blocks 0-9 are placed on nodes[0] only and block 10 on nodes[0] and
 * nodes[1]. With a budget of 4 blocks, exactly one scheduled reconstruction
 * is expected.
 */
@Test(timeout = 360000)
public void testReplicationWorkConstructionWhenMostSrcUnavailable() {
  LOG.info("Starting testReplicationWorkConstructionWhenMostSrcUnavailable. ");
  addNodes(nodes);

  // Blocks 0..9 are located on nodes[0] only.
  // NOTE(review): the arguments 1, 0, 0, 3 are presumably curReplicas,
  // readOnlyReplicas, outOfServiceReplicas and expectedReplicas; confirm
  // against LowRedundancyBlocks#add, together with the queue priority that
  // these values produce.
  final int singleSourceBlocks = 10;
  for (int blockId = 0; blockId < singleSourceBlocks; blockId++) {
    addBlockOnNodes(blockId, getNodes(0));
    assertTrue("Should add successfully to neededReconstruction",
        bm.neededReconstruction.add(
            bm.getStoredBlock(new Block(blockId)), 1, 0, 0, 3));
  }

  // Block 10 is located on both nodes[0] and nodes[1].
  final int twoSourceBlockId = singleSourceBlocks;
  addBlockOnNodes(twoSourceBlockId, getNodes(0, 1));
  assertTrue("Should add successfully to neededReconstruction",
      bm.neededReconstruction.add(
          bm.getStoredBlock(new Block(twoSourceBlockId)), 1, 0, 0, 3));

  // Load nodes[0] up to the replication-streams hard limit so that only
  // nodes[1] is left as a usable source, and only for block 10.
  for (int i = 0; i < bm.getReplicationStreamsHardLimit(); i++) {
    nodes.get(0).incrementPendingReplicationWithoutTargets();
  }

  assertEquals("There should exist 11 low-redundancy blocks",
      11, bm.neededReconstruction.getLowRedundancyBlocks());

  // Blocks 0..9 cannot be scheduled because their only source is saturated;
  // the call is expected to move on and schedule block 10 in this same tick
  // rather than leaving it for a later one.
  int scheduledReconstruction = bm.computeBlockReconstructionWork(4);
  assertEquals(
      "The actual scheduled BlockReconstructionWork should include the blockAbleToReconstruct",
      1, scheduledReconstruction);
}
}