Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public interface HdfsClientConfigKeys {
String DFS_NAMESERVICES = "dfs.nameservices";
String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";

String DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY = "dfs.namenode.msync.rpc-address";

String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports";
String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY
+ "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -195,7 +197,7 @@ public class ObserverReadProxyProvider<T>
public ObserverReadProxyProvider(
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
this(conf, uri, xface, factory,
new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory));
new ConfiguredFailoverProxyProvider<>(conf, uri, xface, factory, DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.lifeline.rpc-address";
public static final String DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY =
"dfs.namenode.lifeline.rpc-bind-host";
public static final String DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY =
HdfsClientConfigKeys.DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY;
public static final String DFS_NAMENODE_MSYNC_RPC_BIND_HOST_KEY =
"dfs.namenode.msync.rpc-bind-host";
public static final String DFS_NAMENODE_MSYNC_HANDLER_RATIO_KEY =
"dfs.namenode.msync.handler.ratio";
public static final float DFS_NAMENODE_MSYNC_HANDLER_RATIO_DEFAULT =
0.1f;
public static final String DFS_NAMENODE_MSYNC_HANDLER_COUNT_KEY =
"dfs.namenode.msync.handler.count";
public static final String DFS_NAMENODE_MAX_OBJECTS_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
public static final long DFS_NAMENODE_MAX_OBJECTS_DEFAULT = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_RPC_BIND_HOST_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY;
Expand Down Expand Up @@ -324,6 +326,8 @@ public enum OperationCategory {
DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY,
DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY,
DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY,
DFS_NAMENODE_MSYNC_RPC_BIND_HOST_KEY,
DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY,
DFS_NAMENODE_SERVICE_RPC_BIND_HOST_KEY,
DFS_NAMENODE_HTTP_ADDRESS_KEY,
Expand Down Expand Up @@ -702,6 +706,21 @@ InetSocketAddress getLifelineRpcServerAddress(Configuration conf) {
return NetUtils.createSocketAddr(addr);
}

/**
* Given a configuration get the address of the msync RPC server.
* If the msync RPC is not configured returns null.
*
* @param conf configuration
* @return address or null
*/
InetSocketAddress getMsyncRpcServerAddress(Configuration conf) {
String addr = getTrimmedOrNull(conf, DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY);
if (addr == null) {
return null;
}
return NetUtils.createSocketAddr(addr);
}

/**
* Given a configuration get the address of the service rpc server
* If the service rpc is not configured returns null
Expand All @@ -725,6 +744,17 @@ String getLifelineRpcServerBindHost(Configuration conf) {
return getTrimmedOrNull(conf, DFS_NAMENODE_LIFELINE_RPC_BIND_HOST_KEY);
}

/**
* Given a configuration get the bind host of the msync RPC server.
* If the bind host is not configured returns null.
*
* @param conf configuration
* @return bind host or null
*/
String getMsyncRpcServerBindHost(Configuration conf) {
return getTrimmedOrNull(conf, DFS_NAMENODE_MSYNC_RPC_BIND_HOST_KEY);
}

/** Given a configuration get the bind host of the service rpc server
* If the bind host is not configured returns null.
*/
Expand Down Expand Up @@ -767,6 +797,19 @@ void setRpcLifelineServerAddress(Configuration conf,
NetUtils.getHostPortString(lifelineRPCAddress));
}

/**
* Modifies the configuration to contain the lifeline RPC address setting.
*
* @param conf configuration to modify
* @param msyncRPCAddress lifeline RPC address
*/
void setRpcMsyncServerAddress(Configuration conf,
InetSocketAddress msyncRPCAddress) {
LOG.info("Setting msync RPC address {}", msyncRPCAddress);
conf.set(DFS_NAMENODE_MSYNC_RPC_ADDRESS_KEY,
NetUtils.getHostPortString(msyncRPCAddress));
}

/**
* Modifies the configuration passed to contain the service rpc address setting
*/
Expand Down Expand Up @@ -1035,6 +1078,9 @@ private void startCommonServices(Configuration conf) throws IOException {
if (rpcServer.getLifelineRpcAddress() != null) {
LOG.info("{} lifeline RPC up at: {}.", getRole(), rpcServer.getLifelineRpcAddress());
}
if (rpcServer.getMsyncRpcAddress() != null) {
LOG.info("{} msync RPC up at: {}.", getRole(), rpcServer.getMsyncRpcAddress());
}
}

private void stopCommonServices() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_HANDLER_RATIO_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MSYNC_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
Expand Down Expand Up @@ -262,6 +265,10 @@ public class NameNodeRpcServer implements NamenodeProtocols {
/** The RPC server that listens to lifeline requests */
private final RPC.Server lifelineRpcServer;
private final InetSocketAddress lifelineRPCAddress;

/** The RPC server that listens to msync requests */
private final RPC.Server msyncRpcServer;
private final InetSocketAddress msyncRPCAddress;

/** The RPC server that listens to requests from clients */
protected final RPC.Server clientRpcServer;
Expand Down Expand Up @@ -444,6 +451,43 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
lifelineRPCAddress = null;
}

InetSocketAddress msyncRpcAddr = nn.getMsyncRpcServerAddress(conf);
if (msyncRpcAddr != null) {
String bindHost = nn.getMsyncRpcServerBindHost(conf);
if (bindHost == null) {
bindHost = msyncRpcAddr.getHostName();
}
LOG.info("Msync RPC server is binding to {}:{}", bindHost,
msyncRpcAddr.getPort());
int msyncHandlerCount = conf.getInt(
DFS_NAMENODE_MSYNC_HANDLER_COUNT_KEY, 0);
if (msyncHandlerCount <= 0) {
float msyncHandlerRatio = conf.getFloat(
DFS_NAMENODE_MSYNC_HANDLER_RATIO_KEY,
DFS_NAMENODE_MSYNC_HANDLER_RATIO_DEFAULT);
msyncHandlerCount = Math.max(
(int)(handlerCount * msyncHandlerRatio), 1);
}
msyncRpcServer = new RPC.Builder(conf)
.setProtocol(ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(msyncRpcAddr.getPort())
.setNumHandlers(msyncHandlerCount)
.setVerbose(false)
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();

// Update the address with the correct port
InetSocketAddress listenAddr = msyncRpcServer.getListenerAddress();
msyncRPCAddress = new InetSocketAddress(msyncRpcAddr.getHostName(),
listenAddr.getPort());
nn.setRpcMsyncServerAddress(conf, msyncRPCAddress);
} else {
msyncRpcServer = null;
msyncRPCAddress = null;
}

InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf);
String bindHost = nn.getRpcServerBindHost(conf);
if (bindHost == null) {
Expand All @@ -462,8 +506,7 @@ public NameNodeRpcServer(Configuration conf, NameNode nn)
}

clientRpcServer = new RPC.Builder(conf)
.setProtocol(
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
.setProtocol(ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress(bindHost)
.setPort(rpcAddr.getPort())
Expand Down Expand Up @@ -591,6 +634,9 @@ void start() {
if (lifelineRpcServer != null) {
lifelineRpcServer.start();
}
if (msyncRpcServer != null) {
msyncRpcServer.start();
}
}

/**
Expand All @@ -604,6 +650,9 @@ void join() throws InterruptedException {
if (lifelineRpcServer != null) {
lifelineRpcServer.join();
}
if (msyncRpcServer != null) {
msyncRpcServer.join();
}
}

/**
Expand All @@ -619,12 +668,19 @@ void stop() {
if (lifelineRpcServer != null) {
lifelineRpcServer.stop();
}
if (msyncRpcServer != null ) {
msyncRpcServer.stop();
}
}

InetSocketAddress getLifelineRpcAddress() {
return lifelineRPCAddress;
}

InetSocketAddress getMsyncRpcAddress() {
return msyncRPCAddress;
}

InetSocketAddress getServiceRpcAddress() {
return serviceRPCAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,33 @@
</description>
</property>

<property>
<name>dfs.namenode.msync.rpc-address</name>
<value></value>
<description>
NameNode RPC msync address. This is an optional separate RPC address
that can be used to msync requests to protect against
resource exhaustion in the main RPC handler pool. In the case of
HA/Federation where multiple NameNodes exist, the name service ID is added
to the name e.g. dfs.namenode.msync.rpc-address.ns1. The value of this
property will take the form of nn-host1:rpc-port. If this property is not
defined, then the NameNode will not start a msync RPC server. By
default, the property is not defined.
</description>
</property>

<property>
<name>dfs.namenode.msync.rpc-bind-host</name>
<value></value>
<description>
The actual address the msync RPC server will bind to. If this optional
address is set, it overrides only the hostname portion of
dfs.namenode.msync.rpc-address. It can also be specified per name node
or name service for HA/Federation. This is useful for making the name node
listen on all interfaces by setting it to 0.0.0.0.
</description>
</property>

<property>
<name>dfs.namenode.secondary.http-address</name>
<value>0.0.0.0:9868</value>
Expand Down