diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 2044530506757..ea257f2f5311f 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -69,6 +69,8 @@ public interface HdfsClientConfigKeys { String DFS_NAMESERVICES = "dfs.nameservices"; String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; + String DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY = "dfs.namenode.msync.rpc-address"; + String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports"; String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY + "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java index 2c58ad1a97b10..13fbf6c410f38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY; + import java.io.Closeable; import java.io.IOException; import java.io.InterruptedIOException; @@ -195,7 +197,7 @@ public class ObserverReadProxyProvider public ObserverReadProxyProvider( Configuration conf, URI uri, Class xface, HAProxyFactory factory) { this(conf, uri, xface, factory, - new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory)); + new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory, DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY)); } @SuppressWarnings("unchecked") diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f92a2ad56581b..00a16af788f26 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -219,6 +219,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.namenode.lifeline.rpc-address"; public static final String DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY = "dfs.namenode.lifeline.rpc-bind-host"; + public static final String DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY = + HdfsClientConfigKeys.DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY; + public static final String DFS_NAMENODE_MSYNC_RPC_BIND_HOST_KEY = + "dfs.namenode.msync.rpc-bind-host"; + public static final String DFS_NAMENODE_MSYNC_HANDLER_RATIO_KEY = + "dfs.namenode.msync.handler.ratio"; + public static final float DFS_NAMENODE_MSYNC_HANDLER_RATIO_DEFAULT = + 0.1f; + public static final String DFS_NAMENODE_MSYNC_HANDLER_COUNT_KEY = + "dfs.namenode.msync.handler.count"; public static final String DFS_NAMENODE_MAX_OBJECTS_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_MAX_OBJECTS_KEY; public static final long DFS_NAMENODE_MAX_OBJECTS_DEFAULT = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index a48cfdbe5957c..f0aed5778329c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -176,6 +176,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_RPC_BIND_HOST_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY; @@ -324,6 +326,8 @@ public enum OperationCategory { DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY, DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY, DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY, + DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY, + DFS_NAMENODE_MSYNC_RPC_BIND_HOST_KEY, DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY, DFS_NAMENODE_HTTP_ADDRESS_KEY, @@ -702,6 +706,21 @@ InetSocketAddress getLifelineRpcServerAddress(Configuration conf) { return NetUtils.createSocketAddr(addr); } + /** + * Given a configuration get the address of the msync RPC server. + * If the msync RPC is not configured returns null. + * + * @param conf configuration + * @return address or null + */ + InetSocketAddress getMsyncRpcServerAddress(Configuration conf) { + String addr = getTrimmedOrNull(conf, DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY); + if (addr == null) { + return null; + } + return NetUtils.createSocketAddr(addr); + } + /** * Given a configuration get the address of the service rpc server * If the service rpc is not configured returns null @@ -725,6 +744,17 @@ String getLifelineRpcServerBindHost(Configuration conf) { return getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY); } + /** + * Given a configuration get the bind host of the msync RPC server. + * If the bind host is not configured returns null. + * + * @param conf configuration + * @return bind host or null + */ + String getMsyncRpcServerBindHost(Configuration conf) { + return getTrimmedOrNull(conf, DFS_NAMENODE_MSYNC_RPC_BIND_HOST_KEY); + } + /** Given a configuration get the bind host of the service rpc server * If the bind host is not configured returns null. */ @@ -767,6 +797,19 @@ void setRpcLifelineServerAddress(Configuration conf, NetUtils.getHostPortString(lifelineRPCAddress)); } + /** + * Modifies the configuration to contain the lifeline RPC address setting. + * + * @param conf configuration to modify + * @param msyncRPCAddress lifeline RPC address + */ + void setRpcMsyncServerAddress(Configuration conf, + InetSocketAddress msyncRPCAddress) { + LOG.info("Setting msync RPC address {}", msyncRPCAddress); + conf.set(DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY, + NetUtils.getHostPortString(msyncRPCAddress)); + } + /** * Modifies the configuration passed to contain the service rpc address setting */ @@ -1035,6 +1078,9 @@ private void startCommonServices(Configuration conf) throws IOException { if (rpcServer.getLifelineRpcAddress() != null) { LOG.info("{} lifeline RPC up at: {}.", getRole(), rpcServer.getLifelineRpcAddress()); } + if (rpcServer.getMsyncRpcAddress() != null) { + LOG.info("{} msync RPC up at: {}.", getRole(), rpcServer.getMsyncRpcAddress()); + } } private void stopCommonServices() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index ec3eb4f038ec4..d30201c05d12a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -25,6 +25,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_HANDLER_RATIO_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_HANDLER_RATIO_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY; @@ -262,6 +265,10 @@ public class NameNodeRpcServer implements NamenodeProtocols { /** The RPC server that listens to lifeline requests */ private final RPC.Server lifelineRpcServer; private final InetSocketAddress lifelineRPCAddress; + + /** The RPC server that listens to msync requests */ + private final RPC.Server msyncRpcServer; + private final InetSocketAddress msyncRPCAddress; /** The RPC server that listens to requests from clients */ protected final RPC.Server clientRpcServer; @@ -444,6 +451,43 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) lifelineRPCAddress = null; } + InetSocketAddress msyncRpcAddr = nn.getMsyncRpcServerAddress(conf); + if (msyncRpcAddr != null) { + String bindHost = nn.getMsyncRpcServerBindHost(conf); + if (bindHost == null) { + bindHost = msyncRpcAddr.getHostName(); + } + LOG.info("Msync RPC server is binding to {}:{}", bindHost, + msyncRpcAddr.getPort()); + int msyncHandlerCount = conf.getInt( + DFS_NAMENODE_MSYNC_HANDLER_COUNT_KEY, 0); + if (msyncHandlerCount <= 0) { + float msyncHandlerRatio = conf.getFloat( + DFS_NAMENODE_MSYNC_HANDLER_RATIO_KEY, + DFS_NAMENODE_MSYNC_HANDLER_RATIO_DEFAULT); + msyncHandlerCount = Math.max( + (int)(handlerCount * msyncHandlerRatio), 1); + } + msyncRpcServer = new RPC.Builder(conf) + .setProtocol(ClientNamenodeProtocolPB.class) + .setInstance(clientNNPbService) + .setBindAddress(bindHost) + .setPort(msyncRpcAddr.getPort()) + .setNumHandlers(msyncHandlerCount) + .setVerbose(false) + .setSecretManager(namesystem.getDelegationTokenSecretManager()) + .build(); + + // Update the address with the correct port + InetSocketAddress listenAddr = msyncRpcServer.getListenerAddress(); + msyncRPCAddress = new InetSocketAddress(msyncRpcAddr.getHostName(), + listenAddr.getPort()); + nn.setRpcMsyncServerAddress(conf, msyncRPCAddress); + } else { + msyncRpcServer = null; + msyncRPCAddress = null; + } + InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf); String bindHost = nn.getRpcServerBindHost(conf); if (bindHost == null) { @@ -462,8 +506,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn) } clientRpcServer = new RPC.Builder(conf) - .setProtocol( - org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) + .setProtocol(ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress(bindHost) .setPort(rpcAddr.getPort()) @@ -591,6 +634,9 @@ void start() { if (lifelineRpcServer != null) { lifelineRpcServer.start(); } + if (msyncRpcServer != null) { + msyncRpcServer.start(); + } } /** @@ -604,6 +650,9 @@ void join() throws InterruptedException { if (lifelineRpcServer != null) { lifelineRpcServer.join(); } + if (msyncRpcServer != null) { + msyncRpcServer.join(); + } } /** @@ -619,12 +668,19 @@ void stop() { if (lifelineRpcServer != null) { lifelineRpcServer.stop(); } + if (msyncRpcServer != null ) { + msyncRpcServer.stop(); + } } InetSocketAddress getLifelineRpcAddress() { return lifelineRPCAddress; } + InetSocketAddress getMsyncRpcAddress() { + return msyncRPCAddress; + } + InetSocketAddress getServiceRpcAddress() { return serviceRPCAddress; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index e6dc8c5ba1ac4..d386e22388ebb 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -105,6 +105,33 @@ + + dfs.namenode.msync.rpc-address + + + NameNode RPC msync address. This is an optional separate RPC address + that can be used to msync requests to protect against + resource exhaustion in the main RPC handler pool. In the case of + HA/Federation where multiple NameNodes exist, the name service ID is added + to the name e.g. dfs.namenode.msync.rpc-address.ns1. The value of this + property will take the form of nn-host1:rpc-port. If this property is not + defined, then the NameNode will not start a msync RPC server. By + default, the property is not defined. + + + + + dfs.namenode.msync.rpc-bind-host + + + The actual address the msync RPC server will bind to. If this optional + address is set, it overrides only the hostname portion of + dfs.namenode.msync.rpc-address. It can also be specified per name node + or name service for HA/Federation. This is useful for making the name node + listen on all interfaces by setting it to 0.0.0.0. + + + dfs.namenode.secondary.http-address 0.0.0.0:9868