|
50 | 50 | import org.elasticsearch.search.SearchPhaseResult; |
51 | 51 | import org.elasticsearch.search.SearchShardTarget; |
52 | 52 | import org.elasticsearch.search.fetch.FetchPhase; |
| 53 | +import org.elasticsearch.search.fetch.FetchPhaseExecutionException; |
53 | 54 | import org.elasticsearch.search.fetch.FetchSearchResult; |
54 | 55 | import org.elasticsearch.search.fetch.FetchSubPhase; |
55 | 56 | import org.elasticsearch.search.fetch.FetchSubPhaseProcessor; |
|
63 | 64 | import org.elasticsearch.search.internal.ShardSearchRequest; |
64 | 65 | import org.elasticsearch.search.lookup.Source; |
65 | 66 | import org.elasticsearch.search.profile.ProfileResult; |
| 67 | +import org.elasticsearch.search.profile.Profilers; |
66 | 68 | import org.elasticsearch.search.profile.SearchProfileQueryPhaseResult; |
67 | 69 | import org.elasticsearch.search.profile.SearchProfileShardResult; |
68 | 70 | import org.elasticsearch.search.query.QuerySearchResult; |
@@ -873,6 +875,63 @@ public StoredFieldsSpec storedFieldsSpec() { |
873 | 875 | } |
874 | 876 | } |
875 | 877 |
|
| 878 | + public void testTimerStoppedAndSubPhasesExceptionsPropagate() throws IOException { |
| 879 | + // if the timer is not stopped properly whilst profiling the fetch phase the exceptions |
| 880 | + // in sub phases#setNextReader will not propagate as the cause that failed the fetch phase (instead a timer illegal state exception |
| 881 | + // will propagate) |
| 882 | + // this tests ensures that exceptions in sub phases are propagated correctly as the cause of the fetch phase failure (which in turn |
| 883 | + // implies the timer was handled correctly) |
| 884 | + Directory dir = newDirectory(); |
| 885 | + RandomIndexWriter w = new RandomIndexWriter(random(), dir); |
| 886 | + |
| 887 | + String body = "{ \"thefield\": \" " + randomAlphaOfLength(48_000) + "\" }"; |
| 888 | + for (int i = 0; i < 10; i++) { |
| 889 | + Document document = new Document(); |
| 890 | + document.add(new StringField("id", Integer.toString(i), Field.Store.YES)); |
| 891 | + w.addDocument(document); |
| 892 | + } |
| 893 | + if (randomBoolean()) { |
| 894 | + w.forceMerge(1); |
| 895 | + } |
| 896 | + IndexReader r = w.getReader(); |
| 897 | + w.close(); |
| 898 | + ContextIndexSearcher contextIndexSearcher = createSearcher(r); |
| 899 | + try ( |
| 900 | + SearchContext searchContext = createSearchContext( |
| 901 | + contextIndexSearcher, |
| 902 | + true, |
| 903 | + new NoopCircuitBreaker(CircuitBreaker.REQUEST), |
| 904 | + true |
| 905 | + ) |
| 906 | + ) { |
| 907 | + FetchPhase fetchPhase = new FetchPhase(List.of(fetchContext -> new FetchSubPhaseProcessor() { |
| 908 | + @Override |
| 909 | + public void setNextReader(LeafReaderContext readerContext) throws IOException { |
| 910 | + throw new IOException("bad things"); |
| 911 | + } |
| 912 | + |
| 913 | + @Override |
| 914 | + public void process(FetchSubPhase.HitContext hitContext) throws IOException { |
| 915 | + Source source = hitContext.source(); |
| 916 | + hitContext.hit().sourceRef(source.internalSourceRef()); |
| 917 | + } |
| 918 | + |
| 919 | + @Override |
| 920 | + public StoredFieldsSpec storedFieldsSpec() { |
| 921 | + return StoredFieldsSpec.NEEDS_SOURCE; |
| 922 | + } |
| 923 | + })); |
| 924 | + FetchPhaseExecutionException fetchPhaseExecutionException = assertThrows( |
| 925 | + FetchPhaseExecutionException.class, |
| 926 | + () -> fetchPhase.execute(searchContext, IntStream.range(0, 100).toArray(), null) |
| 927 | + ); |
| 928 | + assertThat(fetchPhaseExecutionException.getCause().getMessage(), is("bad things")); |
| 929 | + } finally { |
| 930 | + r.close(); |
| 931 | + dir.close(); |
| 932 | + } |
| 933 | + } |
| 934 | + |
876 | 935 | private static ContextIndexSearcher createSearcher(IndexReader reader) throws IOException { |
877 | 936 | return new ContextIndexSearcher(reader, null, null, new QueryCachingPolicy() { |
878 | 937 | @Override |
@@ -910,13 +969,22 @@ public StoredFieldsSpec storedFieldsSpec() { |
910 | 969 | } |
911 | 970 |
|
912 | 971 | private static SearchContext createSearchContext(ContextIndexSearcher contextIndexSearcher, boolean allowPartialResults) { |
913 | | - return createSearchContext(contextIndexSearcher, allowPartialResults, null); |
| 972 | + return createSearchContext(contextIndexSearcher, allowPartialResults, null, false); |
914 | 973 | } |
915 | 974 |
|
916 | 975 | private static SearchContext createSearchContext( |
917 | 976 | ContextIndexSearcher contextIndexSearcher, |
918 | 977 | boolean allowPartialResults, |
919 | 978 | @Nullable CircuitBreaker circuitBreaker |
| 979 | + ) { |
| 980 | + return createSearchContext(contextIndexSearcher, allowPartialResults, circuitBreaker, false); |
| 981 | + } |
| 982 | + |
| 983 | + private static SearchContext createSearchContext( |
| 984 | + ContextIndexSearcher contextIndexSearcher, |
| 985 | + boolean allowPartialResults, |
| 986 | + @Nullable CircuitBreaker circuitBreaker, |
| 987 | + boolean profileEnabled |
920 | 988 | ) { |
921 | 989 | IndexSettings indexSettings = new IndexSettings( |
922 | 990 | IndexMetadata.builder("index") |
@@ -999,6 +1067,11 @@ public CircuitBreaker circuitBreaker() { |
999 | 1067 | return super.circuitBreaker(); |
1000 | 1068 | } |
1001 | 1069 | } |
| 1070 | + |
| 1071 | + @Override |
| 1072 | + public Profilers getProfilers() { |
| 1073 | + return profileEnabled ? new Profilers(contextIndexSearcher) : null; |
| 1074 | + } |
1002 | 1075 | }; |
1003 | 1076 | searchContext.addReleasable(searchContext.fetchResult()::decRef); |
1004 | 1077 | searchContext.setTask(new SearchShardTask(-1, "type", "action", "description", null, Collections.emptyMap())); |
|
0 commit comments