Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132570.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132570
summary: Always stop the timer when profiling the fetch phase
area: Search
type: bug
issues: []
26 changes: 15 additions & 11 deletions server/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,21 @@ private SearchHits buildSearchHits(SearchContext context, int[] docIdsToLoad, Pr
@Override
protected void setNextReader(LeafReaderContext ctx, int[] docsInLeaf) throws IOException {
Timer timer = profiler.startNextReader();
this.ctx = ctx;
this.leafNestedDocuments = nestedDocuments.getLeafNestedDocuments(ctx);
this.leafStoredFieldLoader = storedFieldLoader.getLoader(ctx, docsInLeaf);
this.leafSourceLoader = sourceLoader.leaf(ctx.reader(), docsInLeaf);
this.leafIdLoader = idLoader.leaf(leafStoredFieldLoader, ctx.reader(), docsInLeaf);
fieldLookupProvider.setNextReader(ctx);
for (FetchSubPhaseProcessor processor : processors) {
processor.setNextReader(ctx);
}
if (timer != null) {
timer.stop();
try {
this.ctx = ctx;
this.leafNestedDocuments = nestedDocuments.getLeafNestedDocuments(ctx);
this.leafStoredFieldLoader = storedFieldLoader.getLoader(ctx, docsInLeaf);
this.leafSourceLoader = sourceLoader.leaf(ctx.reader(), docsInLeaf);
this.leafIdLoader = idLoader.leaf(leafStoredFieldLoader, ctx.reader(), docsInLeaf);

fieldLookupProvider.setNextReader(ctx);
for (FetchSubPhaseProcessor processor : processors) {
processor.setNextReader(ctx);
}
} finally {
if (timer != null) {
timer.stop();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.FetchPhaseExecutionException;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.FetchSubPhaseProcessor;
Expand All @@ -63,6 +64,7 @@
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.search.profile.ProfileResult;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult;
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.query.QuerySearchResult;
Expand Down Expand Up @@ -873,6 +875,63 @@ public StoredFieldsSpec storedFieldsSpec() {
}
}

public void testTimerStoppedAndSubPhasesExceptionsPropagate() throws IOException {
    // If the timer is not stopped properly whilst profiling the fetch phase, the exception
    // thrown in a sub phase's setNextReader would not propagate as the cause that failed the
    // fetch phase (instead a timer IllegalStateException would propagate).
    // This test ensures that exceptions in sub phases are propagated correctly as the cause of
    // the fetch phase failure (which in turn implies the timer was handled correctly).
    Directory dir = newDirectory();
    RandomIndexWriter w = new RandomIndexWriter(random(), dir);

    for (int i = 0; i < 10; i++) {
        Document document = new Document();
        document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
        w.addDocument(document);
    }
    if (randomBoolean()) {
        w.forceMerge(1);
    }
    IndexReader r = w.getReader();
    w.close();
    ContextIndexSearcher contextIndexSearcher = createSearcher(r);
    try (
        SearchContext searchContext = createSearchContext(
            contextIndexSearcher,
            true,
            new NoopCircuitBreaker(CircuitBreaker.REQUEST),
            true // enable profiling so FetchPhase starts a timer per reader
        )
    ) {
        // Single sub phase whose setNextReader always fails: the test asserts this
        // exception — not a timer one — surfaces as the fetch-phase failure cause.
        FetchPhase fetchPhase = new FetchPhase(List.of(fetchContext -> new FetchSubPhaseProcessor() {
            @Override
            public void setNextReader(LeafReaderContext readerContext) throws IOException {
                throw new IOException("bad things");
            }

            @Override
            public void process(FetchSubPhase.HitContext hitContext) throws IOException {
                Source source = hitContext.source();
                hitContext.hit().sourceRef(source.internalSourceRef());
            }

            @Override
            public StoredFieldsSpec storedFieldsSpec() {
                return StoredFieldsSpec.NEEDS_SOURCE;
            }
        }));
        FetchPhaseExecutionException fetchPhaseExecutionException = assertThrows(
            FetchPhaseExecutionException.class,
            () -> fetchPhase.execute(searchContext, IntStream.range(0, 100).toArray(), null)
        );
        assertThat(fetchPhaseExecutionException.getCause().getMessage(), is("bad things"));
    } finally {
        r.close();
        dir.close();
    }
}

private static ContextIndexSearcher createSearcher(IndexReader reader) throws IOException {
return new ContextIndexSearcher(reader, null, null, new QueryCachingPolicy() {
@Override
Expand Down Expand Up @@ -910,13 +969,22 @@ public StoredFieldsSpec storedFieldsSpec() {
}

/** Creates a search context with no circuit breaker and profiling disabled. */
private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) {
    return createSearchContext(contextIndexSearcher, allowPartialResults, null, false);
}

/** Creates a search context with profiling disabled, delegating to the four-argument variant. */
private static SearchContext createSearchContext(
    ContextIndexSearcher contextIndexSearcher,
    boolean allowPartialResults,
    @Nullable CircuitBreaker circuitBreaker
) {
    return createSearchContext(contextIndexSearcher, allowPartialResults, circuitBreaker, false);
}

private static SearchContext createSearchContext(
ContextIndexSearcher contextIndexSearcher,
boolean allowPartialResults,
@Nullable CircuitBreaker circuitBreaker,
boolean profileEnabled
) {
IndexSettings indexSettings = new IndexSettings(
IndexMetadata.builder("index")
Expand Down Expand Up @@ -999,6 +1067,11 @@ public CircuitBreaker circuitBreaker() {
return super.circuitBreaker();
}
}

@Override
public Profilers getProfilers() {
    // Only hand out a real Profilers instance when the test requested profiling;
    // returning null mirrors a search with profiling disabled.
    return profileEnabled ? new Profilers(contextIndexSearcher) : null;
}
};
searchContext.addReleasable(searchContext.fetchResult()::decRef);
searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap()));
Expand Down