Skip to content

Commit e496e0d

Browse files
committed
First step optimizing tsdb doc values codec merging.
The doc values codec iterates a few times over the doc value instance that needs to be written to disk. In case when merging and index sorting is enabled, this is much more expensive, as each time the doc values instance is iterated an expensive doc id sorting is performed (in order to get the doc ids in order of index sorting). There are several reasons why the doc value instance is iterated multiple times: * To compute stats (num values, number of docs with value) required for writing values to disk. * To write bitset that indicate which documents have a value. (indexed disi, jump table) * To write the actual values to disk. * To write the addresses to disk (in case docs have multiple values) This applies for numeric doc values, but also for the ordinals of sorted (set) doc values. This PR addresses solving the first reason why doc value instance needs to be iterated. This is done only when in case of merging and when the segments to be merged with are also of type es87 doc values, codec version is the same and there are no deletes.
1 parent abd2bdd commit e496e0d

File tree

6 files changed

+648
-19
lines changed

6 files changed

+648
-19
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.benchmark.index.codec.tsdb;
11+
12+
import org.apache.lucene.analysis.standard.StandardAnalyzer;
13+
import org.apache.lucene.codecs.DocValuesFormat;
14+
import org.apache.lucene.codecs.lucene101.Lucene101Codec;
15+
import org.apache.lucene.document.Document;
16+
import org.apache.lucene.document.SortedDocValuesField;
17+
import org.apache.lucene.document.SortedNumericDocValuesField;
18+
import org.apache.lucene.index.IndexWriter;
19+
import org.apache.lucene.index.IndexWriterConfig;
20+
import org.apache.lucene.search.Sort;
21+
import org.apache.lucene.search.SortField;
22+
import org.apache.lucene.search.SortedNumericSortField;
23+
import org.apache.lucene.store.Directory;
24+
import org.apache.lucene.store.FSDirectory;
25+
import org.apache.lucene.util.BytesRef;
26+
import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormat;
27+
import org.openjdk.jmh.annotations.Benchmark;
28+
import org.openjdk.jmh.annotations.BenchmarkMode;
29+
import org.openjdk.jmh.annotations.Fork;
30+
import org.openjdk.jmh.annotations.Level;
31+
import org.openjdk.jmh.annotations.Measurement;
32+
import org.openjdk.jmh.annotations.Mode;
33+
import org.openjdk.jmh.annotations.OutputTimeUnit;
34+
import org.openjdk.jmh.annotations.Param;
35+
import org.openjdk.jmh.annotations.Scope;
36+
import org.openjdk.jmh.annotations.Setup;
37+
import org.openjdk.jmh.annotations.State;
38+
import org.openjdk.jmh.annotations.TearDown;
39+
import org.openjdk.jmh.annotations.Threads;
40+
import org.openjdk.jmh.annotations.Warmup;
41+
import org.openjdk.jmh.profile.AsyncProfiler;
42+
import org.openjdk.jmh.runner.Runner;
43+
import org.openjdk.jmh.runner.RunnerException;
44+
import org.openjdk.jmh.runner.options.Options;
45+
import org.openjdk.jmh.runner.options.OptionsBuilder;
46+
47+
import java.io.IOException;
48+
import java.nio.file.Files;
49+
import java.util.Random;
50+
import java.util.concurrent.ExecutorService;
51+
import java.util.concurrent.Executors;
52+
import java.util.concurrent.TimeUnit;
53+
54+
@BenchmarkMode(Mode.SampleTime)
55+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
56+
@State(Scope.Benchmark)
57+
@Fork(1)
58+
@Threads(1)
59+
@Warmup(iterations = 0)
60+
@Measurement(iterations = 1)
61+
public class TSDBDocValuesMergeBenchmark {
62+
63+
@Param("13431204")
64+
private int nDocs;
65+
66+
@Param("1000")
67+
private int deltaTime;
68+
69+
@Param("42")
70+
private int seed;
71+
72+
private static final String TIMESTAMP_FIELD = "@timestamp";
73+
private static final String HOSTNAME_FIELD = "host.name";
74+
private static final long BASE_TIMESTAMP = 1704067200000L;
75+
76+
private IndexWriter indexWriterWithoutOptimizedMerge;
77+
private IndexWriter indexWriterWithOptimizedMerge;
78+
private ExecutorService executorService;
79+
80+
public static void main(String[] args) throws RunnerException {
81+
final Options options = new OptionsBuilder().include(TSDBDocValuesMergeBenchmark.class.getSimpleName())
82+
.addProfiler(AsyncProfiler.class)
83+
.build();
84+
85+
new Runner(options).run();
86+
}
87+
88+
@Setup(Level.Trial)
89+
public void setup() throws IOException {
90+
executorService = Executors.newSingleThreadExecutor();
91+
92+
final Directory tempDirectoryWithoutDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp1-"));
93+
final Directory tempDirectoryWithDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp2-"));
94+
95+
indexWriterWithoutOptimizedMerge = createIndex(tempDirectoryWithoutDocValuesSkipper, false);
96+
indexWriterWithOptimizedMerge = createIndex(tempDirectoryWithDocValuesSkipper, true);
97+
}
98+
99+
private IndexWriter createIndex(final Directory directory, final boolean optimizedMergeEnabled)
100+
throws IOException {
101+
102+
final IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
103+
// NOTE: index sort config matching LogsDB's sort order
104+
config.setIndexSort(
105+
new Sort(
106+
new SortField(HOSTNAME_FIELD, SortField.Type.STRING, false),
107+
new SortedNumericSortField(TIMESTAMP_FIELD, SortField.Type.LONG, true)
108+
)
109+
);
110+
ES87TSDBDocValuesFormat docValuesFormat = new ES87TSDBDocValuesFormat(4096, optimizedMergeEnabled);
111+
config.setCodec(new Lucene101Codec() {
112+
113+
@Override
114+
public DocValuesFormat getDocValuesFormatForField(String field) {
115+
return docValuesFormat;
116+
}
117+
});
118+
119+
long counter1 = 0;
120+
long counter2 = 10_000_000;
121+
long[] gauge1Values = new long[] {2, 4, 6, 8, 10, 12, 14, 16};
122+
long[] gauge2Values = new long[] {-2, -4, -6, -8, -10, -12, -14, -16};
123+
int numHosts = 1000;
124+
125+
final Random random = new Random(seed);
126+
IndexWriter indexWriter = new IndexWriter(directory, config);
127+
for (int i = 0; i < nDocs; i++) {
128+
final Document doc = new Document();
129+
130+
final int batchIndex = i / numHosts;
131+
final String hostName = "host-" + batchIndex;
132+
// Slightly vary the timestamp in each document
133+
final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime);
134+
135+
doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName)));
136+
doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp));
137+
doc.add(new SortedNumericDocValuesField("counter_1", counter1++));
138+
doc.add(new SortedNumericDocValuesField("counter_2", counter2++));
139+
doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length]));
140+
doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge1Values.length]));
141+
142+
indexWriter.addDocument(doc);
143+
}
144+
indexWriter.commit();
145+
return indexWriter;
146+
}
147+
148+
@Benchmark
149+
public void forceMergeWithoutOptimizedMerge() throws IOException {
150+
forceMerge(indexWriterWithoutOptimizedMerge);
151+
}
152+
153+
@Benchmark
154+
public void forceMergeWithOptimizedMerge() throws IOException {
155+
forceMerge(indexWriterWithOptimizedMerge);
156+
}
157+
158+
private void forceMerge(final IndexWriter indexWriter) throws IOException {
159+
indexWriter.forceMerge(1);
160+
}
161+
162+
@TearDown(Level.Trial)
163+
public void tearDown() {
164+
if (executorService != null) {
165+
executorService.shutdown();
166+
try {
167+
if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) {
168+
executorService.shutdownNow();
169+
}
170+
} catch (InterruptedException e) {
171+
executorService.shutdownNow();
172+
Thread.currentThread().interrupt();
173+
}
174+
}
175+
}
176+
}

0 commit comments

Comments
 (0)