Skip to content

Commit 31b7ae8

Browse files
committed
Only connect to formed remote clusters
This change prevent remote cluster connections to be established to nodes that have not yet joined a cluster and don't have a cluster UUID. This allows to effectivly detect nodes that are part of the local cluster. To compare the local cluster UUID to the remote nodes cluster UUID we need to wait until we recovered a state and a master is elected before we can connect to remote clusters. Relates to elastic#31331
1 parent e67aa96 commit 31b7ae8

File tree

8 files changed

+261
-108
lines changed

8 files changed

+261
-108
lines changed

server/src/main/java/org/elasticsearch/action/main/TransportMainAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public TransportMainAction(Settings settings, ThreadPool threadPool, TransportSe
5050
protected void doExecute(MainRequest request, ActionListener<MainResponse> listener) {
5151
ClusterState clusterState = clusterService.state();
5252
assert Node.NODE_NAME_SETTING.exists(settings);
53-
final boolean available = clusterState.getBlocks().hasGlobalBlock(RestStatus.SERVICE_UNAVAILABLE) == false;
5453
listener.onResponse(
5554
new MainResponse(Node.NODE_NAME_SETTING.get(settings), Version.CURRENT, clusterState.getClusterName(),
5655
clusterState.metaData().clusterUUID(), Build.CURRENT));

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
package org.elasticsearch.node;
2121

2222
import org.apache.logging.log4j.Logger;
23+
import org.apache.logging.log4j.ThreadContext;
2324
import org.apache.lucene.util.Constants;
2425
import org.apache.lucene.util.SetOnce;
2526
import org.elasticsearch.Build;
2627
import org.elasticsearch.ElasticsearchException;
2728
import org.elasticsearch.ElasticsearchTimeoutException;
2829
import org.elasticsearch.Version;
30+
import org.elasticsearch.action.ActionListener;
2931
import org.elasticsearch.action.ActionModule;
3032
import org.elasticsearch.action.Action;
3133
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
@@ -139,6 +141,7 @@
139141
import org.elasticsearch.tasks.TaskResultsService;
140142
import org.elasticsearch.threadpool.ExecutorBuilder;
141143
import org.elasticsearch.threadpool.ThreadPool;
144+
import org.elasticsearch.transport.RemoteClusterService;
142145
import org.elasticsearch.transport.Transport;
143146
import org.elasticsearch.transport.TransportInterceptor;
144147
import org.elasticsearch.transport.TransportService;
@@ -165,8 +168,10 @@
165168
import java.util.Set;
166169
import java.util.concurrent.CountDownLatch;
167170
import java.util.concurrent.TimeUnit;
171+
import java.util.concurrent.atomic.AtomicBoolean;
168172
import java.util.function.Consumer;
169173
import java.util.function.Function;
174+
import java.util.function.Predicate;
170175
import java.util.function.UnaryOperator;
171176
import java.util.stream.Collectors;
172177
import java.util.stream.Stream;
@@ -678,17 +683,37 @@ public Node start() throws NodeValidationException {
678683
: "clusterService has a different local node than the factory provided";
679684
transportService.acceptIncomingRequests();
680685
discovery.startInitialJoin();
686+
final ThreadPool thread = injector.getInstance(ThreadPool.class);
681687
final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
682-
if (initialStateTimeout.millis() > 0) {
683-
final ThreadPool thread = injector.getInstance(ThreadPool.class);
684-
ClusterState clusterState = clusterService.state();
685-
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
686-
if (clusterState.nodes().getMasterNodeId() == null) {
688+
final boolean connectToRemote = RemoteClusterService.ENABLE_REMOTE_CLUSTERS.get(settings);
689+
final boolean waitForState = initialStateTimeout.millis() > 0;
690+
Predicate<ClusterState> connectRemoteClusterPredicate = state -> "_na_".equals(state.metaData().clusterUUID()) == false;
691+
ClusterState clusterState = clusterService.state();
692+
AtomicBoolean connectRemoteClusters = new AtomicBoolean(connectToRemote);
693+
if (waitForState) {
694+
CountDownLatch latch = new CountDownLatch(1);
695+
Predicate<ClusterState> clusterStatePredicate = state -> state.nodes().getMasterNodeId() != null;
696+
final Consumer<ClusterState> consumer;
697+
if (connectToRemote) {
698+
clusterStatePredicate = clusterStatePredicate.and(connectRemoteClusterPredicate);
699+
connectRemoteClusters.set(false);
700+
consumer = c -> transportService.getRemoteClusterService().initializeRemoteClusters(c.metaData().clusterUUID(),
701+
c.metaData().settings(), ActionListener.wrap(v -> latch.countDown(), e -> {
702+
latch.countDown();
703+
logger.warn("Failed to connect to remote clusters", e);
704+
}));
705+
} else {
706+
consumer = c -> latch.countDown();
707+
}
708+
if (clusterStatePredicate.test(clusterState) == false) {
687709
logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
688-
final CountDownLatch latch = new CountDownLatch(1);
710+
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, initialStateTimeout, logger,
711+
thread.getThreadContext());
689712
observer.waitForNextChange(new ClusterStateObserver.Listener() {
690713
@Override
691-
public void onNewClusterState(ClusterState state) { latch.countDown(); }
714+
public void onNewClusterState(ClusterState state) {
715+
consumer.accept(state);
716+
}
692717

693718
@Override
694719
public void onClusterServiceClose() {
@@ -701,13 +726,42 @@ public void onTimeout(TimeValue timeout) {
701726
initialStateTimeout);
702727
latch.countDown();
703728
}
704-
}, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);
729+
}, clusterStatePredicate);
730+
} else {
731+
consumer.accept(clusterState);
732+
}
733+
try {
734+
latch.await();
735+
} catch (InterruptedException e) {
736+
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
737+
}
738+
}
739+
if (connectRemoteClusters.get()) {
740+
Consumer<ClusterState> consumer = state -> transportService.getRemoteClusterService().initializeRemoteClusters(
741+
state.metaData().clusterUUID(), state.metaData().settings(), ActionListener.wrap(v -> {},
742+
e -> logger.warn("Failed to connect to remote clusters", e)));
743+
if (connectRemoteClusterPredicate.test(clusterState) == false) {
744+
ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger,
745+
thread.getThreadContext());
746+
//
747+
observer.waitForNextChange(new ClusterStateObserver.Listener() {
748+
@Override
749+
public void onNewClusterState(ClusterState state) {
750+
consumer.accept(state);
751+
}
705752

706-
try {
707-
latch.await();
708-
} catch (InterruptedException e) {
709-
throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
710-
}
753+
@Override
754+
public void onClusterServiceClose() {
755+
}
756+
757+
@Override
758+
public void onTimeout(TimeValue timeout) {
759+
assert false;
760+
}
761+
}, connectRemoteClusterPredicate);
762+
763+
} else {
764+
consumer.accept(clusterState);
711765
}
712766
}
713767

server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 93 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@
2929
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
3030
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
3131
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
32+
import org.elasticsearch.action.main.MainAction;
33+
import org.elasticsearch.action.main.MainRequest;
34+
import org.elasticsearch.action.main.MainResponse;
3235
import org.elasticsearch.cluster.ClusterName;
3336
import org.elasticsearch.cluster.node.DiscoveryNode;
3437
import org.elasticsearch.cluster.node.DiscoveryNodes;
3538
import org.elasticsearch.common.component.AbstractComponent;
39+
import org.elasticsearch.common.io.stream.StreamInput;
3640
import org.elasticsearch.common.settings.Settings;
3741
import org.elasticsearch.common.transport.TransportAddress;
3842
import org.elasticsearch.common.unit.TimeValue;
@@ -50,6 +54,7 @@
5054
import java.util.HashSet;
5155
import java.util.Iterator;
5256
import java.util.List;
57+
import java.util.Objects;
5358
import java.util.Set;
5459
import java.util.concurrent.ArrayBlockingQueue;
5560
import java.util.concurrent.BlockingQueue;
@@ -83,11 +88,11 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
8388
private final String clusterAlias;
8489
private final int maxNumRemoteConnections;
8590
private final Predicate<DiscoveryNode> nodePredicate;
91+
private final String localClusterUUID;
8692
private volatile List<DiscoveryNode> seedNodes;
8793
private volatile boolean skipUnavailable;
8894
private final ConnectHandler connectHandler;
89-
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
90-
private final ClusterName localClusterName;
95+
private final SetOnce<ClusterNameAndUUID> remoteClusterAndUUID = new SetOnce<>();
9196

9297
/**
9398
* Creates a new {@link RemoteClusterConnection}
@@ -99,10 +104,15 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
99104
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
100105
*/
101106
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
102-
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
107+
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate,
108+
String localClusterUUID) {
103109
super(settings);
104-
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
105110
this.transportService = transportService;
111+
if ("_na_".equals(localClusterUUID)) {
112+
throw new IllegalArgumentException("invalid local clusterstate UUID: " + localClusterUUID);
113+
}
114+
this.localClusterUUID = Objects.requireNonNull(localClusterUUID);
115+
106116
this.maxNumRemoteConnections = maxNumRemoteConnections;
107117
this.nodePredicate = nodePredicate;
108118
this.clusterAlias = clusterAlias;
@@ -312,15 +322,15 @@ public boolean isClosed() {
312322
return connectHandler.isClosed();
313323
}
314324

315-
private ConnectionProfile getRemoteProfile(ClusterName name) {
325+
private ConnectionProfile getRemoteProfile() {
316326
// we can only compare the cluster name to make a decision if we should use a remote profile
317327
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
318328
// the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
319329
// rather smallish optimization on the connection layer under certain situations where remote clusters
320330
// have the same name as the local one is minor here.
321331
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
322332
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
323-
if (this.localClusterName.equals(name)) {
333+
if (remoteClusterAndUUID.get() != null && this.localClusterUUID.equals(remoteClusterAndUUID.get().clusterUUID)) {
324334
return null;
325335
} else {
326336
return remoteProfile;
@@ -438,6 +448,67 @@ protected void doRun() {
438448
});
439449
}
440450

451+
Transport.Connection findFirstReadyNode(Iterator<DiscoveryNode> seedNodes) throws IOException, InterruptedException {
452+
boolean remoteClusterHasNotFormed = false;
453+
while (seedNodes.hasNext()) {
454+
if (Thread.currentThread().isInterrupted()) {
455+
throw new InterruptedException("remote connect thread got interrupted");
456+
}
457+
final DiscoveryNode seedNode = seedNodes.next();
458+
final MainResponse mainResponse;
459+
boolean success = false;
460+
Transport.Connection connection = transportService.openConnection(seedNode,
461+
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
462+
ThreadPool threadPool = transportService.getThreadPool();
463+
ThreadContext threadContext = threadPool.getThreadContext();
464+
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
465+
// we stash any context here since this is an internal execution and should not leak any
466+
// existing context information.
467+
threadContext.markAsSystemContext();
468+
PlainTransportFuture<MainResponse> futureHandler = new PlainTransportFuture<>(
469+
new FutureTransportResponseHandler<MainResponse>() {
470+
@Override
471+
public MainResponse read(StreamInput in) throws IOException {
472+
MainResponse response = MainAction.INSTANCE.newResponse();
473+
response.readFrom(in);
474+
return response;
475+
}
476+
});
477+
TransportRequestOptions options = TransportRequestOptions.builder().withTimeout(remoteProfile
478+
.getHandshakeTimeout().millis()).build();
479+
transportService.sendRequest(connection, MainAction.NAME, new MainRequest(), options,
480+
futureHandler);
481+
mainResponse = futureHandler.txGet();
482+
if ("_na_".equals(mainResponse.getClusterUuid()) == false) {
483+
ClusterNameAndUUID clusterNameAndUUID = remoteClusterAndUUID.get();
484+
if (clusterNameAndUUID == null) {
485+
remoteClusterAndUUID.set(new ClusterNameAndUUID(mainResponse.getClusterName(),
486+
mainResponse.getClusterUuid()));
487+
} else if (clusterNameAndUUID.clusterName.equals(mainResponse.getClusterName()) == false) {
488+
throw new IllegalStateException("handshake failed, mismatched cluster name [" + mainResponse.getClusterName()
489+
+ "] - " + seedNode);
490+
} else if (clusterNameAndUUID.clusterUUID.equals(mainResponse.getClusterUuid()) == false) {
491+
throw new IllegalStateException("handshake failed, mismatched cluster UUID [" + mainResponse.getClusterUuid()
492+
+ "] - " + seedNode);
493+
}
494+
success = true;
495+
return connection;
496+
} else {
497+
remoteClusterHasNotFormed = true;
498+
}
499+
} finally {
500+
if (success == false) {
501+
connection.close();
502+
}
503+
}
504+
}
505+
if (remoteClusterHasNotFormed) {
506+
throw new IllegalStateException("seed nodes have not joined a cluster yet");
507+
} else {
508+
throw new IllegalStateException("no seed node left");
509+
}
510+
}
511+
441512
void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
442513
final TransportService transportService, ActionListener<Void> listener) {
443514
if (Thread.currentThread().isInterrupted()) {
@@ -446,28 +517,23 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
446517
try {
447518
if (seedNodes.hasNext()) {
448519
cancellableThreads.executeIO(() -> {
449-
final DiscoveryNode seedNode = seedNodes.next();
450520
final TransportService.HandshakeResponse handshakeResponse;
451-
Transport.Connection connection = transportService.openConnection(seedNode,
452-
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
521+
Transport.Connection connection = findFirstReadyNode(seedNodes);
453522
boolean success = false;
454523
try {
455524
try {
456525
handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
457-
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
526+
(c) -> c.equals(remoteClusterAndUUID.get().clusterName));
458527
} catch (IllegalStateException ex) {
459528
logger.warn(() -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
460-
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
529+
"cluster name {}", connection.getNode(), remoteClusterAndUUID.get()), ex);
461530
throw ex;
462531
}
463532

464533
final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
465534
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
466-
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
467-
if (remoteClusterName.get() == null) {
468-
assert handshakeResponse.getClusterName().value() != null;
469-
remoteClusterName.set(handshakeResponse.getClusterName());
470-
}
535+
transportService.connectToNode(handshakeNode, getRemoteProfile());
536+
assert handshakeResponse.getClusterName().value() != null;
471537
connectedNodes.add(handshakeNode);
472538
}
473539
ClusterStateRequest request = new ClusterStateRequest();
@@ -557,10 +623,6 @@ public ClusterStateResponse newInstance() {
557623
@Override
558624
public void handleResponse(ClusterStateResponse response) {
559625
try {
560-
if (remoteClusterName.get() == null) {
561-
assert response.getClusterName().value() != null;
562-
remoteClusterName.set(response.getClusterName());
563-
}
564626
try (Closeable theConnection = connection) { // the connection is unused - see comment in #collectRemoteNodes
565627
// we have to close this connection before we notify listeners - this is mainly needed for test correctness
566628
// since if we do it afterwards we might fail assertions that check if all high level connections are closed.
@@ -573,7 +635,7 @@ public void handleResponse(ClusterStateResponse response) {
573635
for (DiscoveryNode node : nodesIter) {
574636
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
575637
try {
576-
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
638+
transportService.connectToNode(node, getRemoteProfile()); // noop if node is
577639
// connected
578640
connectedNodes.add(node);
579641
} catch (ConnectTransportException | IllegalStateException ex) {
@@ -696,4 +758,14 @@ private synchronized void ensureIteratorAvailable() {
696758
}
697759
}
698760
}
761+
762+
private static class ClusterNameAndUUID {
763+
final ClusterName clusterName;
764+
final String clusterUUID;
765+
766+
private ClusterNameAndUUID(ClusterName clusterName, String clusterUUID) {
767+
this.clusterName = clusterName;
768+
this.clusterUUID = clusterUUID;
769+
}
770+
}
699771
}

0 commit comments

Comments
 (0)