Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
private final boolean includeCCSMetadata;

// fields that are not Writeable since they are only needed on the primary CCS coordinator
private final transient Predicate<String> skipUnavailablePredicate;
private final transient Predicate<String> skipOnFailurePredicate; // Predicate to determine if we should skip a cluster on failure
private volatile boolean isPartial; // Does this request have partial results?
private transient volatile boolean isStopped; // Have we received stop command?

Expand All @@ -81,17 +81,18 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
private transient TimeSpan planningTimeSpan; // time elapsed since start of query to calling ComputeService.execute
private TimeValue overallTook;

// This is only used is tests.
public EsqlExecutionInfo(boolean includeCCSMetadata) {
this(Predicates.always(), includeCCSMetadata); // default all clusters to skip_unavailable=true
this(Predicates.always(), includeCCSMetadata); // default all clusters to being skippable on failure
}

/**
* @param skipUnavailablePredicate provide lookup for whether a given cluster has skip_unavailable set to true or false
* @param skipOnPlanTimeFailurePredicate Decides whether we should skip the cluster that fails during planning phase.
* @param includeCCSMetadata (user defined setting) whether to include the CCS metadata in the HTTP response
*/
public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean includeCCSMetadata) {
public EsqlExecutionInfo(Predicate<String> skipOnPlanTimeFailurePredicate, boolean includeCCSMetadata) {
this.clusterInfo = new ConcurrentHashMap<>();
this.skipUnavailablePredicate = skipUnavailablePredicate;
this.skipOnFailurePredicate = skipOnPlanTimeFailurePredicate;
this.includeCCSMetadata = includeCCSMetadata;
this.relativeStart = TimeSpan.start();
}
Expand All @@ -102,7 +103,7 @@ public EsqlExecutionInfo(Predicate<String> skipUnavailablePredicate, boolean inc
EsqlExecutionInfo(ConcurrentMap<String, Cluster> clusterInfo, boolean includeCCSMetadata) {
this.clusterInfo = clusterInfo;
this.includeCCSMetadata = includeCCSMetadata;
this.skipUnavailablePredicate = Predicates.always();
this.skipOnFailurePredicate = Predicates.always();
this.relativeStart = null;
}

Expand All @@ -111,7 +112,7 @@ public EsqlExecutionInfo(StreamInput in) throws IOException {
this.clusterInfo = in.readMapValues(EsqlExecutionInfo.Cluster::new, Cluster::getClusterAlias, ConcurrentHashMap::new);
this.includeCCSMetadata = in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readBoolean() : false;
this.isPartial = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_RESPONSE_PARTIAL) ? in.readBoolean() : false;
this.skipUnavailablePredicate = Predicates.always();
this.skipOnFailurePredicate = Predicates.always();
this.relativeStart = null;
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_QUERY_PLANNING_DURATION)
|| in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_QUERY_PLANNING_DURATION_8_19)) {
Expand Down Expand Up @@ -200,15 +201,16 @@ public Set<String> clusterAliases() {
}

/**
* @param clusterAlias to lookup skip_unavailable from
* @return skip_unavailable setting (true/false)
* @param clusterAlias to check if we should skip this cluster on failure
* @return whether it's OK to skip the cluster on failure.
* @throws NoSuchRemoteClusterException if clusterAlias is unknown to this node's RemoteClusterService
*/
public boolean isSkipUnavailable(String clusterAlias) {
public boolean shouldSkipOnFailure(String clusterAlias) {
if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) {
// local cluster is not skippable for now
return false;
}
return skipUnavailablePredicate.test(clusterAlias);
return skipOnFailurePredicate.test(clusterAlias);
}

public boolean isCrossClusterSearch() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void startComputeOnRemoteCluster(
final AtomicReference<ComputeResponse> finalResponse = new AtomicReference<>();
listener = listener.delegateResponse((l, e) -> {
final boolean receivedResults = finalResponse.get() != null || pagesFetched.get();
if (EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)
if (executionInfo.shouldSkipOnFailure(clusterAlias)
|| (configuration.allowPartialResults() && EsqlCCSUtils.canAllowPartial(e))) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(
executionInfo,
Expand All @@ -107,7 +107,7 @@ void startComputeOnRemoteCluster(
listener.delegateFailure((l, unused) -> {
final CancellableTask groupTask;
final Runnable onGroupFailure;
boolean failFast = executionInfo.isSkipUnavailable(clusterAlias) == false && configuration.allowPartialResults() == false;
boolean failFast = executionInfo.shouldSkipOnFailure(clusterAlias) == false && configuration.allowPartialResults() == false;
if (failFast) {
groupTask = rootTask;
onGroupFailure = cancelQueryOnFailure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ static boolean returnSuccessWithEmptyResult(EsqlExecutionInfo executionInfo, Exc

if (e instanceof NoClustersToSearchException || ExceptionsHelper.isRemoteUnavailableException(e)) {
for (String clusterAlias : executionInfo.clusterAliases()) {
if (executionInfo.isSkipUnavailable(clusterAlias) == false
&& clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) {
// Check if we have any remotes that can't be skipped on failure.
if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false
&& executionInfo.shouldSkipOnFailure(clusterAlias) == false) {
return false;
}
}
Expand Down Expand Up @@ -227,7 +228,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
"Unknown index [%s]",
(c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) ? indexExpression : c + ":" + indexExpression)
);
if (executionInfo.isSkipUnavailable(c) == false || usedFilter) {
if (executionInfo.shouldSkipOnFailure(c) == false || usedFilter) {
if (fatalErrorMessage == null) {
fatalErrorMessage = error;
} else {
Expand All @@ -239,7 +240,7 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(
markClusterWithFinalStateAndNoShards(
executionInfo,
c,
executionInfo.isSkipUnavailable(c) ? Cluster.Status.SKIPPED : Cluster.Status.FAILED,
executionInfo.shouldSkipOnFailure(c) ? Cluster.Status.SKIPPED : Cluster.Status.FAILED,
new VerificationException(error)
);
}
Expand Down Expand Up @@ -344,7 +345,7 @@ public static void initCrossClusterState(
final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
executionInfo.swapCluster(clusterAlias, (k, v) -> {
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.isSkipUnavailable(clusterAlias));
return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
});
}

Expand Down Expand Up @@ -389,13 +390,6 @@ public static void markClusterWithFinalStateAndNoShards(
});
}

/**
* We will ignore the error if it's remote unavailable and the cluster is marked to skip unavailable.
*/
public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo, String clusterAlias, Exception e) {
return executionInfo.isSkipUnavailable(clusterAlias);
}

/**
* Check whether this exception can be tolerated when partial results are on, or should be treated as fatal.
* @return true if the exception can be tolerated, false if it should be treated as fatal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ static void handleFieldCapsFailures(
assert cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SUCCESSFUL : "can't mark a cluster success with failures";
continue;
}
if (allowPartialResults == false && executionInfo.isSkipUnavailable(clusterAlias) == false) {
if (allowPartialResults == false && executionInfo.shouldSkipOnFailure(clusterAlias) == false) {
for (FieldCapabilitiesFailure failure : e.getValue()) {
failureCollector.unwrapAndCollect(failure.getException());
}
Expand Down Expand Up @@ -487,7 +487,7 @@ private void preAnalyzeLookupIndex(
private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) {
VerificationException error = new VerificationException(message);
// If we can, skip the cluster and mark it as such
if (executionInfo.isSkipUnavailable(clusterAlias)) {
if (executionInfo.shouldSkipOnFailure(clusterAlias)) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, error);
} else {
throw error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.xpack.esql.session;

import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
Expand All @@ -21,7 +20,6 @@
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.license.internal.XPackLicenseStatus;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NoSeedNodeLeftException;
Expand Down Expand Up @@ -49,13 +47,11 @@

import static org.elasticsearch.xpack.esql.core.tree.Source.EMPTY;
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.initCrossClusterState;
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.shouldIgnoreRuntimeError;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class EsqlCCSUtilsTests extends ESTestCase {

Expand Down Expand Up @@ -766,35 +762,6 @@ private void assertLicenseCheckFails(
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
}

public void testShouldIgnoreRuntimeError() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is pretty much pointless now as shouldIgnoreRuntimeError doesn't have any logic left in it.

Predicate<String> skipUnPredicate = s -> s.equals(REMOTE1_ALIAS);

EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(skipUnPredicate, true);
executionInfo.swapCluster(LOCAL_CLUSTER_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(LOCAL_CLUSTER_ALIAS, "logs*", false));
executionInfo.swapCluster(REMOTE1_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE1_ALIAS, "*", true));
executionInfo.swapCluster(REMOTE2_ALIAS, (k, v) -> new EsqlExecutionInfo.Cluster(REMOTE2_ALIAS, "mylogs1,mylogs2,logs*", false));

// remote1: skip_unavailable=true, so should ignore connect errors, but not others
assertThat(
shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new IllegalStateException("Unable to open any connections")),
is(true)
);
assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new TaskCancelledException("task cancelled")), is(true));
assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE1_ALIAS, new ElasticsearchException("something is wrong")), is(true));
// remote2: skip_unavailable=false, so should not ignore any errors
assertThat(
shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new IllegalStateException("Unable to open any connections")),
is(false)
);
assertThat(shouldIgnoreRuntimeError(executionInfo, REMOTE2_ALIAS, new TaskCancelledException("task cancelled")), is(false));
// same for local
assertThat(
shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new IllegalStateException("Unable to open any connections")),
is(false)
);
assertThat(shouldIgnoreRuntimeError(executionInfo, LOCAL_CLUSTER_ALIAS, new TaskCancelledException("task cancelled")), is(false));
}

private XPackLicenseStatus activeLicenseStatus(License.OperationMode operationMode) {
return new XPackLicenseStatus(operationMode, true, null);
}
Expand Down