Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.benchmark.index.codec.tsdb;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.DocValuesFormat;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.index.codec.Elasticsearch816Codec;
import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.profile.AsyncProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.io.IOException;
import java.nio.file.Files;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(1)
@Threads(1)
@Warmup(iterations = 0)
@Measurement(iterations = 1)
public class TSDBDocValuesMergeBenchmark {

static {
// For Elasticsearch900Lucene101Codec:
LogConfigurator.loadLog4jPlugins();
LogConfigurator.configureESLogging();
LogConfigurator.setNodeName("test");
}

@Param("20431204")
private int nDocs;

@Param("1000")
private int deltaTime;

@Param("42")
private int seed;

private static final String TIMESTAMP_FIELD = "@timestamp";
private static final String HOSTNAME_FIELD = "host.name";
private static final long BASE_TIMESTAMP = 1704067200000L;

private IndexWriter indexWriterWithoutOptimizedMerge;
private IndexWriter indexWriterWithOptimizedMerge;
private ExecutorService executorService;

public static void main(String[] args) throws RunnerException {
final Options options = new OptionsBuilder().include(TSDBDocValuesMergeBenchmark.class.getSimpleName())
.addProfiler(AsyncProfiler.class)
.build();

new Runner(options).run();
}

@Setup(Level.Trial)
public void setup() throws IOException {
executorService = Executors.newSingleThreadExecutor();

final Directory tempDirectoryWithoutDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp1-"));
final Directory tempDirectoryWithDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp2-"));

indexWriterWithoutOptimizedMerge = createIndex(tempDirectoryWithoutDocValuesSkipper, false);
indexWriterWithOptimizedMerge = createIndex(tempDirectoryWithDocValuesSkipper, true);
}

private IndexWriter createIndex(final Directory directory, final boolean optimizedMergeEnabled) throws IOException {
final var iwc = createIndexWriterConfig(optimizedMergeEnabled);
long counter1 = 0;
long counter2 = 10_000_000;
long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 };
long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 };
int numHosts = 1000;
String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" };

final Random random = new Random(seed);
IndexWriter indexWriter = new IndexWriter(directory, iwc);
for (int i = 0; i < nDocs; i++) {
final Document doc = new Document();

final int batchIndex = i / numHosts;
final String hostName = "host-" + batchIndex;
// Slightly vary the timestamp in each document
final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);

doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
int numTags = tags.length % (i + 1);
for (int j = 0; j < numTags; j++) {
doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j])));
}

indexWriter.addDocument(doc);
}
indexWriter.commit();
return indexWriter;
}

@Benchmark
public void forceMergeWithoutOptimizedMerge() throws IOException {
forceMerge(indexWriterWithoutOptimizedMerge);
}

@Benchmark
public void forceMergeWithOptimizedMerge() throws IOException {
forceMerge(indexWriterWithOptimizedMerge);
}

private void forceMerge(final IndexWriter indexWriter) throws IOException {
indexWriter.forceMerge(1);
}

@TearDown(Level.Trial)
public void tearDown() {
if (executorService != null) {
executorService.shutdown();
try {
if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeEnabled) {
var config = new IndexWriterConfig(new StandardAnalyzer());
// NOTE: index sort config matching LogsDB's sort order
config.setIndexSort(
new Sort(
new SortField(HOSTNAME_FIELD, SortField.Type.STRING, false),
new SortedNumericSortField(TIMESTAMP_FIELD, SortField.Type.LONG, true)
)
);
config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER);
config.setMergePolicy(new LogByteSizeMergePolicy());
var docValuesFormat = new ES819TSDBDocValuesFormat(optimizedMergeEnabled);
config.setCodec(new Elasticsearch816Codec() {

@Override
public DocValuesFormat getDocValuesFormatForField(String field) {
return docValuesFormat;
}
});
return config;
}
}
5 changes: 5 additions & 0 deletions docs/changelog/125403.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 125403
summary: First step optimizing tsdb doc values codec merging
area: Codec
type: enhancement
issues: []
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -479,4 +479,5 @@
exports org.elasticsearch.inference.configuration;
exports org.elasticsearch.monitor.metrics;
exports org.elasticsearch.plugins.internal.rewriter to org.elasticsearch.inference;
exports org.elasticsearch.index.codec.perfield;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import org.apache.lucene.codecs.lucene912.Lucene912Codec;
import org.apache.lucene.codecs.lucene912.Lucene912PostingsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;
import org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat;
import org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat;
import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat;
import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat;

/**
Expand All @@ -39,7 +39,7 @@ public PostingsFormat getPostingsFormatForField(String field) {
};

private final DocValuesFormat defaultDVFormat;
private final DocValuesFormat docValuesFormat = new PerFieldDocValuesFormat() {
private final DocValuesFormat docValuesFormat = new XPerFieldDocValuesFormat() {
@Override
public DocValuesFormat getDocValuesFormatForField(String field) {
return Elasticsearch816Codec.this.getDocValuesFormatForField(field);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.codec;

import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;

import java.io.IOException;

/**
* Implementation that allows wrapping another {@link DocValuesProducer} and alter behaviour of the wrapped instance.
*/
public abstract class FilterDocValuesProducer extends DocValuesProducer {
private final DocValuesProducer in;

protected FilterDocValuesProducer(DocValuesProducer in) {
this.in = in;
}

@Override
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
return in.getNumeric(field);
}

@Override
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
return in.getBinary(field);
}

@Override
public SortedDocValues getSorted(FieldInfo field) throws IOException {
return in.getSorted(field);
}

@Override
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
return in.getSortedNumeric(field);
}

@Override
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
return in.getSortedSet(field);
}

@Override
public void checkIntegrity() throws IOException {
in.checkIntegrity();
}

@Override
public void close() throws IOException {
in.close();
}

public DocValuesProducer getIn() {
return in;
}
}
Loading