Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132833.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132833
summary: Adding simulate ingest effective mapping
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -2182,3 +2182,131 @@ setup:
- match: { docs.0.doc._index: "test" }
- match: { docs.0.doc._source.foo: "bar" }
- match: { docs.0.doc.error.type: "document_parsing_exception" }

---
"Test effective mapping":

# This creates two templates, where the first reroutes to the second. Then we simulate ingesting and make sure that
# the effective_mapping is for the index where the document eventually would land. Also, the second index is really
# a data stream, so we expect to see a @timestamp field.

- skip:
features:
- headers
- allowed_warnings

- do:
headers:
Content-Type: application/json
ingest.put_pipeline:
id: "reroute-pipeline"
body: >
{
"processors": [
{
"reroute": {
"destination": "second-index"
}
}
]
}
- match: { acknowledged: true }

- do:
allowed_warnings:
- "index template [first-index-template] has index patterns [first-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [first-index-template] will take precedence during new index creation"
indices.put_index_template:
name: first-index-template
body:
index_patterns: first-index*
template:
settings:
default_pipeline: "reroute-pipeline"
mappings:
dynamic: strict
properties:
foo:
type: text

- do:
allowed_warnings:
- "index template [second-index-template] has index patterns [second-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [second-index-template] will take precedence during new index creation"
indices.put_index_template:
name: second-index-template
body:
index_patterns: second-index*
template:
mappings:
dynamic: strict
properties:
bar:
type: text

- do:
indices.put_index_template:
name: second-index-template
body:
index_patterns: second-index*
template:
lifecycle:
data_retention: "7d"
mappings:
dynamic: strict
properties:
bar:
type: text
data_stream: {}

- do:
indices.create_data_stream:
name: second-index
- is_true: acknowledged

- do:
cluster.health:
wait_for_status: yellow

- do:
indices.put_data_stream_mappings:
name: second-index
body:
properties:
foo:
type: boolean

- match: { data_streams.0.applied_to_data_stream: true }

# Here is the meat of the test. We simulate ingesting into first-index, knowing it will be rerouted to second-index,
# which is actually a data stream. So we expect the effective_mapping to contain the fields from second-index
# (including the implicit @timestamp field) and not second-index. Plus, it ought to include fields from the
# mapping_addition that we pass in.
- do:
headers:
Content-Type: application/json
simulate.ingest:
body: >
{
"docs": [
{
"_index": "first-index",
"_id": "id",
"_source": {
"foo": "bar"
}
}
],
"mapping_addition": {
"dynamic": "strict",
"properties": {
"baz": {
"type": "keyword"
}
}
}
}
- length: { docs: 1 }
- match: { docs.0.doc._index: "second-index" }
- not_exists: docs.0.doc.effective_mapping._doc.properties.foo
- match: { [email protected]: "date" }
- match: { docs.0.doc.effective_mapping._doc.properties.bar.type: "text" }
- match: { docs.0.doc.effective_mapping._doc.properties.baz.type: "keyword" }
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ static TransportVersion def(int id) {
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexSettingProvider;
Expand Down Expand Up @@ -144,14 +143,13 @@ protected void doInternalExecute(
DocWriteRequest<?> docRequest = bulkRequest.requests.get(i);
assert docRequest instanceof IndexRequest : "TransportSimulateBulkAction should only ever be called with IndexRequests";
IndexRequest request = (IndexRequest) docRequest;
Tuple<Collection<String>, Exception> validationResult = validateMappings(
ValidationResult validationResult = validateMappings(
componentTemplateSubstitutions,
indexTemplateSubstitutions,
mappingAddition,
request,
mappingMergeReason
);
Exception mappingValidationException = validationResult.v2();
responses.set(
i,
BulkItemResponse.success(
Expand All @@ -164,8 +162,9 @@ protected void doInternalExecute(
request.source(),
request.getContentType(),
request.getExecutedPipelines(),
validationResult.v1(),
mappingValidationException
validationResult.ignoredFields,
validationResult.validationException,
validationResult.effectiveMapping
)
)
);
Expand Down Expand Up @@ -193,7 +192,7 @@ private MapperService.MergeReason getMergeReason(String mergeType) {
* @return a Tuple containing: (1) in v1 the names of any fields that would be ignored upon indexing and (2) in v2 the mapping
* exception if the source does not match the mappings, otherwise null
*/
private Tuple<Collection<String>, Exception> validateMappings(
private ValidationResult validateMappings(
Map<String, ComponentTemplate> componentTemplateSubstitutions,
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions,
Map<String, Object> mappingAddition,
Expand All @@ -211,6 +210,7 @@ private Tuple<Collection<String>, Exception> validateMappings(
);

ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state());
CompressedXContent effectiveMapping = null;
Exception mappingValidationException = null;
Collection<String> ignoredFields = List.of();
IndexAbstraction indexAbstraction = project.getIndicesLookup().get(request.index());
Expand All @@ -222,8 +222,8 @@ private Tuple<Collection<String>, Exception> validateMappings(
*/
IndexMetadata imd = project.getIndexSafe(indexAbstraction.getWriteIndex(request, project));
CompressedXContent mappings = Optional.ofNullable(imd.mapping()).map(MappingMetadata::source).orElse(null);
CompressedXContent mergedMappings = mappingAddition == null ? null : mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, mergedMappings, request, sourceToParse, mappingMergeReason);
effectiveMapping = mappingAddition == null ? null : mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, effectiveMapping, request, sourceToParse, mappingMergeReason);
} else {
/*
* The index did not exist, or we have component template substitutions, so we put together the mappings from existing
Expand Down Expand Up @@ -281,8 +281,8 @@ private Tuple<Collection<String>, Exception> validateMappings(
indexSettingProviders
);
CompressedXContent mappings = template.mappings();
CompressedXContent mergedMappings = mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappings(mappings, mergedMappings, request, sourceToParse, mappingMergeReason);
effectiveMapping = mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappings(mappings, effectiveMapping, request, sourceToParse, mappingMergeReason);
} else {
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(simulatedProjectMetadata, request.index(), false);
if (matchingTemplates.isEmpty() == false) {
Expand All @@ -295,23 +295,27 @@ private Tuple<Collection<String>, Exception> validateMappings(
matchingTemplates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
xContentRegistry
);
final CompressedXContent combinedMappings = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
effectiveMapping = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
ignoredFields = validateUpdatedMappings(null, effectiveMapping, request, sourceToParse, mappingMergeReason);
} else {
/*
* The index matched no templates and had no mapping of its own. If there were component template substitutions
* or index template substitutions, they didn't match anything. So just apply the mapping addition if it exists,
* and validate.
*/
final CompressedXContent combinedMappings = mergeMappings(null, mappingAddition);
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
effectiveMapping = mergeMappings(null, mappingAddition);
ignoredFields = validateUpdatedMappings(null, effectiveMapping, request, sourceToParse, mappingMergeReason);
}
}
}
} catch (Exception e) {
mappingValidationException = e;
}
return Tuple.tuple(ignoredFields, mappingValidationException);
return new ValidationResult(effectiveMapping, mappingValidationException, ignoredFields);
}

