Skip to content

Commit c863e2f

Browse files
authored
Rename skipping logic to remove hard link to skip_unavailable (#132861)
1 parent 1075553 commit c863e2f

File tree

5 files changed

+23
-60
lines changed

5 files changed

+23
-60
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
7171
private final boolean includeCCSMetadata;
7272

7373
// fields that are not Writeable since they are only needed on the primary CCS coordinator
74-
private final transient Predicate<String> skipUnavailablePredicate;
74+
private final transient Predicate<String> skipOnFailurePredicate; // Predicate to determine if we should skip a cluster on failure
7575
private volatile boolean isPartial; // Does this request have partial results?
7676
private transient volatile boolean isStopped; // Have we received stop command?
7777

@@ -81,17 +81,18 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
8181
private transient TimeSpan planningTimeSpan; // time elapsed since start of query to calling ComputeService.execute
8282
private TimeValue overallTook;
8383

84+
// This is only used is tests.
8485
public EsqlExecutionInfo(boolean includeCCSMetadata) {
85-
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
86+
this(Predicates.always(), includeCCSMetadata); // default all clusters to being skippable on failure
8687
}
8788

8889
/**
89-
* @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false
90+
* @param skipOnPlanTimeFailurePredicate Decides whether we should skip the cluster that fails during planning phase.
9091
* @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response
9192
*/
92-
public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) {
93+
public EsqlExecutionInfo(Predicate<String> skipOnPlanTimeFailurePredicate, boolean includeCCSMetadata) {
9394
this.clusterInfo = new ConcurrentHashMap<>();
94-
this.skipUnavailablePredicate = skipUnavailablePredicate;
95+
this.skipOnFailurePredicate = skipOnPlanTimeFailurePredicate;
9596
this.includeCCSMetadata = includeCCSMetadata;
9697
this.relativeStart = TimeSpan.start();
9798
}
@@ -102,7 +103,7 @@ public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean inc
102103
EsqlExecutionInfo(ConcurrentMap<String, Cluster> clusterInfo, boolean includeCCSMetadata) {
103104
this.clusterInfo = clusterInfo;
104105
this.includeCCSMetadata = includeCCSMetadata;
105-
this.skipUnavailablePredicate = Predicates.always();
106+
this.skipOnFailurePredicate = Predicates.always();
106107
this.relativeStart = null;
107108
}
108109

@@ -111,7 +112,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
111112
this.clusterInfo = in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias, ConcurrentHashMap::new);
112113
this.includeCCSMetadata = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readBoolean() : false;
113114
this.isPartial = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL) ? in.readBoolean() : false;
114-
this.skipUnavailablePredicate = Predicates.always();
115+
this.skipOnFailurePredicate = Predicates.always();
115116
this.relativeStart = null;
116117
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_QUERY_PLANNING_DURATION)
117118
|| in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_QUERY_PLANNING_DURATION_8_19)) {
@@ -200,15 +201,16 @@ public Set<String> clusterAliases() {
200201
}
201202

202203
/**
203-
* @param clusterAlias to lookup skip_unavailable from
204-
* @return skip_unavailable setting (true/false)
204+
* @param clusterAlias to check if we should skip this cluster on failure
205+
* @return whether it's OK to skip the cluster on failure.
205206
* @throws NoSuchRemoteClusterException if clusterAlias is unknown to this node's RemoteClusterService
206207
*/
207-
public boolean isSkipUnavailable(String clusterAlias) {
208+
public boolean shouldSkipOnFailure(String clusterAlias) {
208209
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
210+
// local cluster is not skippable for now
209211
return false;
210212
}
211-
return skipUnavailablePredicate.test(clusterAlias);
213+
return skipOnFailurePredicate.test(clusterAlias);
212214
}
213215

214216
public boolean isCrossClusterSearch() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ void startComputeOnRemoteCluster(
8585
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
8686
listener = listener.delegateResponse((l, e) -> {
8787
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get();
88-
if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)
88+
if (executionInfo.shouldSkipOnFailure(clusterAlias)
8989
|| (configuration.allowPartialResults() && EsqlCCSUtils.canAllowPartial(e))) {
9090
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(
9191
executionInfo,
@@ -107,7 +107,7 @@ void startComputeOnRemoteCluster(
107107
listener.delegateFailure((l, unused) -> {
108108
final CancellableTask groupTask;
109109
final Runnable onGroupFailure;
110-
boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false;
110+
boolean failFast = executionInfo.shouldSkipOnFailure(clusterAlias) == false && configuration.allowPartialResults() == false;
111111
if (failFast) {
112112
groupTask = rootTask;
113113
onGroupFailure = cancelQueryOnFailure;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,9 @@ static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exc
109109

110110
if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
111111
for (String clusterAlias : executionInfo.clusterAliases()) {
112-
if (executionInfo.isSkipUnavailable(clusterAlias) == false
113-
&& clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
112+
// Check if we have any remotes that can't be skipped on failure.
113+
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false
114+
&& executionInfo.shouldSkipOnFailure(clusterAlias) == false) {
114115
return false;
115116
}
116117
}
@@ -227,7 +228,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
227228
"Unknown index [%s]",
228229
(c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression)
229230
);
230-
if (executionInfo.isSkipUnavailable(c) == false || usedFilter) {
231+
if (executionInfo.shouldSkipOnFailure(c) == false || usedFilter) {
231232
if (fatalErrorMessage == null) {
232233
fatalErrorMessage = error;
233234
} else {
@@ -239,7 +240,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
239240
markClusterWithFinalStateAndNoShards(
240241
executionInfo,
241242
c,
242-
executionInfo.isSkipUnavailable(c) ? Cluster.Status.SKIPPED : Cluster.Status.FAILED,
243+
executionInfo.shouldSkipOnFailure(c) ? Cluster.Status.SKIPPED : Cluster.Status.FAILED,
243244
new VerificationException(error)
244245
);
245246
}
@@ -344,7 +345,7 @@ public static void initCrossClusterState(
344345
final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
345346
executionInfo.swapCluster(clusterAlias, (k, v) -> {
346347
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
347-
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
348+
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
348349
});
349350
}
350351

@@ -389,13 +390,6 @@ public static void markClusterWithFinalStateAndNoShards(
389390
});
390391
}
391392

392-
/**
393-
* We will ignore the error if it's remote unavailable and the cluster is marked to skip unavailable.
394-
*/
395-
public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, String clusterAlias, Exception e) {
396-
return executionInfo.isSkipUnavailable(clusterAlias);
397-
}
398-
399393
/**
400394
* Check whether this exception can be tolerated when partial results are on, or should be treated as fatal.
401395
* @return true if the exception can be tolerated, false if it should be treated as fatal.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ static void handleFieldCapsFailures(
332332
assert cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SUCCESSFUL : "can't mark a cluster success with failures";
333333
continue;
334334
}
335-
if (allowPartialResults == false && executionInfo.isSkipUnavailable(clusterAlias) == false) {
335+
if (allowPartialResults == false && executionInfo.shouldSkipOnFailure(clusterAlias) == false) {
336336
for (FieldCapabilitiesFailure failure : e.getValue()) {
337337
failureCollector.unwrapAndCollect(failure.getException());
338338
}
@@ -475,7 +475,7 @@ private void preAnalyzeLookupIndex(
475475
private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) {
476476
VerificationException error = new VerificationException(message);
477477
// If we can, skip the cluster and mark it as such
478-
if (executionInfo.isSkipUnavailable(clusterAlias)) {
478+
if (executionInfo.shouldSkipOnFailure(clusterAlias)) {
479479
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, error);
480480
} else {
481481
throw error;

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtilsTests.java

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
package org.elasticsearch.xpack.esql.session;
99

1010
import org.apache.lucene.index.CorruptIndexException;
11-
import org.elasticsearch.ElasticsearchException;
1211
import org.elasticsearch.ElasticsearchStatusException;
1312
import org.elasticsearch.action.OriginalIndices;
1413
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
@@ -21,7 +20,6 @@
2120
import org.elasticsearch.license.XPackLicenseState;
2221
import org.elasticsearch.license.internal.XPackLicenseStatus;
2322
import org.elasticsearch.rest.RestStatus;
24-
import org.elasticsearch.tasks.TaskCancelledException;
2523
import org.elasticsearch.test.ESTestCase;
2624
import org.elasticsearch.transport.ConnectTransportException;
2725
import org.elasticsearch.transport.NoSeedNodeLeftException;
@@ -49,13 +47,11 @@
4947

5048
import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
5149
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.initCrossClusterState;
52-
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.shouldIgnoreRuntimeError;
5350
import static org.hamcrest.Matchers.containsInAnyOrder;
5451
import static org.hamcrest.Matchers.containsString;
5552
import static org.hamcrest.Matchers.equalTo;
5653
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5754
import static org.hamcrest.Matchers.hasSize;
58-
import static org.hamcrest.Matchers.is;
5955

6056
public class EsqlCCSUtilsTests extends ESTestCase {
6157

@@ -766,35 +762,6 @@ private void assertLicenseCheckFails(
766762
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
767763
}
768764

769-
public void testShouldIgnoreRuntimeError() {
770-
Predicate<String> skipUnPredicate = s -> s.equals(REMOTE1_ALIAS);
771-
772-
EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true);
773-
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
774-
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
775-
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false));
776-
777-
// remote1: skip_unavailable=true, so should ignore connect errors, but not others
778-
assertThat(
779-
shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new IllegalStateException("Unable to open any connections")),
780-
is(true)
781-
);
782-
assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new TaskCancelledException("task cancelled")), is(true));
783-
assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new ElasticsearchException("something is wrong")), is(true));
784-
// remote2: skip_unavailable=false, so should not ignore any errors
785-
assertThat(
786-
shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new IllegalStateException("Unable to open any connections")),
787-
is(false)
788-
);
789-
assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new TaskCancelledException("task cancelled")), is(false));
790-
// same for local
791-
assertThat(
792-
shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new IllegalStateException("Unable to open any connections")),
793-
is(false)
794-
);
795-
assertThat(shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new TaskCancelledException("task cancelled")), is(false));
796-
}
797-
798765
private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) {
799766
return new XPackLicenseStatus(operationMode, true, null);
800767
}

0 commit comments

Comments
 (0)