Skip to content

Commit b5e7f89

Browse files
hchaverrisnmvaughanSteve Vaughan Jr
authored
HADOOP-18365. Update the remote address when a change is detected (apache#4… (apache#356)
* HADOOP-18365. Update the remote address when a change is detected (apache#4692) (apache#4768) Back port to branch-3.3, to avoid reconnecting to the old address after detecting that the address has been updated. * Use a stable hashCode to allow safe IP addr changes * Add test that updated address is used Once the address has been updated, it will be used in future calls. Test verifies that a second request succeeds and that it uses the existing updated address instead of having to re-resolve. Co-authored-by: Steve Vaughan Jr <[email protected]> * ACLOVERRIDE --------- Co-authored-by: Steve Vaughan <[email protected]> Co-authored-by: Steve Vaughan Jr <[email protected]>
1 parent 0a4a63b commit b5e7f89

File tree

2 files changed

+110
-6
lines changed
  • hadoop-common-project/hadoop-common/src

2 files changed

+110
-6
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ public synchronized Writable getRpcResponse() {
341341
* socket: responses may be delivered out of order. */
342342
private class Connection extends Thread {
343343
private InetSocketAddress server; // server ip:port
344-
private final ConnectionId remoteId; // connection id
344+
private final ConnectionId remoteId; // connection id
345345
private AuthMethod authMethod; // authentication method
346346
private AuthProtocol authProtocol;
347347
private int serviceClass;
@@ -573,6 +573,9 @@ private synchronized boolean updateAddress() throws IOException {
573573
LOG.warn("Address change detected. Old: " + server.toString() +
574574
" New: " + currentAddr.toString());
575575
server = currentAddr;
576+
// Update the remote address so that reconnections are with the updated address.
577+
// This avoids thrashing.
578+
remoteId.setAddress(currentAddr);
576579
UserGroupInformation ticket = remoteId.getTicket();
577580
this.setName("IPC Client (" + socketFactory.hashCode()
578581
+ ") connection to " + server.toString() + " from "
@@ -1624,9 +1627,9 @@ private Connection getConnection(ConnectionId remoteId,
16241627
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
16251628
@InterfaceStability.Evolving
16261629
public static class ConnectionId {
1627-
InetSocketAddress address;
1628-
UserGroupInformation ticket;
1629-
final Class<?> protocol;
1630+
private InetSocketAddress address;
1631+
private final UserGroupInformation ticket;
1632+
private final Class<?> protocol;
16301633
private static final int PRIME = 16777619;
16311634
private final int rpcTimeout;
16321635
private final int maxIdleTime; //connections will be culled if it was idle for
@@ -1677,7 +1680,28 @@ public ConnectionId(InetSocketAddress address, Class<?> protocol,
16771680
InetSocketAddress getAddress() {
16781681
return address;
16791682
}
1680-
1683+
1684+
/**
1685+
* This is used to update the remote address when an address change is detected. This method
1686+
* ensures that the {@link #hashCode()} won't change.
1687+
*
1688+
* @param address the updated address
1689+
* @throws IllegalArgumentException if the hostname or port doesn't match
1690+
* @see Connection#updateAddress()
1691+
*/
1692+
void setAddress(InetSocketAddress address) {
1693+
if (!Objects.equals(this.address.getHostName(), address.getHostName())) {
1694+
throw new IllegalArgumentException("Hostname must match: " + this.address + " vs "
1695+
+ address);
1696+
}
1697+
if (this.address.getPort() != address.getPort()) {
1698+
throw new IllegalArgumentException("Port must match: " + this.address + " vs " + address);
1699+
}
1700+
1701+
this.address = address;
1702+
}
1703+
1704+
16811705
Class<?> getProtocol() {
16821706
return protocol;
16831707
}
@@ -1788,7 +1812,11 @@ && isEqual(this.protocol, that.protocol)
17881812
@Override
17891813
public int hashCode() {
17901814
int result = connectionRetryPolicy.hashCode();
1791-
result = PRIME * result + ((address == null) ? 0 : address.hashCode());
1815+
// We calculate based on the host name and port without the IP address, since the hashCode
1816+
// must be stable even if the IP address is updated.
1817+
result = PRIME * result + ((address == null || address.getHostName() == null) ? 0 :
1818+
address.getHostName().hashCode());
1819+
result = PRIME * result + ((address == null) ? 0 : address.getPort());
17921820
result = PRIME * result + (doPing ? 1231 : 1237);
17931821
result = PRIME * result + maxIdleTime;
17941822
result = PRIME * result + pingInterval;

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.ipc;
2020

21+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertFalse;
2324
import static org.junit.Assert.assertNotNull;
@@ -815,6 +816,81 @@ public Void call() throws IOException {
815816
}
816817
}
817818

819+
/**
820+
* The {@link ConnectionId#hashCode} has to be stable despite updates that occur as the the
821+
* address evolves over time. The {@link ConnectionId} is used as a primary key in maps, so
822+
* its hashCode can't change.
823+
*
824+
* @throws IOException if there is a client or server failure
825+
*/
826+
@Test
827+
public void testStableHashCode() throws IOException {
828+
Server server = new TestServer(5, false);
829+
try {
830+
server.start();
831+
832+
// Leave host unresolved to start. Use "localhost" as opposed
833+
// to local IP from NetUtils.getConnectAddress(server) to force
834+
// resolution later
835+
InetSocketAddress unresolvedAddr = InetSocketAddress.createUnresolved(
836+
"localhost", NetUtils.getConnectAddress(server).getPort());
837+
838+
// Setup: Create a ConnectionID using an unresolved address, and get it's hashCode to serve
839+
// as a point of comparison.
840+
int rpcTimeout = MIN_SLEEP_TIME * 2;
841+
final ConnectionId remoteId = getConnectionId(unresolvedAddr, rpcTimeout, conf);
842+
int expected = remoteId.hashCode();
843+
844+
// Start client
845+
Client.setConnectTimeout(conf, 100);
846+
Client client = new Client(LongWritable.class, conf);
847+
try {
848+
// Test: Call should re-resolve host and succeed
849+
LongWritable param = new LongWritable(RANDOM.nextLong());
850+
client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
851+
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
852+
int actual = remoteId.hashCode();
853+
854+
// Verify: The hashCode should match, although the InetAddress is different since it has
855+
// now been resolved
856+
assertThat(remoteId.getAddress()).isNotEqualTo(unresolvedAddr);
857+
assertThat(remoteId.getAddress().getHostName()).isEqualTo(unresolvedAddr.getHostName());
858+
assertThat(remoteId.hashCode()).isEqualTo(expected);
859+
860+
// Test: Call should succeed without having to re-resolve
861+
InetSocketAddress expectedSocketAddress = remoteId.getAddress();
862+
param = new LongWritable(RANDOM.nextLong());
863+
client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
864+
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
865+
866+
// Verify: The same instance of the InetSocketAddress has been used to make the second
867+
// call
868+
assertThat(remoteId.getAddress()).isSameAs(expectedSocketAddress);
869+
870+
// Verify: The hashCode is protected against updates to the host name
871+
String hostName = InetAddress.getLocalHost().getHostName();
872+
InetSocketAddress mismatchedHostName = NetUtils.createSocketAddr(
873+
InetAddress.getLocalHost().getHostName(),
874+
remoteId.getAddress().getPort());
875+
assertThatExceptionOfType(IllegalArgumentException.class)
876+
.isThrownBy(() -> remoteId.setAddress(mismatchedHostName))
877+
.withMessageStartingWith("Hostname must match");
878+
879+
// Verify: The hashCode is protected against updates to the port
880+
InetSocketAddress mismatchedPort = NetUtils.createSocketAddr(
881+
remoteId.getAddress().getHostName(),
882+
remoteId.getAddress().getPort() + 1);
883+
assertThatExceptionOfType(IllegalArgumentException.class)
884+
.isThrownBy(() -> remoteId.setAddress(mismatchedPort))
885+
.withMessageStartingWith("Port must match");
886+
} finally {
887+
client.stop();
888+
}
889+
} finally {
890+
server.stop();
891+
}
892+
}
893+
818894
@Test(timeout=60000)
819895
public void testIpcFlakyHostResolution() throws IOException {
820896
// start server

0 commit comments

Comments
 (0)