private record ValidationResult(CompressedXContent effectiveMapping, Exception validationException, Collection<String> ignoredFields) {

}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
Expand All @@ -26,6 +27,7 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/**
* This is an IndexResponse that is specifically for simulate requests. Unlike typical IndexResponses, we need to include the original
Expand All @@ -37,6 +39,7 @@ public class SimulateIndexResponse extends IndexResponse {
private final XContentType sourceXContentType;
private final Collection<String> ignoredFields;
private final Exception exception;
private final CompressedXContent effectiveMapping;

@SuppressWarnings("this-escape")
public SimulateIndexResponse(StreamInput in) throws IOException {
Expand All @@ -54,6 +57,15 @@ public SimulateIndexResponse(StreamInput in) throws IOException {
} else {
this.ignoredFields = List.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_EFFECTIVE_MAPPING)) {
if (in.readBoolean()) {
this.effectiveMapping = CompressedXContent.readCompressedString(in);
} else {
this.effectiveMapping = null;
}
} else {
effectiveMapping = null;
}
}

@SuppressWarnings("this-escape")
Expand All @@ -65,7 +77,8 @@ public SimulateIndexResponse(
XContentType sourceXContentType,
List<String> pipelines,
Collection<String> ignoredFields,
@Nullable Exception exception
@Nullable Exception exception,
@Nullable CompressedXContent effectiveMapping
) {
// We don't actually care about most of the IndexResponse fields:
super(
Expand All @@ -83,6 +96,7 @@ public SimulateIndexResponse(
setShardInfo(ShardInfo.EMPTY);
this.ignoredFields = ignoredFields;
this.exception = exception;
this.effectiveMapping = effectiveMapping;
}

@Override
Expand All @@ -108,6 +122,14 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
ElasticsearchException.generateThrowableXContent(builder, params, exception);
builder.endObject();
}
if (effectiveMapping == null) {
builder.field("effective_mapping", Map.of());
} else {
builder.field(
"effective_mapping",
XContentHelper.convertToMap(effectiveMapping.uncompressed(), true, builder.contentType()).v2()
);
}
return builder;
}

Expand All @@ -127,6 +149,12 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_IGNORED_FIELDS)) {
out.writeStringCollection(ignoredFields);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_EFFECTIVE_MAPPING)) {
out.writeBoolean(effectiveMapping != null);
if (effectiveMapping != null) {
effectiveMapping.writeTo(out);
}
}
}

public Exception getException() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ public void onResponse(BulkResponse response) {
"_index": "%s",
"_version": -3,
"_source": %s,
"executed_pipelines": [%s]
"executed_pipelines": [%s],
"effective_mapping":{}
}""",
indexRequest.id(),
indexRequest.index(),
Expand Down Expand Up @@ -319,7 +320,8 @@ public void onResponse(BulkResponse response) {
"_version": -3,
"_source": %s,
"executed_pipelines": [%s],
"error":{"type":"exception","reason":"invalid mapping"}
"error":{"type":"exception","reason":"invalid mapping"},
"effective_mapping":{"_doc":{"dynamic":"strict"}}
}""",
indexRequest.id(),
indexName,
Expand All @@ -346,7 +348,8 @@ public void onResponse(BulkResponse response) {
"_index": "%s",
"_version": -3,
"_source": %s,
"executed_pipelines": [%s]
"executed_pipelines": [%s],
"effective_mapping":{"_doc":{"dynamic":"strict"}}
}""",
indexRequest.id(),
indexName,
Expand All @@ -373,7 +376,9 @@ public void onFailure(Exception e) {
};
when(indicesService.withTempIndexService(any(), any())).thenAnswer((Answer<?>) invocation -> {
IndexMetadata imd = invocation.getArgument(0);
if (indicesWithInvalidMappings.contains(imd.getIndex().getName())) {
if (indicesWithInvalidMappings.contains(imd.getIndex().getName())
// We only want to throw exceptions inside TransportSimulateBulkAction:
&& invocation.getArgument(1).getClass().getSimpleName().contains(TransportSimulateBulkAction.class.getSimpleName())) {
throw new ElasticsearchException("invalid mapping");
} else {
// we don't actually care what is returned, as long as no exception is thrown the request is considered valid:
Expand Down
Loading