diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index cbe7516b0ede0..f9ed1b3bf79b2 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -28,6 +28,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -1956,6 +1957,26 @@ protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout, socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken); } + protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, DatanodeInfo sourceDatanode, + ExtendedBlock targetBlk, Token targetBlockToken, + DatanodeInfo targetDatanode) throws IOException { + IOStreamPair pair = + DFSUtilClient.connectToDN(sourceDatanode, getConf().getSocketTimeout(), conf, saslClient, + socketFactory, getConf().isConnectToDnViaHostname(), this, sourceBlockToken); + + new Sender((DataOutputStream) pair.out).copyBlockCrossNamespace(sourceBlk, sourceBlockToken, + targetBlk, targetBlockToken, targetDatanode); + + pair.out.flush(); + + DataInputStream reply = new DataInputStream(pair.in); + BlockOpResponseProto proto = BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(reply)); + DataTransferProtoUtil.checkBlockOpStatus(proto, + "copyBlockCrossNamespace " + sourceBlk + " to " + targetBlk + " from " + sourceDatanode + + " to " + targetDatanode); + } + /** * Infer the checksum type for a replica by sending an OP_READ_BLOCK * for the first byte of that replica. 
This is used for compatibility diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index a1bfb7f5d594e..f6bcfd67976ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -131,6 +131,7 @@ public class DFSOutputStream extends FSOutputSummer private FileEncryptionInfo fileEncryptionInfo; private int writePacketSize; private boolean leaseRecovered = false; + private ExtendedBlock userAssignmentLastBlock; /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/ protected DFSPacket createPacket(int packetSize, int chunksPerPkt, @@ -949,6 +950,9 @@ protected void recoverLease(boolean recoverLeaseOnCloseException) { void completeFile() throws IOException { // get last block before destroying the streamer ExtendedBlock lastBlock = getStreamer().getBlock(); + if (lastBlock == null) { + lastBlock = getUserAssignmentLastBlock(); + } try (TraceScope ignored = dfsClient.getTracer() .newScope("DFSOutputStream#completeFile")) { completeFile(lastBlock); @@ -1095,6 +1099,14 @@ ExtendedBlock getBlock() { return getStreamer().getBlock(); } + public ExtendedBlock getUserAssignmentLastBlock() { + return userAssignmentLastBlock; + } + + public void setUserAssignmentLastBlock(ExtendedBlock userAssignmentLastBlock) { + this.userAssignmentLastBlock = userAssignmentLastBlock; + } + @VisibleForTesting public long getFileId() { return fileId; @@ -1199,4 +1211,16 @@ private static long calculateDelayForNextRetry(long previousDelay, long maxDelay) { return Math.min(previousDelay * 2, maxDelay); } + + public DFSClient getDfsClient() { + return dfsClient; + } + + protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, DatanodeInfo sourceDatanode, + ExtendedBlock targetBlk, Token targetBlockToken, + DatanodeInfo targetDatanode) throws IOException { + dfsClient.copyBlockCrossNamespace(sourceBlk, sourceBlockToken, sourceDatanode, targetBlk, + targetBlockToken, targetDatanode); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 8320cc9a40866..543c00d45e40d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -1271,6 +1271,9 @@ protected synchronized void closeImpl() throws IOException { try (TraceScope ignored = dfsClient.getTracer().newScope("completeFile")) { + if (currentBlockGroup == null) { + currentBlockGroup = getUserAssignmentLastBlock(); + } completeFile(currentBlockGroup); } logCorruptBlocks(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 17c39f6c55b75..6182d852c35e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -600,9 +600,10 @@ 
public FSDataOutputStream next(final FileSystem fs, final Path p) * inherited policy. * */ - private HdfsDataOutputStream create(final Path f, - final FsPermission permission, final EnumSet flag, - final int bufferSize, final short replication, final long blockSize, + public HdfsDataOutputStream create( + final Path f, final FsPermission permission, + final EnumSet flag, final int bufferSize, + final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt, final InetSocketAddress[] favoredNodes, final String ecPolicyName, final String storagePolicy) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 384f1dc3507af..8756c465ae806 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -238,4 +238,18 @@ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, Token blockToken, long requestedNumBytes, BlockChecksumOptions blockChecksumOptions) throws IOException; + + /** + * Copy a block across namespaces. + * It is used for fastcopy. + * + * @param sourceBlk the block being copied. + * @param sourceBlockToken security token for accessing sourceBlk. + * @param targetBlk the block to be written. + * @param targetBlockToken security token for accessing targetBlk. + * @param targetDatanode the target datanode to which sourceBlk will be copied as targetBlk. + */ + void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, ExtendedBlock targetBlk, + Token targetBlockToken, DatanodeInfo targetDatanode) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java index 94250e5e7f622..1e9366dcb46f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -39,6 +39,7 @@ public enum Op { RELEASE_SHORT_CIRCUIT_FDS((byte)88), REQUEST_SHORT_CIRCUIT_SHM((byte)89), BLOCK_GROUP_CHECKSUM((byte)90), + COPY_BLOCK_CROSSNAMESPACE((byte)91), CUSTOM((byte)127); /** The code for this operation.
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 86c6513745ea2..7b3329331c2c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -308,4 +309,18 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo, send(out, Op.BLOCK_GROUP_CHECKSUM, proto); } + + @Override + public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, ExtendedBlock targetBlk, + Token targetBlockToken, DatanodeInfo targetDatanode) + throws IOException { + OpCopyBlockCrossNamespaceProto proto = OpCopyBlockCrossNamespaceProto.newBuilder() + .setHeader(DataTransferProtoUtil.buildBaseHeader(sourceBlk, sourceBlockToken)) + .setTargetBlock(PBHelperClient.convert(targetBlk)) + .setTargetToken(PBHelperClient.convert(targetBlockToken)) + .setTargetDatanode(PBHelperClient.convert(targetDatanode)).build(); + + send(out, Op.COPY_BLOCK_CROSSNAMESPACE, proto); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index 5356cd6961699..b9cc22ca3bb5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -331,3 +331,10 @@ message OpBlockChecksumResponseProto { message OpCustomProto { required string customId = 1; } + +message OpCopyBlockCrossNamespaceProto { + required BaseHeaderProto header = 1; + required ExtendedBlockProto targetBlock = 2; + required hadoop.common.TokenProto targetToken = 3; + required DatanodeInfoProto targetDatanode = 4; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index b9f8e07f67a5f..1119a25d3feb9 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -136,6 +136,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.ec.reconstruct.write.bandwidthPerSec"; public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT = 0; // A value of zero indicates no limit + public static final String DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY = + "dfs.datanode.copy.block.cross.namespace.socket-timeout.ms"; + public static final int DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT = + 5 * 60 * 1000; + 
@Deprecated public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 8bcfb199ff5a9..6e149dd4c3cb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; @@ -133,6 +134,9 @@ protected final void processOp(Op op) throws IOException { case REQUEST_SHORT_CIRCUIT_SHM: opRequestShortCircuitShm(in); break; + case COPY_BLOCK_CROSSNAMESPACE: + opCopyBlockCrossNamespace(in); + break; default: throw new IOException("Unknown op " + op + " in data stream"); } @@ -339,4 +343,21 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException { } } } + + private void opCopyBlockCrossNamespace(DataInputStream dis) throws IOException { + OpCopyBlockCrossNamespaceProto proto = + OpCopyBlockCrossNamespaceProto.parseFrom(vintPrefixed(dis)); + TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName()); + try { + copyBlockCrossNamespace(PBHelperClient.convert(proto.getHeader().getBlock()), + PBHelperClient.convert(proto.getHeader().getToken()), + PBHelperClient.convert(proto.getTargetBlock()), + PBHelperClient.convert(proto.getTargetToken()), + PBHelperClient.convert(proto.getTargetDatanode())); + } finally { + if (traceScope != null) { + traceScope.close(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 21b92db3073a1..736314bdc7fe0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -27,6 +27,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST; import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT; @@ -89,6 +91,7 @@ public class DNConf { final int socketWriteTimeout; final int socketKeepaliveTimeout; final int ecChecksumSocketTimeout; + private final int copyBlockCrossNamespaceSocketTimeout; private final int transferSocketSendBufferSize; private final int transferSocketRecvBufferSize; private final boolean tcpNoDelay; @@ -153,6 +156,9 @@ public DNConf(final Configurable dn) { ecChecksumSocketTimeout = getConf().getInt( DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY, DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT); + copyBlockCrossNamespaceSocketTimeout = getConf().getInt( + DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY, + DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT); this.transferSocketSendBufferSize = getConf().getInt( DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY, DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT); @@ -390,6 +396,15 @@ public int getEcChecksumSocketTimeout() { return ecChecksumSocketTimeout; } + /** + * Returns socket timeout for copyBlockCrossNamespace. + * + * @return int socket timeout + */ + public int getCopyBlockCrossNamespaceSocketTimeout() { + return copyBlockCrossNamespaceSocketTimeout; + } + /** * Returns the SaslPropertiesResolver configured for use with * DataTransferProtocol, or null if not configured. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 87e8eee681d1d..4161c3531e929 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -470,6 +470,7 @@ public static InetSocketAddress createSocketAddr(String target) { private DataSetLockManager dataSetLockManager; private final ExecutorService xferService; + private final ExecutorService copyBlockCrossNamespaceExecutor; @Nullable private final StorageLocationChecker storageLocationChecker; @@ -518,6 +519,8 @@ private static Tracer createTracer(Configuration conf) { volumeChecker = new DatasetVolumeChecker(conf, new Timer()); this.xferService = HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); + this.copyBlockCrossNamespaceExecutor = + HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); double congestionRationTmp = conf.getDouble(DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO, DFSConfigKeys.DFS_PIPELINE_CONGESTION_RATIO_DEFAULT); this.congestionRatio = congestionRationTmp > 0 ? 
@@ -583,7 +586,8 @@ private static Tracer createTracer(Configuration conf) { "File descriptor passing was not configured."; LOG.debug(this.fileDescriptorPassingDisabledReason); } - + this.copyBlockCrossNamespaceExecutor = + HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); this.socketFactory = NetUtils.getDefaultSocketFactory(conf); try { @@ -2593,9 +2597,12 @@ public void shutdown() { // wait reconfiguration thread, if any, to exit shutdownReconfigurationTask(); - LOG.info("Waiting up to 30 seconds for transfer threads to complete"); + LOG.info("Waiting up to 15 seconds for transfer threads to complete"); HadoopExecutors.shutdown(this.xferService, LOG, 15L, TimeUnit.SECONDS); + LOG.info("Waiting up to 15 seconds for copyBlockCrossNamespaceExecutor threads to complete"); + HadoopExecutors.shutdown(this.copyBlockCrossNamespaceExecutor, LOG, 15L, TimeUnit.SECONDS); + // wait for all data receiver threads to exit if (this.threadGroup != null) { int sleepMs = 2; @@ -3003,46 +3010,58 @@ private class DataTransfer implements Runnable { final DatanodeInfo[] targets; final StorageType[] targetStorageTypes; final private String[] targetStorageIds; - final ExtendedBlock b; + final private ExtendedBlock source; + private ExtendedBlock target; final BlockConstructionStage stage; final private DatanodeRegistration bpReg; final String clientname; final CachingStrategy cachingStrategy; - /** Throttle to block replication when data transfers or writes. */ + /** + * Throttle to block replication when data transfers or writes. + */ private DataTransferThrottler throttler; + private boolean copyBlockCrossNamespace; /** - * Connect to the first item in the target list. Pass along the + * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. */ DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, - String[] targetStorageIds, ExtendedBlock b, - BlockConstructionStage stage, final String clientname) { - DataTransferProtocol.LOG.debug("{}: {} (numBytes={}), stage={}, " + - "clientname={}, targets={}, target storage types={}, " + - "target storage IDs={}", getClass().getSimpleName(), b, - b.getNumBytes(), stage, clientname, Arrays.asList(targets), - targetStorageTypes == null ? "[]" : - Arrays.asList(targetStorageTypes), + String[] targetStorageIds, ExtendedBlock source, BlockConstructionStage stage, + final String clientname) { + DataTransferProtocol.LOG.debug("{}: {} (numBytes={}), stage={}, " + + "clientname={}, targets={}, target storage types={}, " + "target storage IDs={}", + getClass().getSimpleName(), source, source.getNumBytes(), stage, clientname, + Arrays.asList(targets), + targetStorageTypes == null ? "[]" : Arrays.asList(targetStorageTypes), targetStorageIds == null ? 
"[]" : Arrays.asList(targetStorageIds)); this.targets = targets; this.targetStorageTypes = targetStorageTypes; this.targetStorageIds = targetStorageIds; - this.b = b; + this.source = source; + this.target = source; + this.copyBlockCrossNamespace = false; this.stage = stage; - BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); + BPOfferService bpos = blockPoolManager.get(source.getBlockPoolId()); bpReg = bpos.bpRegistration; this.clientname = clientname; - this.cachingStrategy = - new CachingStrategy(true, getDnConf().readaheadLength); + this.cachingStrategy = new CachingStrategy(true, getDnConf().readaheadLength); if (isTransfer(stage, clientname)) { this.throttler = xserver.getTransferThrottler(); - } else if(isWrite(stage)) { + } else if (isWrite(stage)) { this.throttler = xserver.getWriteThrottler(); } } + DataTransfer(DatanodeInfo[] targets, StorageType[] targetStorageTypes, + String[] targetStorageIds, ExtendedBlock source, ExtendedBlock target, + BlockConstructionStage stage, final String clientname) { + this(targets, targetStorageTypes, targetStorageIds, source, stage, clientname); + this.target = target; + this.copyBlockCrossNamespace = true; + } + /** * Do the deed, write the bytes */ @@ -3054,8 +3073,9 @@ public void run() { DataInputStream in = null; BlockSender blockSender = null; final boolean isClient = clientname.length() > 0; - + try { + DataNodeFaultInjector.get().transferThrowException(); final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname); InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr); LOG.debug("Connecting to datanode {}", dnAddr); @@ -3067,69 +3087,70 @@ public void run() { // // Header info // - Token accessToken = getBlockAccessToken(b, - EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - targetStorageTypes, targetStorageIds); + Token accessToken = + getBlockAccessToken(target, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), + targetStorageTypes, targetStorageIds); - long writeTimeout = dnConf.socketWriteTimeout + - HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); + long writeTimeout = + dnConf.socketWriteTimeout + HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length + - 1); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); - DataEncryptionKeyFactory keyFactory = - getDataEncryptionKeyFactoryForBlock(b); - IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, keyFactory, accessToken, bpReg); + DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(source); + IOStreamPair saslStreams = + saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory, accessToken, bpReg); unbufOut = saslStreams.out; unbufIn = saslStreams.in; - - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - DFSUtilClient.getSmallBufferSize(getConf()))); + + out = new DataOutputStream( + new BufferedOutputStream(unbufOut, DFSUtilClient.getSmallBufferSize(getConf()))); in = new DataInputStream(unbufIn); - blockSender = new BlockSender(b, 0, b.getNumBytes(), - false, false, true, DataNode.this, null, cachingStrategy); - DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg) - .build(); - - String storageId = targetStorageIds.length > 0 ? 
- targetStorageIds[0] : null; - new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, - clientname, targets, targetStorageTypes, srcNode, - stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy, - false, false, null, storageId, - targetStorageIds); + blockSender = + new BlockSender(source, 0, source.getNumBytes(), false, false, true, DataNode.this, + null, cachingStrategy); + DatanodeInfo srcNode = new DatanodeInfoBuilder().setNodeID(bpReg).build(); + + String storageId = targetStorageIds.length > 0 ? targetStorageIds[0] : null; + new Sender(out).writeBlock(target, targetStorageTypes[0], accessToken, clientname, targets, + targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), + cachingStrategy, false, false, null, storageId, targetStorageIds); // send data & checksum blockSender.sendBlock(out, unbufOut, throttler); // no response necessary - LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", - getClass().getSimpleName(), DataNode.this.getDisplayName(), - b, b.getNumBytes(), curTarget); + LOG.info("{}, at {}: Transmitted {} (numBytes={}) to {}", getClass().getSimpleName(), + DataNode.this.getDisplayName(), source, source.getNumBytes(), curTarget); // read ack if (isClient) { - DNTransferAckProto closeAck = DNTransferAckProto.parseFrom( - PBHelperClient.vintPrefixed(in)); + DNTransferAckProto closeAck = + DNTransferAckProto.parseFrom(PBHelperClient.vintPrefixed(in)); LOG.debug("{}: close-ack={}", getClass().getSimpleName(), closeAck); if (closeAck.getStatus() != Status.SUCCESS) { if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new InvalidBlockTokenException( - "Got access token error for connect ack, targets=" - + Arrays.asList(targets)); + "Got access token error for connect ack, targets=" + Arrays.asList(targets)); } else { - throw new IOException("Bad connect ack, targets=" - + Arrays.asList(targets) + " status=" + closeAck.getStatus()); + throw new IOException( + "Bad connect ack, targets=" + Arrays.asList(targets) + " status=" + + closeAck.getStatus()); } } } else { metrics.incrBlocksReplicated(); } } catch (IOException ie) { - handleBadBlock(b, ie, false); - LOG.warn("{}:Failed to transfer {} to {} got", - bpReg, b, targets[0], ie); + if (copyBlockCrossNamespace) { + throw new RuntimeException(ie); + } + handleBadBlock(source, ie, false); + LOG.warn("{}:Failed to transfer {} to {} got", bpReg, source, targets[0], ie); } catch (Throwable t) { - LOG.error("Failed to transfer block {}", b, t); + LOG.error("Failed to transfer block {}", source, t); + if (copyBlockCrossNamespace) { + throw new RuntimeException(t); + } } finally { decrementXmitsInProgress(); IOUtils.closeStream(blockSender); @@ -3141,7 +3162,7 @@ public void run() { @Override public String toString() { - return "DataTransfer " + b + " to " + Arrays.asList(targets); + return "DataTransfer " + source + " to " + Arrays.asList(targets); } } @@ -4388,4 +4409,94 @@ boolean isSlownode() { public BlockPoolManager getBlockPoolManager() { return blockPoolManager; } + + public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, ExtendedBlock targetBlk, + DatanodeInfo targetDn) throws IOException { + if (!data.isValidBlock(sourceBlk)) { + // block does not exist or is under-construction + String errStr = + "copyBlock:(" + this.getInfoPort() + ") Can't send invalid block " + sourceBlk + " " + + data.getReplicaString(sourceBlk.getBlockPoolId(), sourceBlk.getBlockId()); + LOG.info(errStr); + throw new IOException(errStr); + } + long onDiskLength = data.getLength(sourceBlk); + 
if (sourceBlk.getNumBytes() > onDiskLength) { + // Shorter on-disk len indicates corruption so report NN the corrupt block + String msg = "copyBlock: Can't replicate block " + sourceBlk + " because on-disk length " + + onDiskLength + " is shorter than provided length " + sourceBlk.getNumBytes(); + LOG.info(msg); + throw new IOException(msg); + } + LOG.info(getDatanodeInfo() + " copyBlock: Starting thread to transfer: " + "block:" + + sourceBlk + " from " + this.getDatanodeUuid() + " to " + targetDn.getDatanodeUuid() + + "(" + targetDn + ")"); + Future result; + if (this.getDatanodeUuid().equals(targetDn.getDatanodeUuid())) { + result = copyBlockCrossNamespaceExecutor.submit(new LocalBlockCopy(sourceBlk, targetBlk)); + } else { + result = copyBlockCrossNamespaceExecutor.submit( + new DataCopy(targetDn, sourceBlk, targetBlk).getDataTransfer()); + } + try { + result.get(getDnConf().getCopyBlockCrossNamespaceSocketTimeout(), TimeUnit.MILLISECONDS); + } catch (Exception e) { + LOG.error(e.getMessage()); + throw new IOException(e); + } + } + + private class DataCopy { + private final DataTransfer dataTransfer; + + DataCopy(DatanodeInfo targetDn, ExtendedBlock sourceBlk, ExtendedBlock targetBlk) { + FsVolumeImpl volume = (FsVolumeImpl) data.getVolume(sourceBlk); + StorageType storageType = volume.getStorageType(); + String storageId = volume.getStorageID(); + + DatanodeInfo[] targets = new DatanodeInfo[] {targetDn}; + StorageType[] targetStorageTypes = new StorageType[] {storageType}; + String[] targetStorageIds = new String[] {storageId}; + dataTransfer = + new DataTransfer(targets, targetStorageTypes, targetStorageIds, sourceBlk, targetBlk, + PIPELINE_SETUP_CREATE, ""); + } + + public DataTransfer getDataTransfer() { + return dataTransfer; + } + } + + class LocalBlockCopy implements Callable { + private ExtendedBlock sourceBlk = null; + private ExtendedBlock targetBlk = null; + + LocalBlockCopy(ExtendedBlock sourceBlk, ExtendedBlock targetBlk) { + this.sourceBlk = sourceBlk; + this.targetBlk = targetBlk; + } + + public Boolean call() throws IOException { + try { + targetBlk.setNumBytes(sourceBlk.getNumBytes()); + data.hardLinkOneBlock(sourceBlk, targetBlk); + FsVolumeSpi v = (FsVolumeSpi) (getFSDataset().getVolume(targetBlk)); + closeBlock(targetBlk, null, v.getStorageID(), v.isTransientStorage()); + + BlockLocalPathInfo srcBlpi = data.getBlockLocalPathInfo(sourceBlk); + BlockLocalPathInfo dstBlpi = data.getBlockLocalPathInfo(targetBlk); + LOG.info( + getClass().getSimpleName() + ": Hardlinked " + sourceBlk + "( " + srcBlpi.getBlockPath() + + " " + srcBlpi.getMetaPath() + " ) " + "to " + targetBlk + "( " + + dstBlpi.getBlockPath() + " " + dstBlpi.getMetaPath() + " ) "); + + metrics.incrBlocksReplicatedViaHardlink(); + } catch (IOException e) { + LOG.warn("Local block copy for src : " + sourceBlk.getBlockName() + ", dst : " + + targetBlk.getBlockName() + " failed", e); + throw e; + } + return true; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index 9e046cc3600df..9f54e0b44f698 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -173,9 +173,14 @@ public void delayDiffRecord() {} */ public void 
delayGetMetaDataInputStream() {} - /** + /** * Used in {@link DirectoryScanner#reconcile()} to wait until a storage is removed, * leaving a stale copy of {@link DirectoryScanner#diffs}. */ public void waitUntilStorageRemoved() {} + + /** + * Used to make the datanode transfer path throw an exception in tests. + */ + public void transferThrowException() throws IOException {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index d948c1caefd1f..b2aced4d69e88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index d948c1caefd1f..b2aced4d69e88 100644 @@ -1088,6 +1088,35 @@ public void blockGroupChecksum(final StripedBlockInfo stripedBlockInfo, datanode.metrics.addBlockChecksumOp(elapsed()); } + @Override + public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, + Token sourceBlockToken, ExtendedBlock targetBlk, + Token targetBlockToken, + DatanodeInfo targetDatanode) + throws IOException { + updateCurrentThreadName("Copying block " + sourceBlk + " to " + targetBlk); + + DataOutputStream reply = getBufferedOutputStream(); + checkAccess(reply, true, sourceBlk, sourceBlockToken, Op.COPY_BLOCK_CROSSNAMESPACE, + BlockTokenIdentifier.AccessMode.READ); + checkAccess(reply, true, targetBlk, targetBlockToken, Op.COPY_BLOCK_CROSSNAMESPACE, + BlockTokenIdentifier.AccessMode.WRITE); + + try { + datanode.copyBlockCrossNamespace(sourceBlk, targetBlk, targetDatanode); + sendResponse(SUCCESS, null); + } catch (IOException ioe) { + LOG.warn("copyBlockCrossNamespace from {} to {} on {} received exception", sourceBlk, + targetBlk, targetDatanode, ioe); + incrDatanodeNetworkErrors(); + throw ioe; + } finally { + IOUtils.closeStream(reply); + } + + datanode.metrics.addCopyBlockCrossNamespaceOp(elapsed()); + } + @Override public void copyBlock(final ExtendedBlock block, final Token blockToken) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 06be54b37d96a..81a7567d7826a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -708,4 +708,16 @@ ReplicaInfo moveBlockAcrossVolumes(final ExtendedBlock block, * @param time the last time in milliseconds when the directory scanner successfully ran.
*/ default void setLastDirScannerFinishTime(long time) {} + + /** + * Copies a block by creating hard links to its block file and meta file. + * + * @param srcBlock + * the source block which needs to be copied + * @param dstBlock + * the destination block to which the srcBlock needs to be copied + * @throws IOException + */ + void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) + throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 8c643e9e16ace..d679c0a7cb788 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.server.datanode.FSCachingGetSpaceUsed; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; @@ -139,7 +140,7 @@ public int compare(File f1, File f2) { private volatile GetSpaceUsed dfsUsage; /** - * Create a blook pool slice + * Create a block pool slice. * @param bpid Block pool Id * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to * @param bpDir directory corresponding to the BlockPool @@ -1152,4 +1153,14 @@ void setDeleteDuplicateReplicasForTests( this.deleteDuplicateReplicas = deleteDuplicateReplicasForTests; } + public File hardLinkOneBlock(File src, File srcMeta, Block dstBlock) throws IOException { + File dstMeta = new File(tmpDir, + DatanodeUtil.getMetaName(dstBlock.getBlockName(), dstBlock.getGenerationStamp())); + HardLink.createHardLink(srcMeta, dstMeta); + + File dstBlockFile = new File(tmpDir, dstBlock.getBlockName()); + HardLink.createHardLink(src, dstBlockFile); + + return dstBlockFile; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index eeec1bb728825..e7d080b6e6f45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -68,6 +68,7 @@ import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.LocalReplica; +import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.Block; @@ -3863,5 +3864,32 @@ public void setLastDirScannerFinishTime(long time) { public long getPendingAsyncDeletions() { return asyncDiskService.countPendingDeletions(); } + + @Override + public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) throws IOException { + BlockLocalPathInfo blpi = getBlockLocalPathInfo(srcBlock); + File src = new File(blpi.getBlockPath()); + File srcMeta = new File(blpi.getMetaPath()); + + FsVolumeImpl v = getVolume(srcBlock); + + if
(v.getAvailable() < dstBlock.getNumBytes()) { + throw new DiskOutOfSpaceException("Insufficient space for hardlink block " + srcBlock); + } + + BlockPoolSlice dstBPS = v.getBlockPoolSlice(dstBlock.getBlockPoolId()); + File dstBlockFile = dstBPS.hardLinkOneBlock(src, srcMeta, dstBlock.getLocalBlock()); + + try (AutoCloseableLock lock = lockManager.writeLock(LockLevel.VOLUME, dstBlock.getBlockPoolId(), + v.getStorageID())) { + ReplicaInfo replicaInfo = + new LocalReplicaInPipeline(dstBlock.getBlockId(), dstBlock.getGenerationStamp(), v, + dstBlockFile.getParentFile(), dstBlock.getLocalBlock().getNumBytes()); + dstBlockFile = dstBPS.addFinalizedBlock(dstBlock.getLocalBlock(), replicaInfo); + replicaInfo = new FinalizedReplica(dstBlock.getLocalBlock(), getVolume(srcBlock), + dstBlockFile.getParentFile()); + volumeMap.add(dstBlock.getBlockPoolId(), replicaInfo); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 832a8029f7771..b391fb5b1284c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -66,6 +66,7 @@ public class DataNodeMetrics { @Metric MutableCounterLong blocksWritten; @Metric MutableCounterLong blocksRead; @Metric MutableCounterLong blocksReplicated; + @Metric private MutableCounterLong blocksReplicatedViaHardlink; @Metric MutableCounterLong blocksRemoved; @Metric MutableCounterLong blocksVerified; @Metric MutableCounterLong blockVerificationFailures; @@ -127,6 +128,7 @@ public class DataNodeMetrics { @Metric MutableRate writeBlockOp; @Metric MutableRate blockChecksumOp; @Metric MutableRate copyBlockOp; + @Metric private MutableRate copyBlockCrossNamespaceOp; @Metric MutableRate replaceBlockOp; @Metric MutableRate heartbeats; @Metric MutableRate heartbeatsTotal; @@ -351,6 +353,18 @@ public void incrBlocksReplicated() { blocksReplicated.incr(); } + public long getBlocksReplicated() { + return blocksReplicated.value(); + } + + public void incrBlocksReplicatedViaHardlink() { + blocksReplicatedViaHardlink.incr(); + } + + public long getBlocksReplicatedViaHardlink() { + return blocksReplicatedViaHardlink.value(); + } + public void incrBlocksWritten() { blocksWritten.incr(); } @@ -400,6 +414,10 @@ public void addCopyBlockOp(long latency) { copyBlockOp.add(latency); } + public void addCopyBlockCrossNamespaceOp(long latency) { + copyBlockCrossNamespaceOp.add(latency); + } + public void addBlockChecksumOp(long latency) { blockChecksumOp.add(latency); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 94c3ea0cc9b0c..76853f5e414e2 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4994,6 +4994,14 @@ + + dfs.datanode.copy.block.cross.namespace.socket-timeout.ms + 300000 + + Default timeout value in milliseconds for datanode copying block cross namespace. 
+ + + dfs.ha.fencing.methods diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index 98556c4fd15ff..403634942de60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -31,9 +33,22 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.EnumSet; import java.util.Random; - +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.slf4j.Logger; @@ -650,4 +665,136 @@ public void testReleaseVolumeRefIfExceptionThrown() cluster.shutdown(); } } + + @Test(timeout = 90000) + public void testCopyBlockCrossNamespace() + throws IOException, InterruptedException, TimeoutException { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024 * 1024); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)).build(); + try { + cluster.waitActive(); + ArrayList dataNodes = cluster.getDataNodes(); + + // Create one file with one block with one replica in Namespace0. 
+ Path ns0Path = new Path("/testCopyBlockCrossNamespace_0.txt"); + DistributedFileSystem ns0FS = cluster.getFileSystem(0); + DFSTestUtil.createFile(ns0FS, ns0Path, 1024, (short) 2, 0); + DFSTestUtil.waitReplication(ns0FS, ns0Path, (short) 2); + HdfsFileStatus ns0FileStatus = (HdfsFileStatus) ns0FS.getFileStatus(ns0Path); + LocatedBlocks locatedBlocks = + ns0FS.getClient().getLocatedBlocks(ns0Path.toUri().getPath(), 0, Long.MAX_VALUE); + assertEquals(1, locatedBlocks.getLocatedBlocks().size()); + assertTrue(locatedBlocks.isLastBlockComplete()); + LocatedBlock locatedBlockNS0 = locatedBlocks.get(0); + + DatanodeInfoWithStorage[] datanodeInfoWithStoragesNS0 = locatedBlockNS0.getLocations(); + assertEquals(2, datanodeInfoWithStoragesNS0.length); + + String ns0BlockLocation1 = datanodeInfoWithStoragesNS0[0].getHostName() + ":" + + datanodeInfoWithStoragesNS0[0].getXferPort(); + String ns0BlockLocation2 = datanodeInfoWithStoragesNS0[1].getHostName() + ":" + + datanodeInfoWithStoragesNS0[1].getXferPort(); + + String[] favoredNodes = new String[2]; + + favoredNodes[0] = ns0BlockLocation1; + for (DataNode dn : dataNodes) { + String dnInfo = dn.getDatanodeHostname() + ":" + dn.getXferPort(); + if (!dnInfo.equals(ns0BlockLocation1) && !dnInfo.equals(ns0BlockLocation2)) { + favoredNodes[1] = dnInfo; + } + } + + // Create one similar file with two replicas in Namespace1. + Path ns1Path = new Path("/testCopyBlockCrossNamespace_1.txt"); + DistributedFileSystem ns1FS = cluster.getFileSystem(1); + FSDataOutputStream stream = + ns1FS.create(ns1Path, ns0FileStatus.getPermission(), EnumSet.of(CreateFlag.CREATE), + ns1FS.getClient().getConf().getIoBufferSize(), ns0FileStatus.getReplication(), + ns0FileStatus.getBlockSize(), null, null, null, null, null); + DFSOutputStream outputStream = (DFSOutputStream) stream.getWrappedStream(); + + LocatedBlock locatedBlockNS1 = + DFSOutputStream.addBlock(null, outputStream.getDfsClient(), ns1Path.getName(), null, + outputStream.getFileId(), favoredNodes, null); + assertEquals(2, locatedBlockNS1.getLocations().length); + + // Align the datanode. + DatanodeInfoWithStorage[] datanodeInfoWithStoragesNS1 = locatedBlockNS1.getLocations(); + DatanodeInfoWithStorage sameDN = datanodeInfoWithStoragesNS0[0].getXferPort() + == datanodeInfoWithStoragesNS1[0].getXferPort() ? + datanodeInfoWithStoragesNS1[0] : + datanodeInfoWithStoragesNS1[1]; + DatanodeInfoWithStorage differentDN = datanodeInfoWithStoragesNS0[0].getXferPort() + == datanodeInfoWithStoragesNS1[0].getXferPort() ? + datanodeInfoWithStoragesNS1[1] : + datanodeInfoWithStoragesNS1[0]; + + // HardLink locatedBlockNS0 to locatedBlockNS1 on same datanode. + outputStream.copyBlockCrossNamespace(locatedBlockNS0.getBlock(), + locatedBlockNS0.getBlockToken(), datanodeInfoWithStoragesNS0[0], + locatedBlockNS1.getBlock(), locatedBlockNS1.getBlockToken(), sameDN); + + // Test when transfer throw exception client can know it. 
+ DataNodeFaultInjector.set(new DataNodeFaultInjector() { + public void transferThrowException() throws IOException { + throw new IOException("Transfer failed for fastcopy."); + } + }); + boolean transferError = false; + try { + outputStream.copyBlockCrossNamespace(locatedBlockNS0.getBlock(), + locatedBlockNS0.getBlockToken(), datanodeInfoWithStoragesNS0[1], + locatedBlockNS1.getBlock(), locatedBlockNS1.getBlockToken(), differentDN); + } catch (IOException e) { + transferError = true; + } + assertTrue(transferError); + + DataNodeFaultInjector.set(new DataNodeFaultInjector()); + // Transfer locatedBlockNS0 to locatedBlockNS1 on different datanode. + outputStream.copyBlockCrossNamespace(locatedBlockNS0.getBlock(), + locatedBlockNS0.getBlockToken(), datanodeInfoWithStoragesNS0[1], + locatedBlockNS1.getBlock(), locatedBlockNS1.getBlockToken(), differentDN); + + // Check Lease Holder. + RemoteIterator iterator = ns1FS.listOpenFiles(); + OpenFileEntry fileEntry = iterator.next(); + assertEquals(ns1Path.toUri().toString(), fileEntry.getFilePath()); + assertEquals(outputStream.getDfsClient().getClientName(), fileEntry.getClientName()); + + outputStream.setUserAssignmentLastBlock(locatedBlockNS1.getBlock()); + stream.close(); + + // Check Lease release. + iterator = ns1FS.listOpenFiles(); + assertFalse(iterator.hasNext()); + + long heartbeatInterval = + conf.getTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_DEFAULT, + TimeUnit.SECONDS, TimeUnit.MILLISECONDS); + Thread.sleep(heartbeatInterval * 2); + + // Do verification that the file in namespace1 should contain one block with two replicas. + LocatedBlocks locatedBlocksNS1 = ns1FS.getClient().getNamenode() + .getBlockLocations(ns1Path.toUri().getPath(), 0, Long.MAX_VALUE); + assertEquals(1, locatedBlocksNS1.getLocatedBlocks().size()); + assertEquals(2, locatedBlocksNS1.getLocatedBlocks().get(0).getLocations().length); + assertTrue(locatedBlocksNS1.isLastBlockComplete()); + + for (DataNode dataNode : dataNodes) { + if (dataNode.getXferPort() == datanodeInfoWithStoragesNS0[0].getXferPort()) { + assertEquals(1L, dataNode.getMetrics().getBlocksReplicatedViaHardlink()); + } else if (dataNode.getXferPort() == datanodeInfoWithStoragesNS0[1].getXferPort()) { + assertEquals(1L, dataNode.getMetrics().getBlocksReplicated()); + } + } + + } finally { + cluster.shutdown(); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 1ddc4e9602a7d..b3924efcb77f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1653,5 +1653,10 @@ public List getVolumeList() { public void setLastDirScannerFinishTime(long time) { throw new UnsupportedOperationException(); } + + @Override + public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) { + throw new UnsupportedOperationException(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 24069fccdfa35..cc124781cde2f 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -493,4 +493,11 @@ public long getLastDirScannerFinishTime() { public long getPendingAsyncDeletions() { return 0; } + + @Override + public void hardLinkOneBlock(ExtendedBlock srcBlock, ExtendedBlock dstBlock) + throws IOException { + + } + }
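
Reviewer note: the end-to-end client flow this patch enables is easier to see outside the diff, so below is a minimal, hypothetical sketch distilled from the testCopyBlockCrossNamespace test above. It is not part of the patch: the class name FastCopyExample and method fastCopyOneBlock are invented for illustration, it only handles a single-block, non-EC file, and it must live in the org.apache.hadoop.hdfs package because, like the test, it relies on the non-public DFSOutputStream helpers (addBlock, copyBlockCrossNamespace, setUserAssignmentLastBlock) introduced or exposed here.

package org.apache.hadoop.hdfs; // needed to reach package-private/protected DFSOutputStream helpers

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

/** Hypothetical helper, for illustration only; not shipped by this patch. */
public final class FastCopyExample {

  private FastCopyExample() {
  }

  /** Fast-copy a single-block file from one namespace (srcFs) to another (dstFs). */
  static void fastCopyOneBlock(DistributedFileSystem srcFs, Path srcPath,
      DistributedFileSystem dstFs, Path dstPath) throws IOException {
    HdfsFileStatus srcStatus = (HdfsFileStatus) srcFs.getFileStatus(srcPath);
    LocatedBlocks srcBlocks =
        srcFs.getClient().getLocatedBlocks(srcPath.toUri().getPath(), 0, Long.MAX_VALUE);
    LocatedBlock srcBlock = srcBlocks.get(0);
    DatanodeInfo srcDn = srcBlock.getLocations()[0];

    // Create the destination file, then allocate its block with the source replica's
    // datanode as a favored node so the copy can degenerate into a local hard link.
    FSDataOutputStream stream = dstFs.create(dstPath, srcStatus.getPermission(),
        EnumSet.of(CreateFlag.CREATE), dstFs.getClient().getConf().getIoBufferSize(),
        srcStatus.getReplication(), srcStatus.getBlockSize(), null, null, null, null, null);
    DFSOutputStream out = (DFSOutputStream) stream.getWrappedStream();
    String[] favoredNodes = {srcDn.getHostName() + ":" + srcDn.getXferPort()};
    LocatedBlock dstBlock = DFSOutputStream.addBlock(null, out.getDfsClient(),
        dstPath.getName(), null, out.getFileId(), favoredNodes, null);

    // Ask the datanode holding the source replica to populate the target block
    // via the new OP_COPY_BLOCK_CROSSNAMESPACE request.
    out.copyBlockCrossNamespace(srcBlock.getBlock(), srcBlock.getBlockToken(), srcDn,
        dstBlock.getBlock(), dstBlock.getBlockToken(), dstBlock.getLocations()[0]);

    // The streamer never wrote this block itself, so hand completeFile() the block
    // to commit before closing the stream.
    out.setUserAssignmentLastBlock(dstBlock.getBlock());
    stream.close();
  }
}

On the datanode side this maps onto the two paths added in DataNode#copyBlockCrossNamespace: the LocalBlockCopy hard-link path (blocksReplicatedViaHardlink metric) when source and target replicas land on the same datanode, and the existing DataTransfer path (blocksReplicated metric) when they do not.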