Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -1956,6 +1957,26 @@ protected IOStreamPair connectToDN(DatanodeInfo dn, int timeout,
socketFactory, getConf().isConnectToDnViaHostname(), this, blockToken);
}

protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
Token<BlockTokenIdentifier> sourceBlockToken, DatanodeInfo sourceDatanode,
ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken,
DatanodeInfo targetDatanode) throws IOException {
IOStreamPair pair =
DFSUtilClient.connectToDN(sourceDatanode, getConf().getSocketTimeout(), conf, saslClient,
socketFactory, getConf().isConnectToDnViaHostname(), this, sourceBlockToken);

new Sender((DataOutputStream) pair.out).copyBlockCrossNamespace(sourceBlk, sourceBlockToken,
targetBlk, targetBlockToken, targetDatanode);

pair.out.flush();

DataInputStream reply = new DataInputStream(pair.in);
BlockOpResponseProto proto = BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(reply));
DataTransferProtoUtil.checkBlockOpStatus(proto,
"copyBlockCrossNamespace " + sourceBlk + " to " + targetBlk + " from " + sourceDatanode
+ " to " + targetDatanode);
}

/**
* Infer the checksum type for a replica by sending an OP_READ_BLOCK
* for the first byte of that replica. This is used for compatibility
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public class DFSOutputStream extends FSOutputSummer
private FileEncryptionInfo fileEncryptionInfo;
private int writePacketSize;
private boolean leaseRecovered = false;
private ExtendedBlock userAssignmentLastBlock;

/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
Expand Down Expand Up @@ -949,6 +950,9 @@ protected void recoverLease(boolean recoverLeaseOnCloseException) {
void completeFile() throws IOException {
// get last block before destroying the streamer
ExtendedBlock lastBlock = getStreamer().getBlock();
if (lastBlock == null) {
lastBlock = getUserAssignmentLastBlock();
}
try (TraceScope ignored = dfsClient.getTracer()
.newScope("DFSOutputStream#completeFile")) {
completeFile(lastBlock);
Expand Down Expand Up @@ -1095,6 +1099,14 @@ ExtendedBlock getBlock() {
return getStreamer().getBlock();
}

public ExtendedBlock getUserAssignmentLastBlock() {
return userAssignmentLastBlock;
}

public void setUserAssignmentLastBlock(ExtendedBlock userAssignmentLastBlock) {
this.userAssignmentLastBlock = userAssignmentLastBlock;
}

@VisibleForTesting
public long getFileId() {
return fileId;
Expand Down Expand Up @@ -1199,4 +1211,16 @@ private static long calculateDelayForNextRetry(long previousDelay,
long maxDelay) {
return Math.min(previousDelay * 2, maxDelay);
}

public DFSClient getDfsClient() {
return dfsClient;
}

protected void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
Token<BlockTokenIdentifier> sourceBlockToken, DatanodeInfo sourceDatanode,
ExtendedBlock targetBlk, Token<BlockTokenIdentifier> targetBlockToken,
DatanodeInfo targetDatanode) throws IOException {
dfsClient.copyBlockCrossNamespace(sourceBlk, sourceBlockToken, sourceDatanode, targetBlk,
targetBlockToken, targetDatanode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,9 @@ protected synchronized void closeImpl() throws IOException {

try (TraceScope ignored =
dfsClient.getTracer().newScope("completeFile")) {
if (currentBlockGroup == null) {
currentBlockGroup = getUserAssignmentLastBlock();
}
completeFile(currentBlockGroup);
}
logCorruptBlocks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,10 @@ public FSDataOutputStream next(final FileSystem fs, final Path p)
* inherited policy.
*
*/
private HdfsDataOutputStream create(final Path f,
final FsPermission permission, final EnumSet<CreateFlag> flag,
final int bufferSize, final short replication, final long blockSize,
public HdfsDataOutputStream create(
final Path f, final FsPermission permission,
final EnumSet<CreateFlag> flag, final int bufferSize,
final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt,
final InetSocketAddress[] favoredNodes, final String ecPolicyName,
final String storagePolicy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,18 @@ void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,
Token<BlockTokenIdentifier> blockToken,
long requestedNumBytes,
BlockChecksumOptions blockChecksumOptions) throws IOException;

/**
* Copy a block cross Namespace.
* It is used for fastcopy.
*
* @param sourceBlk the block being copied.
* @param sourceBlockToken security token for accessing sourceBlk.
* @param targetBlk the block to be writted.
* @param targetBlockToken security token for accessing targetBlk.
* @param targetDatanode the target datnode which sourceBlk will copy to as targetBlk.
*/
void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
Token<BlockTokenIdentifier> sourceBlockToken, ExtendedBlock targetBlk,
Token<BlockTokenIdentifier> targetBlockToken, DatanodeInfo targetDatanode) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public enum Op {
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
BLOCK_GROUP_CHECKSUM((byte)90),
COPY_BLOCK_CROSSNAMESPACE((byte)91),
CUSTOM((byte)127);

/** The code for this operation. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
Expand Down Expand Up @@ -308,4 +309,18 @@ public void blockGroupChecksum(StripedBlockInfo stripedBlockInfo,

send(out, Op.BLOCK_GROUP_CHECKSUM, proto);
}

@Override
public void copyBlockCrossNamespace(ExtendedBlock sourceBlk,
Token<BlockTokenIdentifier> sourceBlockToken, ExtendedBlock targetBlk,
Token<BlockTokenIdentifier> targetBlockToken, DatanodeInfo targetDatanode)
throws IOException {
OpCopyBlockCrossNamespaceProto proto = OpCopyBlockCrossNamespaceProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(sourceBlk, sourceBlockToken))
.setTargetBlock(PBHelperClient.convert(targetBlk))
.setTargetToken(PBHelperClient.convert(targetBlockToken))
.setTargetDatanode(PBHelperClient.convert(targetDatanode)).build();

send(out, Op.COPY_BLOCK_CROSSNAMESPACE, proto);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,10 @@ message OpBlockChecksumResponseProto {
message OpCustomProto {
required string customId = 1;
}

message OpCopyBlockCrossNamespaceProto {
required BaseHeaderProto header = 1;
required ExtendedBlockProto targetBlock = 2;
required hadoop.common.TokenProto targetToken = 3;
required DatanodeInfoProto targetDatanode = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.ec.reconstruct.write.bandwidthPerSec";
public static final long DFS_DATANODE_EC_RECONSTRUCT_WRITE_BANDWIDTHPERSEC_DEFAULT =
0; // A value of zero indicates no limit
public static final String DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY =
"dfs.datanode.copy.block.cross.namespace.socket-timeout.ms";
public static final int DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT =
5 * 60 * 1000;

@Deprecated
public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockGroupChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockCrossNamespaceProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
Expand Down Expand Up @@ -133,6 +134,9 @@ protected final void processOp(Op op) throws IOException {
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
case COPY_BLOCK_CROSSNAMESPACE:
opCopyBlockCrossNamespace(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
Expand Down Expand Up @@ -339,4 +343,21 @@ private void opStripedBlockChecksum(DataInputStream dis) throws IOException {
}
}
}

private void opCopyBlockCrossNamespace(DataInputStream dis) throws IOException {
OpCopyBlockCrossNamespaceProto proto =
OpCopyBlockCrossNamespaceProto.parseFrom(vintPrefixed(dis));
TraceScope traceScope = continueTraceSpan(proto.getHeader(), proto.getClass().getSimpleName());
try {
copyBlockCrossNamespace(PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelperClient.convert(proto.getHeader().getToken()),
PBHelperClient.convert(proto.getTargetBlock()),
PBHelperClient.convert(proto.getTargetToken()),
PBHelperClient.convert(proto.getTargetDatanode()));
} finally {
if (traceScope != null) {
traceScope.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
Expand Down Expand Up @@ -89,6 +91,7 @@ public class DNConf {
final int socketWriteTimeout;
final int socketKeepaliveTimeout;
final int ecChecksumSocketTimeout;
private final int copyBlockCrossNamespaceSocketTimeout;
private final int transferSocketSendBufferSize;
private final int transferSocketRecvBufferSize;
private final boolean tcpNoDelay;
Expand Down Expand Up @@ -153,6 +156,9 @@ public DNConf(final Configurable dn) {
ecChecksumSocketTimeout = getConf().getInt(
DFS_CHECKSUM_EC_SOCKET_TIMEOUT_KEY,
DFS_CHECKSUM_EC_SOCKET_TIMEOUT_DEFAULT);
copyBlockCrossNamespaceSocketTimeout = getConf().getInt(
DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_KEY,
DFS_DATANODE_COPY_BLOCK_CROSS_NAMESPACE_SOCKET_TIMEOUT_MS_DEFAULT);
this.transferSocketSendBufferSize = getConf().getInt(
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY,
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT);
Expand Down Expand Up @@ -390,6 +396,15 @@ public int getEcChecksumSocketTimeout() {
return ecChecksumSocketTimeout;
}

/**
* Returns socket timeout for copyBlockCrossNamespace.
*
* @return int socket timeout
*/
public int getCopyBlockCrossNamespaceSocketTimeout() {
return copyBlockCrossNamespaceSocketTimeout;
}

/**
* Returns the SaslPropertiesResolver configured for use with
* DataTransferProtocol, or null if not configured.
Expand Down
Loading