diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java index f18c5a0c1eeb6..23e551a962fdb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/BasicInetPeer.java @@ -21,9 +21,12 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.http.client.utils.URIBuilder; /** * Represents a peer that we communicate with by using a basic Socket @@ -35,12 +38,18 @@ public class BasicInetPeer implements Peer { private final OutputStream out; private final InputStream in; private final boolean isLocal; + private final URI localURI; + private final URI remoteURI; public BasicInetPeer(Socket socket) throws IOException { this.socket = socket; this.out = socket.getOutputStream(); this.in = socket.getInputStream(); this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress()); + this.localURI = getURI(socket.getLocalAddress().getHostAddress(), + socket.getLocalPort()); + this.remoteURI = + getURI(socket.getInetAddress().getHostAddress(), socket.getPort()); } @Override @@ -101,6 +110,16 @@ public String getLocalAddressString() { return socket.getLocalSocketAddress().toString(); } + @Override + public URI getRemoteURI() { + return this.remoteURI; + } + + @Override + public URI getLocalURI() { + return this.localURI; + } + @Override public InputStream getInputStream() throws IOException { return in; @@ -116,11 +135,6 @@ public boolean isLocal() { return isLocal; } - @Override - public String toString() { - return "BasicInetPeer(" + socket.toString() + ")"; - } - @Override public DomainSocket getDomainSocket() { return null; @@ -130,4 +144,29 @@ public DomainSocket getDomainSocket() { public boolean hasSecureChannel() { return false; } + + @Override + public String toString() { + return "BasicInetPeer [isLocal=" + isLocal + ", localURI=" + localURI + + ", remoteURI=" + remoteURI + "]"; + } + + /** + * Given a host name and port, create a DN URI. Turn checked exception into + * runtime. Exception should never happen because the inputs are captures from + * an exiting socket and not parsed from an external source. 
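A minimal sketch of the URI this helper yields, assuming org.apache.http.client.utils.URIBuilder (Apache HttpClient) is available as the new import implies; the host and port below are arbitrary stand-ins for the values the patch captures from the connected Socket:

import java.net.URI;
import java.net.URISyntaxException;
import org.apache.http.client.utils.URIBuilder;

public class DnUriSketch {
  public static void main(String[] args) throws URISyntaxException {
    // Hypothetical endpoint; BasicInetPeer reads these from the Socket itself.
    URI remote = new URIBuilder().setScheme("hdfs+dn")
        .setHost("127.0.0.1")
        .setPort(9866)
        .build();
    System.out.println(remote);            // hdfs+dn://127.0.0.1:9866
    System.out.println(remote.getHost());  // 127.0.0.1
  }
}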
+ * + * @param host Host for URI + * @param port Port for URI + * @return A URI + */ + private URI getURI(final String host, final int port) { + try { + return new URIBuilder().setScheme("hdfs+dn") + .setHost(host) + .setPort(port).build(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java index 58c7e61b6d94c..66f3623b04b1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/DomainPeer.java @@ -20,10 +20,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.channels.ReadableByteChannel; -import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.net.unix.DomainSocket; /** * Represents a peer that we communicate with by using blocking I/O @@ -35,12 +37,21 @@ public class DomainPeer implements Peer { private final OutputStream out; private final InputStream in; private final ReadableByteChannel channel; + private final boolean isLocal; + private final URI localURI; + private final URI remoteURI; public DomainPeer(DomainSocket socket) { this.socket = socket; this.out = socket.getOutputStream(); this.in = socket.getInputStream(); this.channel = socket.getChannel(); + + // For a domain socket, both clients share the same socket file and only + // local communication is supported + this.isLocal = true; + this.localURI = getURI(socket.getPath()); + this.remoteURI = this.localURI; } @Override @@ -89,6 +100,16 @@ public String getLocalAddressString() { return ""; } + @Override + public URI getRemoteURI() { + return this.remoteURI; + } + + @Override + public URI getLocalURI() { + return this.localURI; + } + @Override public InputStream getInputStream() throws IOException { return in; @@ -101,13 +122,7 @@ public OutputStream getOutputStream() throws IOException { @Override public boolean isLocal() { - /* UNIX domain sockets can only be used for local communication. */ - return true; - } - - @Override - public String toString() { - return "DomainPeer(" + getRemoteAddressString() + ")"; + return this.isLocal; } @Override @@ -129,4 +144,31 @@ public boolean hasSecureChannel() { // return true; } + + @Override + public String toString() { + return "DomainPeer [isLocal=" + isLocal + ", localURI=" + localURI + + ", remoteURI=" + remoteURI + "]"; + } + + /** + * Given a host name and port, create a DN URI. Turn checked exception into + * runtime. Exception should never happen because the inputs are captures from + * an exiting socket and not parsed from an external source. + * + * Processes reference Unix domain sockets as file system inodes, so two + * processes can communicate by opening the same socket. Therefore the URI + * host is always "localhost" and the URI path is the file path to the socket + * file. 
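A sketch of the resulting form for the domain-socket case; the socket path is a hypothetical example, and note that the implementation a few lines below passes the literal "127.0.0.1" rather than the name "localhost" used in the comment:

import java.net.URI;
import java.net.URISyntaxException;

public class DomainSocketUriSketch {
  public static void main(String[] args) throws URISyntaxException {
    // Hypothetical socket file; DomainPeer uses DomainSocket#getPath() here.
    URI uri = new URI("hdfs+dn+unix", "127.0.0.1",
        "/var/lib/hadoop-hdfs/dn_socket", null);
    System.out.println(uri);            // hdfs+dn+unix://127.0.0.1/var/lib/hadoop-hdfs/dn_socket
    System.out.println(uri.getPath());  // /var/lib/hadoop-hdfs/dn_socket
  }
}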
+ * + * @param path The path to the Unix domain socket file + * @return A URI + */ + private URI getURI(final String path) { + try { + return new URI("hdfs+dn+unix", "127.0.0.1", path, null); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java index 04816593efd34..68e705268e1c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/EncryptedPeer.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.OutputStream; +import java.net.URI; import java.nio.channels.ReadableByteChannel; /** @@ -110,6 +111,16 @@ public String getLocalAddressString() { return enclosedPeer.getLocalAddressString(); } + @Override + public URI getRemoteURI() { + return enclosedPeer.getRemoteURI(); + } + + @Override + public URI getLocalURI() { + return enclosedPeer.getLocalURI(); + } + @Override public InputStream getInputStream() throws IOException { return in; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java index 23a45b7fe453b..3c342dcc52c14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/NioInetPeer.java @@ -21,11 +21,14 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.net.SocketInputStream; import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.http.client.utils.URIBuilder; /** * Represents a peer that we communicate with by using non-blocking I/O @@ -46,11 +49,18 @@ public class NioInetPeer implements Peer { private final boolean isLocal; + private final URI localURI; + private final URI remoteURI; + public NioInetPeer(Socket socket) throws IOException { this.socket = socket; this.in = new SocketInputStream(socket.getChannel(), 0); this.out = new SocketOutputStream(socket.getChannel(), 0); this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress()); + this.localURI = getURI(socket.getLocalAddress().getHostAddress(), + socket.getLocalPort()); + this.remoteURI = + getURI(socket.getInetAddress().getHostAddress(), socket.getPort()); } @Override @@ -104,6 +114,16 @@ public String getLocalAddressString() { return socket.getLocalSocketAddress().toString(); } + @Override + public URI getRemoteURI() { + return this.remoteURI; + } + + @Override + public URI getLocalURI() { + return this.localURI; + } + @Override public InputStream getInputStream() throws IOException { return in; @@ -119,11 +139,6 @@ public boolean isLocal() { return isLocal; } - @Override - public String toString() { - return "NioInetPeer(" + socket.toString() + ")"; - } - @Override public DomainSocket getDomainSocket() { return null; @@ -133,4 +148,29 @@ public DomainSocket getDomainSocket() { public boolean hasSecureChannel() { return false; } + + @Override + public String toString() { + return "NioInetPeer [isLocal=" + 
isLocal + ", localURI=" + localURI + + ", remoteURI=" + remoteURI + "]"; + } + + /** + * Given a host name and port, create a DN URI. Turn checked exception into + * runtime. Exception should never happen because the inputs are captures from + * an exiting socket and not parsed from an external source. + * + * @param host Host for URI + * @param port Port for URI + * @return A URI + */ + private URI getURI(final String host, final int port) { + try { + return new URIBuilder().setScheme("hdfs+dn") + .setHost(host) + .setPort(port).build(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java index 8fecc6ef18062..1dd95105536db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/net/Peer.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URI; import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.net.unix.DomainSocket; @@ -31,93 +32,98 @@ @InterfaceAudience.Private public interface Peer extends Closeable { /** - * @return The input stream channel associated with this - * peer, or null if it has none. + * @return The input stream channel associated with this peer, or null if it + * has none. */ ReadableByteChannel getInputStreamChannel(); /** * Set the read timeout on this peer. * - * @param timeoutMs The timeout in milliseconds. + * @param timeoutMs The timeout in milliseconds. */ void setReadTimeout(int timeoutMs) throws IOException; /** - * @return The receive buffer size. + * @return The receive buffer size. */ int getReceiveBufferSize() throws IOException; /** - * @return True if TCP_NODELAY is turned on. + * @return True if TCP_NODELAY is turned on. */ boolean getTcpNoDelay() throws IOException; /** * Set the write timeout on this peer. * - * Note: this is not honored for BasicInetPeer. - * See {@link BasicInetPeer#setWriteTimeout} for details. + * Note: this is not honored for BasicInetPeer. See + * {@link BasicInetPeer#setWriteTimeout} for details. * - * @param timeoutMs The timeout in milliseconds. + * @param timeoutMs The timeout in milliseconds. */ void setWriteTimeout(int timeoutMs) throws IOException; /** - * @return true only if the peer is closed. + * @return true only if the peer is closed. */ boolean isClosed(); /** * Close the peer. * - * It's safe to re-close a Peer that is already closed. + * It is safe to close a Peer that is already closed. */ void close() throws IOException; /** - * @return A string representing the remote end of our - * connection to the peer. + * @return A string representing the remote end of our connection to the peer. */ String getRemoteAddressString(); /** - * @return A string representing the local end of our - * connection to the peer. + * @return A string representing the local end of our connection to the peer. */ String getLocalAddressString(); /** - * @return An InputStream associated with the Peer. - * This InputStream will be valid until you close - * this peer with Peer#close. + * @return A URI representing the remote end of our connection to the peer. 
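Later in this patch DataXceiver stops splitting the remote address string on ':' and reads the host straight off this URI; a small sketch of that consumer side (the class name is illustrative):

import java.net.URI;
import org.apache.hadoop.hdfs.net.Peer;

class PeerUriConsumerSketch {
  /** Mirrors how incrDatanodeNetworkErrors now derives the remote host. */
  static String remoteHost(Peer peer) {
    URI remote = peer.getRemoteURI();
    return remote.getHost();   // no manual "host:port" string parsing needed
  }
}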
+ */ + URI getRemoteURI(); + + /** + * @return A URI representing the local end of our connection to the peer. + */ + URI getLocalURI(); + + /** + * @return An InputStream associated with the Peer. This InputStream will be + * valid until you close this peer with Peer#close. */ InputStream getInputStream() throws IOException; /** - * @return An OutputStream associated with the Peer. - * This OutputStream will be valid until you close - * this peer with Peer#close. + * @return An OutputStream associated with the Peer. This OutputStream will be + * valid until you close this peer with Peer#close. */ OutputStream getOutputStream() throws IOException; /** - * @return True if the peer resides on the same - * computer as we. + * @return True if the peer resides on the same computer as we. */ boolean isLocal(); /** - * @return The DomainSocket associated with the current - * peer, or null if there is none. + * @return The DomainSocket associated with the current peer, or null if there + * is none. */ DomainSocket getDomainSocket(); /** * Return true if the channel is secure. * - * @return True if our channel to this peer is not - * susceptible to man-in-the-middle attacks. + * @return True if our channel to this peer is not susceptible to + * man-in-the-middle attacks. */ boolean hasSecureChannel(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java index b24df2bfce929..3aeab66dc8f63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URI; import java.nio.channels.ReadableByteChannel; import static org.junit.Assert.assertEquals; @@ -96,6 +97,16 @@ public String getLocalAddressString() { return "127.0.0.1:123"; } + @Override + public URI getRemoteURI() { + throw new UnsupportedOperationException(); + } + + @Override + public URI getLocalURI() { + throw new UnsupportedOperationException(); + } + @Override public InputStream getInputStream() throws IOException { throw new UnsupportedOperationException(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index fedfc5a7dcefe..f234558401e3c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -746,6 +746,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_MAX_RECEIVER_THREADS_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY; public static final int DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096; + public static final String DFS_DATANODE_RECEIVER_THREADS_TTL = + "dfs.datanode.transfer.threads.ttl"; + public static final long DFS_DATANODE_RECEIVER_THREADS_TTL_DEFAULT = 60L; // seconds public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours"; public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 21 * 24; // 3 weeks. 
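A small sketch of overriding the new TTL key programmatically; 120 seconds is an arbitrary example value for the "dfs.datanode.transfer.threads.ttl" property defined above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class TransferThreadTtlSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Keep idle transfer threads around for 120s before they are reclaimed.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_RECEIVER_THREADS_TTL, 120L);
    System.out.println(conf.getLong(
        DFSConfigKeys.DFS_DATANODE_RECEIVER_THREADS_TTL,
        DFSConfigKeys.DFS_DATANODE_RECEIVER_THREADS_TTL_DEFAULT));   // 120
  }
}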
public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 950918404303c..e9245acbf395f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -342,8 +342,7 @@ public void close() throws IOException { } } catch(IOException e) { ioe = e; - } - finally { + } finally { IOUtils.closeStream(checksumOut); } // close block file @@ -363,18 +362,22 @@ public void close() throws IOException { } } catch (IOException e) { ioe = e; - } - finally{ + } finally { streams.close(); } - if (replicaHandler != null) { - IOUtils.cleanup(null, replicaHandler); - replicaHandler = null; + IOUtils.closeStream(this.replicaHandler); + this.replicaHandler = null; + + if (this.replicaInfo != null) { + // Since this BlockReceiver is closing, it is no longer associated + // with this replica + this.replicaInfo.attemptToSetWriter(Thread.currentThread(), null); } + if (measuredFlushTime) { datanode.metrics.addFlushNanos(flushTotalNanos); } - if(ioe != null) { + if (ioe != null) { // Volume error check moved to FileIoProvider throw ioe; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 53d650d0b1cda..8108d89333631 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -84,6 +84,7 @@ import java.net.SocketTimeoutException; import java.nio.channels.ClosedChannelException; import java.util.Arrays; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION; @@ -100,25 +101,22 @@ /** * Thread for processing incoming/outgoing data stream. 
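The class below moves from Runnable#run to Callable#call (the call() body later in this hunk returns Void), which is what lets DataXceiverServer keep a Future per connection and cancel it. A minimal sketch of a task with that shape; the body is hypothetical filler:

import java.util.concurrent.Callable;

class XceiverShapeSketch implements Callable<Void> {
  @Override
  public Void call() {
    try {
      // per-connection protocol work would go here
    } finally {
      // mirror the patch: leave the pooled worker thread with a neutral name
      Thread.currentThread().setName("Idle");
    }
    return null;   // Callable<Void> returns null rather than a value
  }
}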
*/ -class DataXceiver extends Receiver implements Runnable { - public static final Logger LOG = DataNode.LOG; - static final Log ClientTraceLog = DataNode.ClientTraceLog; - - private Peer peer; - private final String remoteAddress; // address of remote side - private final String remoteAddressWithoutPort; // only the address, no port - private final String localAddress; // local address of this daemon +class DataXceiver extends Receiver implements Callable { + private static final Logger LOG = DataNode.LOG; + + private static final Log ClientTraceLog = DataNode.ClientTraceLog; + private final DataNode datanode; private final DNConf dnConf; private final DataXceiverServer dataXceiverServer; private final boolean connectToDnViaHostname; - private long opStartTime; //the start time of receiving an Op - private final InputStream socketIn; - private OutputStream socketOut; - private BlockReceiver blockReceiver = null; private final int ioFileBufferSize; private final int smallBufferSize; - private Thread xceiver = null; + + private volatile BlockReceiver blockReceiver; + private volatile Peer peer; + private OutputStream socketOut; + private long opStartTime; /** * Client Name used in previous operation. Not available on first request @@ -136,21 +134,12 @@ private DataXceiver(Peer peer, DataNode datanode, super(datanode.getTracer()); this.peer = peer; this.dnConf = datanode.getDnConf(); - this.socketIn = peer.getInputStream(); - this.socketOut = peer.getOutputStream(); this.datanode = datanode; this.dataXceiverServer = dataXceiverServer; this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname; this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(datanode.getConf()); this.smallBufferSize = DFSUtilClient.getSmallBufferSize(datanode.getConf()); - remoteAddress = peer.getRemoteAddressString(); - final int colonIdx = remoteAddress.indexOf(':'); - remoteAddressWithoutPort = - (colonIdx < 0) ? remoteAddress : remoteAddress.substring(0, colonIdx); - localAddress = peer.getLocalAddressString(); - - LOG.debug("Number of active connections is: {}", - datanode.getXceiverCount()); + this.blockReceiver = null; } /** @@ -159,12 +148,11 @@ private DataXceiver(Peer peer, DataNode datanode, * outside the constructor. */ private void updateCurrentThreadName(String status) { - StringBuilder sb = new StringBuilder(); - sb.append("DataXceiver for client "); + StringBuilder sb = new StringBuilder("DataXceiver for client "); if (previousOpClientName != null) { sb.append(previousOpClientName).append(" at "); } - sb.append(remoteAddress); + sb.append(this.peer.getRemoteAddressString()); if (status != null) { sb.append(" [").append(status).append("]"); } @@ -179,7 +167,7 @@ private OutputStream getOutputStream() { } public void sendOOB() throws IOException, InterruptedException { - BlockReceiver br = getCurrentBlockReceiver(); + BlockReceiver br = this.blockReceiver; if (br == null) { return; } @@ -190,65 +178,45 @@ public void sendOOB() throws IOException, InterruptedException { br.sendOOB(); } - public void stopWriter() { - // We want to interrupt the xceiver only when it is serving writes. - synchronized(this) { - if (getCurrentBlockReceiver() == null) { - return; - } - xceiver.interrupt(); - } - LOG.info("Stopped the writer: {}", peer); + public boolean hasBlockReceiver() { + return this.blockReceiver != null; } - /** - * blockReceiver is updated at multiple places. Use the synchronized setter - * and getter. 
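The synchronized getter/setter pair is dropped in favor of declaring blockReceiver (and peer) volatile; for a single reference written by one thread and only read by others, volatile already guarantees readers see the latest value, as in this minimal sketch:

class VolatilePublishSketch {
  private volatile Object current;   // one writer thread, many reader threads

  void publish(Object value) {
    current = value;                 // write is immediately visible to readers
  }

  boolean isPublished() {
    return current != null;          // no lock needed just to read the reference
  }
}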
- */ - private synchronized void setCurrentBlockReceiver(BlockReceiver br) { - blockReceiver = br; + private void setCurrentBlockReceiver(BlockReceiver br) { + this.blockReceiver = br; } - private synchronized BlockReceiver getCurrentBlockReceiver() { - return blockReceiver; - } - /** * Read/write data from/to the DataXceiverServer. */ @Override - public void run() { + public Void call() { int opsProcessed = 0; Op op = null; try { - synchronized(this) { - xceiver = Thread.currentThread(); - } - dataXceiverServer.addPeer(peer, Thread.currentThread(), this); peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); - InputStream input = socketIn; + final InputStream input; try { - IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut, - socketIn, datanode.getXferAddress().getPort(), - datanode.getDatanodeId()); - input = new BufferedInputStream(saslStreams.in, - smallBufferSize); + IOStreamPair saslStreams = datanode.saslServer.receive(peer, + this.peer.getOutputStream(), peer.getInputStream(), + datanode.getXferAddress().getPort(), datanode.getDatanodeId()); + input = new BufferedInputStream(saslStreams.in, smallBufferSize); socketOut = saslStreams.out; } catch (InvalidMagicNumberException imne) { if (imne.isHandshake4Encryption()) { LOG.info("Failed to read expected encryption handshake from client " + "at {}. Perhaps the client " + "is running an older version of Hadoop which does not support " + - "encryption", peer.getRemoteAddressString(), imne); + "encryption", peer, imne); } else { LOG.info("Failed to read expected SASL data transfer protection " + "handshake from client at {}" + ". Perhaps the client is running an older version of Hadoop " + "which does not support SASL data transfer protection", - peer.getRemoteAddressString(), imne); + peer, imne); } - return; + return null; } super.initialize(new DataInputStream(input)); @@ -274,7 +242,7 @@ public void run() { // Since we optimistically expect the next op, it's quite normal to // get EOF here. LOG.debug("Cached {} closing after {} ops. " + - "This message is usually benign.", peer, opsProcessed); + "This message is usually benign.", peer, opsProcessed, e); break; } catch (IOException err) { incrDatanodeNetworkErrors(); @@ -293,26 +261,20 @@ public void run() { (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0)); } catch (Throwable t) { String s = datanode.getDisplayName() + ":DataXceiver error processing " - + ((op == null) ? "unknown" : op.name()) + " operation " - + " src: " + remoteAddress + " dst: " + localAddress; + + ((op == null) ? "unknown" : op.name()) + " operation " + " peer: " + + this.peer; if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) { // For WRITE_BLOCK, it is okay if the replica already exists since // client and replication may write the same block to the same datanode // at the same time. 
- if (LOG.isTraceEnabled()) { - LOG.trace(s, t); - } else { - LOG.info("{}; {}", s, t.toString()); - } + LOG.info("{}; {}", s, t); + LOG.trace(s, t); } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) { String s1 = "Likely the client has stopped reading, disconnecting it"; s1 += " (" + s + ")"; - if (LOG.isTraceEnabled()) { - LOG.trace(s1, t); - } else { - LOG.info("{}; {}", s1, t.toString()); - } + LOG.info("{}; {}", s1, t); + LOG.trace(s1, t); } else if (t instanceof InvalidToken || t.getCause() instanceof InvalidToken) { // The InvalidToken exception has already been logged in @@ -330,7 +292,9 @@ public void run() { dataXceiverServer.closePeer(peer); IOUtils.closeStream(in); } + Thread.currentThread().setName("Idle"); } + return null; } /** @@ -552,7 +516,7 @@ public void requestShortCircuitShm(String clientName) throws IOException { LOG.warn("Failed to shut down socket in error handler", e); } } - IOUtils.cleanup(null, shmInfo); + IOUtils.closeStream(shmInfo); } } @@ -582,12 +546,13 @@ public void readBlock(final ExtendedBlock block, DatanodeRegistration dnR = datanode.getDNRegistrationForBP(block.getBlockPoolId()); final String clientTraceFmt = - clientName.length() > 0 && ClientTraceLog.isInfoEnabled() - ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress, - "%d", "HDFS_READ", clientName, "%d", - dnR.getDatanodeUuid(), block, "%d") - : dnR + " Served block " + block + " to " + - remoteAddress; + (clientName.length() > 0 && ClientTraceLog.isInfoEnabled()) + ? String.format(DN_CLIENTTRACE_FORMAT, + this.peer.getLocalAddressString(), + this.peer.getRemoteAddressString(), "%d", "HDFS_READ", + clientName, "%d", dnR.getDatanodeUuid(), block, "%d") + : dnR + " Served block " + block + " to " + + this.peer.getRemoteAddressString(); try { try { @@ -614,9 +579,8 @@ public void readBlock(final ExtendedBlock block, ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( PBHelperClient.vintPrefixed(in)); if (!stat.hasStatus()) { - LOG.warn("Client {} did not send a valid status code " + - "after reading. Will close connection.", - peer.getRemoteAddressString()); + LOG.warn("Peer {} did not send a valid status code " + + "after reading. Will close connection.", peer); IOUtils.closeStream(out); } } catch (IOException ioe) { @@ -632,7 +596,7 @@ public void readBlock(final ExtendedBlock block, datanode.metrics.incrTotalReadTime(duration); } catch ( SocketException ignored ) { LOG.trace("{}:Ignoring exception while serving {} to {}", - dnR, block, remoteAddress, ignored); + dnR, block, this.peer, ignored); // Its ok for remote side to close the connection anytime. 
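Several of the reworked log calls lean on SLF4J's handling of a trailing Throwable: an exception that fills a {} placeholder is rendered via toString() only, while an extra final Throwable argument (or the plain message/throwable overload) gets its full stack trace logged. A small illustration:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jThrowableSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(Slf4jThrowableSketch.class);

  public static void main(String[] args) {
    Exception e = new IllegalStateException("boom");
    LOG.info("read failed; {}", e);   // e consumed by the placeholder: toString() only
    LOG.trace("read failed", e);      // trailing throwable: full stack trace at TRACE
  }
}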
datanode.metrics.incrBlocksRead(); IOUtils.closeStream(out); @@ -642,7 +606,7 @@ public void readBlock(final ExtendedBlock block, */ if (!(ioe instanceof SocketTimeoutException)) { LOG.warn("{}:Got exception while serving {} to {}", - dnR, block, remoteAddress, ioe); + dnR, block, peer, ioe); incrDatanodeNetworkErrors(); } throw ioe; @@ -736,8 +700,8 @@ public void writeBlock(final ExtendedBlock block, if (block.getNumBytes() == 0) { block.setNumBytes(dataXceiverServer.estimateBlockSize); } - LOG.info("Receiving {} src: {} dest: {}", - block, remoteAddress, localAddress); + + LOG.info("Receiving {} from peer: {}", block, peer); DataOutputStream mirrorOut = null; // stream to next target DataInputStream mirrorIn = null; // reply from next target @@ -912,8 +876,8 @@ public void writeBlock(final ExtendedBlock block, if (isDatanode || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { datanode.closeBlock(block, null, storageUuid, isOnTransientStorage); - LOG.info("Received {} src: {} dest: {} of size {}", - block, remoteAddress, localAddress, block.getNumBytes()); + LOG.info("Received {} from {} of size {}", + block, peer, block.getNumBytes()); } if(isClient) { @@ -1103,7 +1067,7 @@ public void copyBlock(final ExtendedBlock block, datanode.metrics.incrBlocksRead(); datanode.metrics.incrTotalReadTime(duration); - LOG.info("Copied {} to {}", block, peer.getRemoteAddressString()); + LOG.info("Copied {} for peer {}", block, peer); } catch (IOException ioe) { isOpSuccess = false; LOG.info("opCopyBlock {} received exception {}", block, ioe.toString()); @@ -1225,8 +1189,8 @@ public void replaceBlock(final ExtendedBlock block, datanode.notifyNamenodeReceivedBlock( block, delHint, r.getStorageUuid(), r.isOnTransientStorage()); - LOG.info("Moved {} from {}, delHint={}", - block, peer.getRemoteAddressString(), delHint); + LOG.info("Moved {} from peer {}, delHint={}", + block, peer, delHint); } } catch (IOException ioe) { opStatus = ERROR; @@ -1256,8 +1220,7 @@ public void replaceBlock(final ExtendedBlock block, try { sendResponse(opStatus, errMsg); } catch (IOException ioe) { - LOG.warn("Error writing reply back to {}", - peer.getRemoteAddressString()); + LOG.warn("Error writing reply back to peer {}", peer); incrDatanodeNetworkErrors(); } IOUtils.closeStream(proxyOut); @@ -1347,7 +1310,7 @@ private void writeSuccessWithChecksumInfo(BlockSender blockSender, } private void incrDatanodeNetworkErrors() { - datanode.incrDatanodeNetworkErrors(remoteAddressWithoutPort); + datanode.incrDatanodeNetworkErrors(this.peer.getRemoteURI().getHost()); } /** @@ -1424,9 +1387,8 @@ private void checkAccess(OutputStream out, final boolean reply, resp.build().writeDelimitedTo(out); out.flush(); } - LOG.warn("Block token verification failed: op={}, " + - "remoteAddress={}, message={}", - op, remoteAddress, e.getLocalizedMessage()); + LOG.warn("Block token verification failed: op={}, " + + "peer={}, message={}", op, peer, e.getLocalizedMessage()); throw e; } finally { IOUtils.closeStream(out); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index 46cb21e8d3a03..3f5462f1db960 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -21,7 +21,14 @@ import 
java.net.SocketTimeoutException; import java.nio.channels.AsynchronousCloseException; import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -33,7 +40,6 @@ import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.PeerServer; import org.apache.hadoop.hdfs.util.DataTransferThrottler; -import org.apache.hadoop.util.Daemon; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -56,7 +62,7 @@ class DataXceiverServer implements Runnable { private final PeerServer peerServer; private final DataNode datanode; - private final HashMap peers = new HashMap<>(); + private final HashMap> peers = new HashMap<>(); private final HashMap peersXceiver = new HashMap<>(); private final Lock lock = new ReentrantLock(); private final Condition noPeers = lock.newCondition(); @@ -176,6 +182,8 @@ void release() { */ final long estimateBlockSize; + private final ExecutorService xceiverExecutor; + DataXceiverServer(PeerServer peerServer, Configuration conf, DataNode datanode) { this.peerServer = peerServer; @@ -194,6 +202,21 @@ void release() { DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT), conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT)); + + final long xceiverTTL = + conf.getLong(DFSConfigKeys.DFS_DATANODE_RECEIVER_THREADS_TTL, + DFSConfigKeys.DFS_DATANODE_RECEIVER_THREADS_TTL_DEFAULT); + + this.xceiverExecutor = + new ThreadPoolExecutor(0, maxXceiverCount, xceiverTTL, TimeUnit.SECONDS, + new SynchronousQueue<>(true), new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + final Thread t = new Thread(datanode.threadGroup, r); + t.setDaemon(true); + return t; + } + }); } @Override @@ -203,17 +226,13 @@ public void run() { try { peer = peerServer.accept(); - // Make sure the xceiver count is not exceeded - int curXceiverCount = datanode.getXceiverCount(); - if (curXceiverCount > maxXceiverCount) { - throw new IOException("Xceiver count " + curXceiverCount - + " exceeds the limit of concurrent xcievers: " - + maxXceiverCount); - } + final DataXceiver xceiver = DataXceiver.create(peer, datanode, this); + Future f = this.xceiverExecutor.submit(xceiver); + addPeer(peer, f, xceiver); + + LOG.debug("Number of active connections is: {}. 
Thread pool: {}", + datanode.getXceiverCount(), this.xceiverExecutor); - new Daemon(datanode.threadGroup, - DataXceiver.create(peer, datanode, this)) - .start(); } catch (SocketTimeoutException ignored) { // wake up to see if should continue to run } catch (AsynchronousCloseException ace) { @@ -222,7 +241,7 @@ public void run() { if (datanode.shouldRun && !datanode.shutdownForUpgrade) { LOG.warn("{}:DataXceiverServer", datanode.getDisplayName(), ace); } - } catch (IOException ie) { + } catch (RejectedExecutionException | IOException ie) { IOUtils.closeQuietly(peer); LOG.warn("{}:DataXceiverServer", datanode.getDisplayName(), ie); } catch (OutOfMemoryError ie) { @@ -257,6 +276,10 @@ public void run() { lock.unlock(); } + // Initiate an orderly shutdown of the thread pool in which previously + // submitted tasks are executed, but no new tasks will be accepted. + this.xceiverExecutor.shutdown(); + // if in restart prep stage, notify peers before closing them. if (datanode.shutdownForUpgrade) { restartNotifyPeers(); @@ -288,14 +311,14 @@ void kill() { } } - void addPeer(Peer peer, Thread t, DataXceiver xceiver) + void addPeer(Peer peer, Future f, DataXceiver xceiver) throws IOException { lock.lock(); try { if (closed) { throw new IOException("Server closed."); } - peers.put(peer, t); + peers.put(peer, f); peersXceiver.put(peer, xceiver); datanode.metrics.incrDataNodeActiveXceiversCount(); } finally { @@ -342,7 +365,14 @@ public void sendOOBToPeers() { public void stopWriters() { lock.lock(); try { - peers.keySet().forEach(p -> peersXceiver.get(p).stopWriter()); + for (final Map.Entry> entry : peers.entrySet()) { + final Peer peer = entry.getKey(); + final DataXceiver xceiver = peersXceiver.get(peer); + if (xceiver.hasBlockReceiver()) { + LOG.info("Cancelling the writer for peer: {}", peer); + entry.getValue().cancel(true); + } + } } finally { lock.unlock(); } @@ -358,7 +388,7 @@ void restartNotifyPeers() { lock.lock(); try { // interrupt each and every DataXceiver thread. 
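The new executor in DataXceiverServer pairs a zero-core, capped ThreadPoolExecutor over a fair SynchronousQueue with a Future per connection, so stopping a writer becomes Future.cancel(true), which interrupts the worker much as the removed Thread.interrupt() did; submissions beyond the cap fail with RejectedExecutionException, which the accept loop now catches. A self-contained sketch of those semantics (4096 and 60s mirror the defaults referenced in this patch):

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class XceiverPoolSketch {
  public static void main(String[] args) throws Exception {
    // No core threads, a hard cap, idle workers reclaimed after the TTL,
    // and direct hand-off of each accepted peer (no queueing).
    ExecutorService pool = new ThreadPoolExecutor(0, 4096, 60L, TimeUnit.SECONDS,
        new SynchronousQueue<>(true));

    Callable<Void> task = () -> {
      try {
        Thread.sleep(60_000);               // stands in for blocking xceiver I/O
      } catch (InterruptedException e) {
        System.out.println("interrupted");  // Future.cancel(true) lands here
      }
      return null;
    };

    Future<Void> f = pool.submit(task);
    Thread.sleep(100);                      // let the worker start the task
    f.cancel(true);                         // interrupts the running worker thread
    pool.shutdown();
  }
}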
- peers.values().forEach(t -> t.interrupt()); + peers.values().forEach(f -> f.cancel(true)); } finally { lock.unlock(); } @@ -369,6 +399,13 @@ void restartNotifyPeers() { */ void closeAllPeers() { LOG.info("Closing all peers."); + Preconditions.checkState(this.closed && xceiverExecutor.isShutdown()); + try { + LOG.info("Waiting up to 60 seconds for current threads to complete"); + xceiverExecutor.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + // Ignore and just complete shutdown anyway + } lock.lock(); try { peers.keySet().forEach(p -> IOUtils.closeQuietly(p)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java index 99d2fc8e04ea8..f3670c7404265 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java @@ -23,7 +23,6 @@ import java.io.OutputStream; import java.io.RandomAccessFile; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -50,11 +49,12 @@ public class LocalReplicaInPipeline extends LocalReplica private final Lock lock = new ReentrantLock(); private final Condition bytesOnDiskChange = lock.newCondition(); + private final Condition writerChanged = lock.newCondition(); private long bytesAcked; private long bytesOnDisk; private byte[] lastChecksum; - private AtomicReference writer = new AtomicReference(); + private volatile Thread writer; /** * Bytes reserved for this replica on the containing volume. @@ -108,7 +108,7 @@ public LocalReplicaInPipeline(long blockId, long genStamp, super(blockId, len, genStamp, vol, dir); this.bytesAcked = len; this.bytesOnDisk = len; - this.writer.set(writer); + this.writer = writer; this.bytesReserved = bytesToReserve; this.originalBytesReserved = bytesToReserve; } @@ -121,7 +121,7 @@ public LocalReplicaInPipeline(LocalReplicaInPipeline from) { super(from); this.bytesAcked = from.getBytesAcked(); this.bytesOnDisk = from.getBytesOnDisk(); - this.writer.set(from.writer.get()); + this.writer = from.writer; this.bytesReserved = from.bytesReserved; this.originalBytesReserved = from.originalBytesReserved; } @@ -218,17 +218,28 @@ public void waitForMinLength(long minLength, long time, TimeUnit unit) } } - @Override // ReplicaInPipeline - public void setWriter(Thread writer) { - this.writer.set(writer); + @Override + public void setWriter(Thread newWriter) { + lock.lock(); + try { + if (this.writer != newWriter) { + this.writer = newWriter; + this.writerChanged.signalAll(); + } + } finally { + lock.unlock(); + } } @Override public void interruptThread() { - Thread thread = writer.get(); - if (thread != null && thread != Thread.currentThread() - && thread.isAlive()) { - thread.interrupt(); + lock.lock(); + try { + if (this.writer != null && this.writer != Thread.currentThread()) { + this.writer.interrupt(); + } + } finally { + lock.unlock(); } } @@ -240,40 +251,61 @@ public boolean equals(Object o) { /** * Attempt to set the writer to a new value. 
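The replica now tracks its writer under a ReentrantLock with a writerChanged Condition instead of an AtomicReference, so stopWriter can block until ownership is released or a deadline passes, and BlockReceiver#close releases ownership via attemptToSetWriter(current, null). A stripped-down sketch of that await pattern; the class and method names are illustrative, not the patch's API:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class WriterOwnershipSketch {
  private final Lock lock = new ReentrantLock();
  private final Condition writerChanged = lock.newCondition();
  private Thread writer;                          // guarded by lock

  /** Block until the given writer gives up ownership, or the timeout expires. */
  boolean waitForRelease(Thread current, long timeoutMs) throws InterruptedException {
    lock.lock();
    try {
      long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      while (writer == current) {
        if (nanos <= 0L) {
          return false;                           // deadline hit, writer never changed
        }
        nanos = writerChanged.awaitNanos(nanos);  // releases lock while waiting
      }
      return true;
    } finally {
      lock.unlock();
    }
  }

  /** Roughly what releasing ownership on close looks like. */
  void release(Thread current) {
    lock.lock();
    try {
      if (writer == current) {
        writer = null;
        writerChanged.signalAll();                // wake threads parked in waitForRelease
      }
    } finally {
      lock.unlock();
    }
  }
}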
*/ - @Override // ReplicaInPipeline + @Override public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) { - return writer.compareAndSet(prevWriter, newWriter); + final boolean change; + lock.lock(); + try { + change = (this.writer == prevWriter); + if (change) { + this.writer = newWriter; + this.writerChanged.signalAll(); + } + } finally { + lock.unlock(); + } + return change; } /** * Interrupt the writing thread and wait until it dies. * @throws IOException the waiting is interrupted */ - @Override // ReplicaInPipeline + @Override public void stopWriter(long xceiverStopTimeout) throws IOException { - while (true) { - Thread thread = writer.get(); - if ((thread == null) || (thread == Thread.currentThread()) || - (!thread.isAlive())) { - if (writer.compareAndSet(thread, null)) { - return; // Done - } - // The writer changed. Go back to the start of the loop and attempt to - // stop the new writer. - continue; + lock.lock(); + try { + final Thread thread = this.writer; + if (thread == null) { + // No writer + return; } + if (!thread.isAlive() || thread == Thread.currentThread()) { + this.writer = null; + this.writerChanged.signalAll(); + return; + } + + // Another thread owns this replica, stop it and wait for it to give up + // this replica thread.interrupt(); - try { - thread.join(xceiverStopTimeout); - if (thread.isAlive()) { - // Our thread join timed out. - final String msg = "Join on writer thread " + thread + " timed out"; + + long nanos = TimeUnit.MILLISECONDS.toNanos(xceiverStopTimeout); + + while (thread == this.writer) { + if (nanos <= 0L) { + final String msg = "Join on writer thread " + thread + " timed out. " + + "Writer thread may not have called attemptToSetWriter to " + + "clear ownership."; DataNode.LOG.warn(msg + "\n" + StringUtils.getStackTrace(thread)); throw new IOException(msg); } - } catch (InterruptedException e) { - throw new IOException("Waiting for writer thread is interrupted."); + nanos = writerChanged.awaitNanos(nanos); } + } catch (InterruptedException e) { + throw new IOException("Waiting for writer thread is interrupted."); + } finally { + lock.unlock(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 94d6512525d2f..c8fbd681107cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1416,6 +1416,15 @@ + + dfs.datanode.transfer.threads.ttl + 60 + + The maximum time (seconds) that excess idle transfer threads will wait + for new tasks before terminating. 
+ + + dfs.datanode.scan.period.hours 504 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java index 9e3ddcfbc9ac4..95a194ffb7de2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java @@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RECEIVER_THREADS_TTL; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -61,7 +62,10 @@ public void setup() throws Exception { KEEPALIVE_TIMEOUT); conf.setInt(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 0); - + + // Threads are re-used, waiting up to 1 second for new work before exiting + conf.setLong(DFS_DATANODE_RECEIVER_THREADS_TTL, 1L); + cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(1).build(); dn = cluster.getDataNodes().get(0); @@ -94,8 +98,11 @@ public void testDatanodeRespectsKeepAliveTimeout() throws Exception { DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L); - // Clients that write aren't currently re-used. assertEquals(0, peerCache.size()); + + // Clients that write are re-used if they are not idle for too long + // Thread TTL set to 1 second in test suit init + Thread.sleep(1500L); assertXceiverCount(0); // Reads the file, so we should get a @@ -105,10 +112,9 @@ public void testDatanodeRespectsKeepAliveTimeout() throws Exception { assertXceiverCount(1); // Sleep for a bit longer than the keepalive timeout - // and make sure the xceiver died. - Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 50); + Thread.sleep(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT + 50L + 1000L); assertXceiverCount(0); - + // The socket is still in the cache, because we don't // notice that it's closed until we try to read // from it again. @@ -139,8 +145,11 @@ public void testClientResponsesKeepAliveTimeout() throws Exception { DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L); - // Clients that write aren't currently re-used. 
assertEquals(0, peerCache.size()); + + // Clients that write are currently re-used unless TTL time passes + // So sleep first + Thread.sleep(1500L); assertXceiverCount(0); // Reads the file, so we should get a @@ -209,6 +218,9 @@ public void testManyClosedSocketsInCache() throws Exception { // Make a small file Configuration clientConf = new Configuration(conf); clientConf.set(DFS_CLIENT_CONTEXT, "testManyClosedSocketsInCache"); + + // Increase TTL of items in cache to 30s so they endure this entire test + clientConf.setLong(DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY, 30000L); DistributedFileSystem fs = (DistributedFileSystem)FileSystem.get(cluster.getURI(), clientConf); @@ -227,13 +239,13 @@ public void testManyClosedSocketsInCache() throws Exception { IOUtils.copyBytes(stm, new IOUtils.NullOutputStream(), 1024); } } finally { - IOUtils.cleanup(null, stms); + IOUtils.closeStreams(stms); } - + assertEquals(5, peerCache.size()); - - // Let all the xceivers timeout - Thread.sleep(1500); + + // Let all the xceivers timeout (1s timeout) + Thread.sleep(3000); assertXceiverCount(0); // Client side still has the sockets cached diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index f12285cb9b7ba..143c71626fe4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -51,6 +51,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -59,6 +60,7 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.collect.Iterators; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -715,39 +717,52 @@ public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception { Configuration conf = new HdfsConfiguration(); conf.set(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, "1000"); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(1).build(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitClusterUp(); - DistributedFileSystem fs = cluster.getFileSystem(); - Path path = new Path("/test"); - FSDataOutputStream out = fs.create(path); + + final DistributedFileSystem fs = cluster.getFileSystem(); + final Path path = new Path("/test"); + final FSDataOutputStream out = fs.create(path); out.writeBytes("data"); out.hsync(); - - List blocks = DFSTestUtil.getAllBlocks(fs.open(path)); + + final List blocks = DFSTestUtil.getAllBlocks(fs.open(path)); final LocatedBlock block = blocks.get(0); final DataNode dataNode = cluster.getDataNodes().get(0); - + final CountDownLatch syncPoint = new CountDownLatch(2); + final AtomicBoolean recoveryInitResult = new AtomicBoolean(true); + Thread recoveryThread = new Thread() { @Override public void run() { try { DatanodeInfo[] locations = block.getLocations(); - final RecoveringBlock recoveringBlock = new RecoveringBlock( - block.getBlock(), locations, block.getBlock() - .getGenerationStamp() + 1); - try(AutoCloseableLock lock = 
dataNode.data.acquireDatasetLock()) { + final RecoveringBlock recoveringBlock = + new RecoveringBlock(block.getBlock(), locations, + block.getBlock().getGenerationStamp() + 1); + try (AutoCloseableLock lock = dataNode.data.acquireDatasetLock()) { + // Wait for XCeiver thread to timeout (1s timeout) Thread.sleep(2000); dataNode.initReplicaRecovery(recoveringBlock); + + // Replica recover has began, allow main thread to close the file + syncPoint.countDown(); } } catch (Exception e) { recoveryInitResult.set(false); } } }; + recoveryThread.start(); + + // Wait for recovery thread to start recovery on this thread's open file + syncPoint.countDown(); + try { out.close(); } catch (IOException e) { @@ -756,16 +771,16 @@ public void run() { } finally { recoveryThread.join(); } + Assert.assertTrue("Recovery should be initiated successfully", recoveryInitResult.get()); - - dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock() - .getGenerationStamp() + 1, block.getBlock().getBlockId(), - block.getBlockSize()); + + dataNode.updateReplicaUnderRecovery(block.getBlock(), + block.getBlock().getGenerationStamp() + 1, + block.getBlock().getBlockId(), block.getBlockSize()); } finally { - if (null != cluster) { - cluster.shutdown(); - cluster = null; + if (cluster != null) { + cluster.close(); } } } @@ -961,8 +976,6 @@ private void testStopWorker(final TestStopWorkerRunnable tswr) Assert.assertEquals( TEST_STOP_WORKER_XCEIVER_STOP_TIMEOUT_MILLIS, dn.getDnConf().getXceiverStopTimeout()); - final TestStopWorkerSemaphore progressParent = - new TestStopWorkerSemaphore(); final TestStopWorkerSemaphore terminateSlowWriter = new TestStopWorkerSemaphore(); final AtomicReference failure = @@ -972,6 +985,7 @@ private void testStopWorker(final TestStopWorkerRunnable tswr) final RecoveringBlock recoveringBlock = Iterators.get(recoveringBlocks.iterator(), 0); final ExtendedBlock block = recoveringBlock.getBlock(); + final CountDownLatch syncPoint = new CountDownLatch(2); Thread slowWriterThread = new Thread(new Runnable() { @Override public void run() { @@ -982,9 +996,12 @@ public void run() { spyDN.data.createRbw(StorageType.DISK, null, block, false); replicaHandler.close(); LOG.debug("slowWriter created rbw"); - // Tell the parent thread to start progressing. - progressParent.sem.release(); + syncPoint.countDown(); terminateSlowWriter.uninterruptiblyAcquire(60000); + + // Give up ownership of the replica + replicaHandler.getReplica().attemptToSetWriter(Thread.currentThread(), + null); LOG.debug("slowWriter exiting"); } catch (Throwable t) { LOG.error("slowWriter got exception", t); @@ -996,7 +1013,10 @@ public void run() { // Start the slow worker thread and wait for it to take ownership of the // ReplicaInPipeline slowWriterThread.start(); - progressParent.uninterruptiblyAcquire(60000); + + // Will only free the main test thread if the slowWriter has ownership of + // the ReplicaInPipeline and test thread has reached this point + syncPoint.countDown(); // Start a worker thread which will attempt to stop the writer. 
Thread stopWriterThread = new Thread(new Runnable() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index f34ac51ce6c50..9800e0ac45c46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -69,6 +69,7 @@ import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -481,7 +482,8 @@ public void testAddVolumesConcurrently() dn.data = Mockito.spy(data); final int newVolumeCount = 40; - List addVolumeDelayedThreads = new ArrayList<>(); + List addVolumeDelayedThreads = + Collections.synchronizedList(new ArrayList<>()); AtomicBoolean addVolumeError = new AtomicBoolean(false); AtomicBoolean listStorageError = new AtomicBoolean(false); CountDownLatch addVolumeCompletionLatch = @@ -492,16 +494,18 @@ public void testAddVolumesConcurrently() final Thread listStorageThread = new Thread(new Runnable() { @Override public void run() { - while (addVolumeCompletionLatch.getCount() != newVolumeCount) { - int i = 0; - while(i++ < 1000) { - try { - dn.getStorage().listStorageDirectories(); - } catch (Exception e) { - listStorageError.set(true); - LOG.error("Error listing storage: " + e); + try { + while (!addVolumeCompletionLatch.await(0, TimeUnit.MILLISECONDS)) { + for (int i = 0; i < 100; i++) { + try { + dn.getStorage().listStorageDirectories(); + } catch (Exception e) { + listStorageError.set(true); + LOG.error("Error listing storage", e); + } } } + } catch (InterruptedException e) { } } }); @@ -511,23 +515,21 @@ public void run() { doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - final Random r = new Random(); Thread addVolThread = new Thread(new Runnable() { @Override public void run() { try { - r.setSeed(Time.now()); // Let 50% of add volume operations // start after an initial delay. 
- if (r.nextInt(10) > 4) { - int s = r.nextInt(10) + 1; - Thread.sleep(s * 100); + if (ThreadLocalRandom.current().nextFloat() >= 0.5f) { + Thread.sleep( + ThreadLocalRandom.current().nextLong(100L, 1000L)); } invocationOnMock.callRealMethod(); } catch (Throwable throwable) { addVolumeError.set(true); - LOG.error("Error adding volume: " + throwable); + LOG.error("Error adding volume", throwable); } finally { addVolumeCompletionLatch.countDown(); } @@ -542,6 +544,9 @@ public void run() { addVolumes(newVolumeCount, addVolumeCompletionLatch); numVolumes += newVolumeCount; + // Wait for all the volumes to be added + addVolumeCompletionLatch.await(180, TimeUnit.SECONDS); + // Wait for all addVolume and listStorage Threads to complete for (Thread t : addVolumeDelayedThreads) { t.join(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java index 4343b0acd038f..f51dd0eaf0e50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java @@ -207,6 +207,10 @@ public void testXceiverCountInternal(int minMaintenanceR) throws Exception { HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1); conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY, minMaintenanceR); + + // Turn off thread re-use so that we get consistent ouput for this test + conf.setLong(DFSConfigKeys.DFS_DATANODE_RECEIVER_THREADS_TTL, 0L); + MiniDFSCluster cluster = null; final int nodes = 8;