diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java new file mode 100644 index 00000000000..7eb6ea51055 --- /dev/null +++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.benchmark; + +import org.logstash.common.BufferedTokenizer; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.concurrent.TimeUnit; + + +@Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 3000, timeUnit = TimeUnit.MILLISECONDS) +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +public class BufferedTokenizerBenchmark { + + private BufferedTokenizer sut; + private String singleTokenPerFragment; + private String multipleTokensPerFragment; + private String multipleTokensSpreadMultipleFragments_1; + private String multipleTokensSpreadMultipleFragments_2; + private String multipleTokensSpreadMultipleFragments_3; + + @Setup(Level.Invocation) + public void setUp() { + sut = new BufferedTokenizer(); + singleTokenPerFragment = "a".repeat(512) + "\n"; + + multipleTokensPerFragment = "a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(512) + "\n"; + + multipleTokensSpreadMultipleFragments_1 = "a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(256); + multipleTokensSpreadMultipleFragments_2 = "c".repeat(256) + "\n" + "d".repeat(512) + "\n" + "e".repeat(256); + multipleTokensSpreadMultipleFragments_3 = "f".repeat(256) + "\n" + "g".repeat(512) + "\n" + "h".repeat(512) + "\n"; + } + + @Benchmark + public final void onlyOneTokenPerFragment(Blackhole blackhole) { + Iterable tokens = sut.extract(singleTokenPerFragment); + tokens.forEach(blackhole::consume); + blackhole.consume(tokens); + } + + @Benchmark + public final void multipleTokenPerFragment(Blackhole blackhole) { + Iterable tokens = sut.extract(multipleTokensPerFragment); + tokens.forEach(blackhole::consume); + blackhole.consume(tokens); + } + + @Benchmark + public final void 
multipleTokensCrossingMultipleFragments(Blackhole blackhole) { + Iterable tokens = sut.extract(multipleTokensSpreadMultipleFragments_1); + tokens.forEach(t -> {}); + blackhole.consume(tokens); + + tokens = sut.extract(multipleTokensSpreadMultipleFragments_2); + tokens.forEach(t -> {}); + blackhole.consume(tokens); + + tokens = sut.extract(multipleTokensSpreadMultipleFragments_3); + tokens.forEach(blackhole::consume); + blackhole.consume(tokens); + } +} diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerExtBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerExtBenchmark.java deleted file mode 100644 index 5b01cebb3e5..00000000000 --- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerExtBenchmark.java +++ /dev/null @@ -1,83 +0,0 @@ -package org.logstash.benchmark; - -import org.jruby.RubyArray; -import org.jruby.RubyString; -import org.jruby.runtime.ThreadContext; -import org.jruby.runtime.builtin.IRubyObject; -import org.logstash.RubyUtil; -import org.logstash.common.BufferedTokenizerExt; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Level; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.Warmup; -import org.openjdk.jmh.infra.Blackhole; - -import java.util.concurrent.TimeUnit; - -import static org.logstash.RubyUtil.RUBY; - -@Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS) -@Measurement(iterations = 10, time = 100, timeUnit = TimeUnit.MILLISECONDS) -@Fork(1) -@BenchmarkMode(Mode.Throughput) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -@State(Scope.Thread) -public class BufferedTokenizerExtBenchmark { - - private BufferedTokenizerExt sut; - private ThreadContext context; - private RubyString singleTokenPerFragment; - private RubyString multipleTokensPerFragment; - private RubyString multipleTokensSpreadMultipleFragments_1; - private RubyString multipleTokensSpreadMultipleFragments_2; - private RubyString multipleTokensSpreadMultipleFragments_3; - - @Setup(Level.Invocation) - public void setUp() { - sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); - context = RUBY.getCurrentContext(); - IRubyObject[] args = {}; - sut.init(context, args); - singleTokenPerFragment = RubyUtil.RUBY.newString("a".repeat(512) + "\n"); - - multipleTokensPerFragment = RubyUtil.RUBY.newString("a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(512) + "\n"); - - multipleTokensSpreadMultipleFragments_1 = RubyUtil.RUBY.newString("a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(256)); - multipleTokensSpreadMultipleFragments_2 = RubyUtil.RUBY.newString("c".repeat(256) + "\n" + "d".repeat(512) + "\n" + "e".repeat(256)); - multipleTokensSpreadMultipleFragments_3 = RubyUtil.RUBY.newString("f".repeat(256) + "\n" + "g".repeat(512) + "\n" + "h".repeat(512) + "\n"); - } - - @SuppressWarnings("unchecked") - @Benchmark - public final void onlyOneTokenPerFragment(Blackhole blackhole) { - RubyArray tokens = (RubyArray) sut.extract(context, singleTokenPerFragment); - blackhole.consume(tokens); - } - - @SuppressWarnings("unchecked") - @Benchmark - public final void 
multipleTokenPerFragment(Blackhole blackhole) { - RubyArray tokens = (RubyArray) sut.extract(context, multipleTokensPerFragment); - blackhole.consume(tokens); - } - - @SuppressWarnings("unchecked") - @Benchmark - public final void multipleTokensCrossingMultipleFragments(Blackhole blackhole) { - RubyArray tokens = (RubyArray) sut.extract(context, multipleTokensSpreadMultipleFragments_1); - blackhole.consume(tokens); - - tokens = (RubyArray) sut.extract(context, multipleTokensSpreadMultipleFragments_2); - blackhole.consume(tokens); - - tokens = (RubyArray) sut.extract(context, multipleTokensSpreadMultipleFragments_3); - blackhole.consume(tokens); - } -} diff --git a/logstash-core/spec/logstash/util/buftok_spec.rb b/logstash-core/spec/logstash/util/buftok_spec.rb index 3d7b4b1990f..4eceb20014e 100644 --- a/logstash-core/spec/logstash/util/buftok_spec.rb +++ b/logstash-core/spec/logstash/util/buftok_spec.rb @@ -20,27 +20,33 @@ describe FileWatch::BufferedTokenizer do subject { FileWatch::BufferedTokenizer.new } + def to_list(iterator) + a = [] + iterator.each { |v| a << v } + return a + end + it "should tokenize a single token" do - expect(subject.extract("foo\n")).to eq(["foo"]) + expect(to_list(subject.extract("foo\n"))).to eq(["foo"]) end it "should merge multiple token" do - expect(subject.extract("foo")).to eq([]) - expect(subject.extract("bar\n")).to eq(["foobar"]) + expect(to_list(subject.extract("foo"))).to eq([]) + expect(to_list(subject.extract("bar\n"))).to eq(["foobar"]) end it "should tokenize multiple token" do - expect(subject.extract("foo\nbar\n")).to eq(["foo", "bar"]) + expect(to_list(subject.extract("foo\nbar\n"))).to eq(["foo", "bar"]) end it "should ignore empty payload" do - expect(subject.extract("")).to eq([]) - expect(subject.extract("foo\nbar")).to eq(["foo"]) + expect(to_list(subject.extract(""))).to eq([]) + expect(to_list(subject.extract("foo\nbar"))).to eq(["foo"]) end it "should tokenize empty payload with newline" do - expect(subject.extract("\n")).to eq([""]) - expect(subject.extract("\n\n\n")).to eq(["", "", ""]) + expect(to_list(subject.extract("\n"))).to eq([""]) + expect(to_list(subject.extract("\n\n\n"))).to eq(["", "", ""]) end describe 'flush' do @@ -83,12 +89,12 @@ let(:delimiter) { "||" } it "should tokenize multiple token" do - expect(subject.extract("foo||b|r||")).to eq(["foo", "b|r"]) + expect(to_list(subject.extract("foo||b|r||"))).to eq(["foo", "b|r"]) end it "should ignore empty payload" do - expect(subject.extract("")).to eq([]) - expect(subject.extract("foo||bar")).to eq(["foo"]) + expect(to_list(subject.extract(""))).to eq([]) + expect(to_list(subject.extract("foo||bar"))).to eq(["foo"]) end end end diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index 564f51bc085..50ee6d49442 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -34,6 +34,7 @@ import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt; import org.logstash.common.AbstractDeadLetterQueueWriterExt; import org.logstash.common.BufferedTokenizerExt; +import org.logstash.common.BufferedTokenizer; import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt; import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; import org.logstash.config.ir.compiler.FilterDelegatorExt; diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java 
b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
new file mode 100644
index 00000000000..04cf4fad220
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.logstash.common;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class BufferedTokenizer {
+
+    private final DataSplitter dataSplitter;
+    private final Iterable<String> iterable;
+
+    static abstract class IteratorDecorator<T> implements Iterator<T> {
+        protected final Iterator<String> iterator;
+
+        IteratorDecorator(Iterator<String> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+    }
+
+    static class DataSplitter implements Iterator<String> {
+        private final String separator;
+        private final StringBuilder accumulator = new StringBuilder();
+        private int currentIdx = 0;
+        private boolean dropNextPartialFragments = false;
+        private final int sizeLimit;
+
+        /**
+         * @param separator
+         *          the token separator string.
+         * */
+        DataSplitter(String separator) {
+            this.separator = separator;
+            this.sizeLimit = Integer.MIN_VALUE;
+        }
+
+        /**
+         * @param separator
+         *          the token separator string.
+         * @param sizeLimit
+         *          maximum admitted token length.
+         * */
+        DataSplitter(String separator, int sizeLimit) {
+            this.separator = separator;
+            this.sizeLimit = sizeLimit;
+        }
+
+        @Override
+        public boolean hasNext() {
+            int nextIdx = accumulator.indexOf(separator, currentIdx);
+            if (nextIdx == -1) {
+                // next separator not found
+                cleanupAccumulator();
+                // if the remaining data is bigger than the admitted size, start dropping the subsequent
+                // fragments that don't contain any separator
+                if (sizeLimit != Integer.MIN_VALUE && accumulator.length() > sizeLimit) {
+                    dropNextPartialFragments = true;
+                }
+                return false;
+            } else {
+                return true;
+            }
+        }
+
+        @Override
+        public String next() {
+            int nextIdx = accumulator.indexOf(separator, currentIdx);
+            if (nextIdx == -1) {
+                // next separator not found
+                cleanupAccumulator();
+                throw new NoSuchElementException();
+            } else {
+                String token = accumulator.substring(currentIdx, nextIdx);
+                currentIdx = nextIdx + separator.length();
+                if (sizeLimit != Integer.MIN_VALUE && token.length() > sizeLimit) {
+                    throw new IllegalStateException("input buffer full, consumed token which exceeded the sizeLimit " + sizeLimit);
+                }
+                return token;
+            }
+        }
+
+        private void cleanupAccumulator() {
+            accumulator.delete(0, currentIdx);
+            currentIdx = 0;
+        }
+
+        public void append(String data) {
+            if (!data.contains(separator) && dropNextPartialFragments) {
+                return;
+            }
+            dropNextPartialFragments = false;
+            accumulator.append(data);
+        }
+
+        public String flush() {
+            return accumulator.substring(currentIdx);
+        }
+
+        @Override
+        public String toString() {
+            return "accumulator=" + accumulator + ", currentIdx=" + currentIdx;
+        }
+    }
+
+    public BufferedTokenizer() {
+        this("\n");
+    }
+
+    public BufferedTokenizer(String separator) {
+        this.dataSplitter = new DataSplitter(separator);
+        this.iterable = () -> dataSplitter;
+    }
+
+    public BufferedTokenizer(String separator, int sizeLimit) {
+        if (sizeLimit <= 0) {
+            throw new IllegalArgumentException("Size limit must be positive");
+        }
+
+        this.dataSplitter = new DataSplitter(separator, sizeLimit);
+        this.iterable = () -> dataSplitter;
+    }
+
+    public Iterable<String> extract(String data) {
+        dataSplitter.append(data);
+
+        return iterable;
+    }
+
+    public String flush() {
+        return dataSplitter.flush();
+    }
+
+    @Override
+    public String toString() {
+        return dataSplitter.toString();
+    }
+
+    public boolean isEmpty() {
+        return !dataSplitter.hasNext();
+    }
+}
diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
index e2c476520c1..2863c94069a 100644
--- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
@@ -21,7 +21,6 @@
 package org.logstash.common;
 
 import org.jruby.Ruby;
-import org.jruby.RubyArray;
 import org.jruby.RubyClass;
 import org.jruby.RubyEncoding;
 import org.jruby.RubyObject;
@@ -34,23 +33,15 @@
 import org.logstash.RubyUtil;
 
 import java.nio.charset.Charset;
+import java.util.Iterator;
 
 @JRubyClass(name = "BufferedTokenizer")
 public class BufferedTokenizerExt extends RubyObject {
 
     private static final long serialVersionUID = 1L;
 
-    private static final RubyString NEW_LINE = (RubyString) RubyUtil.RUBY.newString("\n").
- freeze(RubyUtil.RUBY.getCurrentContext()); - - private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray(); - private StringBuilder headToken = new StringBuilder(); - private RubyString delimiter = NEW_LINE; - private int sizeLimit; - private boolean hasSizeLimit; - private int inputSize; - private boolean bufferFullErrorNotified = false; private String encodingName; + private transient BufferedTokenizer tokenizer; public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); @@ -58,18 +49,17 @@ public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { @JRubyMethod(name = "initialize", optional = 2) public IRubyObject init(final ThreadContext context, IRubyObject[] args) { + String delimiter = "\n"; if (args.length >= 1) { - this.delimiter = args[0].convertToString(); + delimiter = args[0].convertToString().asJavaString(); } if (args.length == 2) { final int sizeLimit = args[1].convertToInteger().getIntValue(); - if (sizeLimit <= 0) { - throw new IllegalArgumentException("Size limit must be positive"); - } - this.sizeLimit = sizeLimit; - this.hasSizeLimit = true; + this.tokenizer = new BufferedTokenizer(delimiter, sizeLimit); + } else { + this.tokenizer = new BufferedTokenizer(delimiter); } - this.inputSize = 0; + return this; } @@ -86,70 +76,39 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) { */ @JRubyMethod @SuppressWarnings("rawtypes") - public RubyArray extract(final ThreadContext context, IRubyObject data) { + public IRubyObject extract(final ThreadContext context, IRubyObject data) { RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context); encodingName = encoding.getEncoding().getCharsetName(); - final RubyArray entities = data.convertToString().split(delimiter, -1); - if (!bufferFullErrorNotified) { - input.clear(); - input.concat(entities); - } else { - // after a full buffer signal - if (input.isEmpty()) { - // after a buffer full error, the remaining part of the line, till next delimiter, - // has to be consumed, unless the input buffer doesn't still contain fragments of - // subsequent tokens. 
-                entities.shift(context);
-                input.concat(entities);
-            } else {
-                // merge last of the input with first of incoming data segment
-                if (!entities.isEmpty()) {
-                    RubyString last = ((RubyString) input.pop(context));
-                    RubyString nextFirst = ((RubyString) entities.shift(context));
-                    entities.unshift(last.concat(nextFirst));
-                    input.concat(entities);
-                }
-            }
-        }
-
-        if (hasSizeLimit) {
-            if (bufferFullErrorNotified) {
-                bufferFullErrorNotified = false;
-                if (input.isEmpty()) {
-                    return RubyUtil.RUBY.newArray();
-                }
-            }
-            final int entitiesSize = ((RubyString) input.first()).size();
-            if (inputSize + entitiesSize > sizeLimit) {
-                bufferFullErrorNotified = true;
-                headToken = new StringBuilder();
-                String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize);
-                inputSize = 0;
-                input.shift(context); // consume the token fragment that generates the buffer full
-                throw new IllegalStateException(errorMessage);
+        Iterable<String> extractor = tokenizer.extract(data.asJavaString());
+
+        // wrap the tokenizer's String iterator with one that performs the encoding conversion
+        Iterator<CharSequence> rubyStringAdapterIterator = new BufferedTokenizer.IteratorDecorator<>(extractor.iterator()) {
+            @Override
+            public CharSequence next() {
+                return toEncodedRubyString(context, iterator.next());
             }
-            this.inputSize = inputSize + entitiesSize;
+        };
+
+        return RubyUtil.toRubyObject(new IterableAdapterWithEmptyCheck(rubyStringAdapterIterator));
+    }
+
+    // Iterator-to-Iterable adapter that also exposes an isEmpty check
+    public static class IterableAdapterWithEmptyCheck implements Iterable<CharSequence> {
+        private final Iterator<CharSequence> origIterator;
+
+        public IterableAdapterWithEmptyCheck(Iterator<CharSequence> origIterator) {
+            this.origIterator = origIterator;
         }
 
-        if (input.getLength() < 2) {
-            // this is a specialization case which avoid adding and removing from input accumulator
-            // when it contains just one element
-            headToken.append(input.shift(context)); // remove head
-            return RubyUtil.RUBY.newArray();
+        @Override
+        public Iterator<CharSequence> iterator() {
+            return origIterator;
         }
 
-        if (headToken.length() > 0) {
-            // if there is a pending token part, merge it with the first token segment present
-            // in the accumulator, and clean the pending token part.
-            headToken.append(input.shift(context)); // append buffer to first element and
-            // create new RubyString with the data specified encoding
-            RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString());
-            input.unshift(encodedHeadToken); // reinsert it into the array
-            headToken = new StringBuilder();
+        public boolean isEmpty() {
+            return !origIterator.hasNext();
         }
-
-        headToken.append(input.pop(context)); // put the leftovers in headToken for later
-        inputSize = headToken.length();
-        return input;
     }
 
     private RubyString toEncodedRubyString(ThreadContext context, String input) {
@@ -168,30 +127,25 @@ private RubyString toEncodedRubyString(ThreadContext context, String input) {
      */
     @JRubyMethod
    public IRubyObject flush(final ThreadContext context) {
-        final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
-        headToken = new StringBuilder();
-        inputSize = 0;
+        String s = tokenizer.flush();
 
         // create new RubyString with the last data specified encoding, if exists
-        RubyString encodedHeadToken;
         if (encodingName != null) {
-            encodedHeadToken = toEncodedRubyString(context, buffer.toString());
+            return toEncodedRubyString(context, s);
         } else {
             // When used with TCP input it could be that on socket connection the flush method
             // is invoked while no invocation of extract, leaving the encoding name unassigned.
             // In such case also the headToken must be empty
-            if (!buffer.toString().isEmpty()) {
+            if (!s.isEmpty()) {
                 throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen");
             }
-            encodedHeadToken = (RubyString) buffer;
+            return RubyUtil.toRubyObject(s);
         }
-
-        return encodedHeadToken;
     }
 
     @JRubyMethod(name = "empty?")
     public IRubyObject isEmpty(final ThreadContext context) {
-        return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0));
+        return RubyUtil.RUBY.newBoolean(tokenizer.isEmpty());
     }
 }
diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java
index 524abb36ed5..c33042b4ce6 100644
--- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java
+++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java
@@ -19,7 +19,6 @@
 
 package org.logstash.common;
 
-import org.jruby.RubyArray;
 import org.jruby.RubyEncoding;
 import org.jruby.RubyString;
 import org.jruby.runtime.ThreadContext;
@@ -29,6 +28,7 @@
 import org.logstash.RubyTestBase;
 import org.logstash.RubyUtil;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -49,44 +49,54 @@ public void setUp() {
         sut.init(context, args);
     }
 
+    private static List<String> toList(Iterable<?> it) {
+        List<String> l = new ArrayList<>();
+        it.forEach(e -> l.add(e.toString()));
+        return l;
+    }
+
+    private static List<String> toList(IRubyObject it) {
+        return toList(it.toJava(Iterable.class));
+    }
+
     @Test
     public void shouldTokenizeASingleToken() {
-        RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\n"));
+        List<String> tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("foo\n")));
 
         assertEquals(List.of("foo"), tokens);
     }
 
     @Test
     public void shouldMergeMultipleToken() {
-        RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo"));
+        List<String> tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("foo")));
         assertTrue(tokens.isEmpty());
 
-        tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("bar\n"));
+        tokens = 
toList(sut.extract(context, RubyUtil.RUBY.newString("bar\n"))); assertEquals(List.of("foobar"), tokens); } @Test public void shouldTokenizeMultipleToken() { - RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + List tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n"))); assertEquals(List.of("foo", "bar"), tokens); } @Test public void shouldIgnoreEmptyPayload() { - RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + List tokens = toList(sut.extract(context, RubyUtil.RUBY.newString(""))); assertTrue(tokens.isEmpty()); - tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar")); + tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("foo\nbar"))); assertEquals(List.of("foo"), tokens); } @Test public void shouldTokenizeEmptyPayloadWithNewline() { - RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n")); + List tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("\n"))); assertEquals(List.of(""), tokens); - tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n")); + tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("\n\n\n"))); assertEquals(List.of("", "", ""), tokens); } @@ -94,10 +104,10 @@ public void shouldTokenizeEmptyPayloadWithNewline() { public void shouldNotChangeEncodingOfTokensAfterPartitioning() { RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); - RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + Iterable tokens = sut.extract(context, rubyInput).toJava(Iterable.class); // read the first token, the £ string - IRubyObject firstToken = tokens.shift(context); + RubyString firstToken = tokens.iterator().next(); assertEquals("£", firstToken.toString()); // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion @@ -112,13 +122,13 @@ public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtrac sut.extract(context, rubyInput); IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41}) .force_encoding(context, RUBY.newString("ISO8859-1")); - RubyArray tokens = (RubyArray)sut.extract(context, capitalAInLatin1); - assertTrue(tokens.isEmpty()); + List tokensJava = toList(sut.extract(context, capitalAInLatin1)); + assertTrue(tokensJava.isEmpty()); - tokens = (RubyArray)sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A})); + Iterable tokens = sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A})).toJava(Iterable.class); // read the first token, the £ string - IRubyObject firstToken = tokens.shift(context); + RubyString firstToken = tokens.iterator().next(); assertEquals("£A", firstToken.toString()); // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion @@ -130,10 +140,10 @@ public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtrac public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() { RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); - RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + Iterable tokens = sut.extract(context, rubyInput).toJava(Iterable.class); // read the first 
token, the £ string - IRubyObject firstToken = tokens.shift(context); + RubyString firstToken = tokens.iterator().next(); assertEquals("£", firstToken.toString()); // flush and check that the remaining A is still encoded in ISO8859-1 diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java deleted file mode 100644 index 9a07242369d..00000000000 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.logstash.common; - -import org.jruby.RubyArray; -import org.jruby.RubyString; -import org.jruby.runtime.ThreadContext; -import org.jruby.runtime.builtin.IRubyObject; -import org.junit.Before; -import org.junit.Test; -import org.logstash.RubyTestBase; -import org.logstash.RubyUtil; - -import java.util.List; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsString; -import static org.junit.Assert.*; -import static org.logstash.RubyUtil.RUBY; - -@SuppressWarnings("unchecked") -public final class BufferedTokenizerExtWithSizeLimitTest extends RubyTestBase { - - private BufferedTokenizerExt sut; - private ThreadContext context; - - @Before - public void setUp() { - sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); - context = RUBY.getCurrentContext(); - IRubyObject[] args = {RubyUtil.RUBY.newString("\n"), RubyUtil.RUBY.newFixnum(10)}; - sut.init(context, args); - } - - @Test - public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() { - RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); - - assertEquals(List.of("foo", "bar"), tokens); - } - - @Test - public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() { - Exception thrownException = assertThrows(IllegalStateException.class, () -> { - sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); - }); - assertThat(thrownException.getMessage(), containsString("input buffer full")); - } - - @Test - public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { - Exception thrownException = assertThrows(IllegalStateException.class, () -> { - sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); - }); - assertThat(thrownException.getMessage(), containsString("input buffer full")); - - RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\nanother")); - assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens); - } - - @Test - 
public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { - sut.extract(context, RubyUtil.RUBY.newString("aaaa")); - - Exception thrownException = assertThrows(IllegalStateException.class, () -> { - sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); - }); - assertThat(thrownException.getMessage(), containsString("input buffer full")); - - RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc")); - assertEquals(List.of("bbbb"), tokens); - } - - @Test - public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() { - sut.extract(context, RubyUtil.RUBY.newString("aaaa")); - - //first buffer full on 13 "a" letters - Exception thrownException = assertThrows(IllegalStateException.class, () -> { - sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); - }); - assertThat(thrownException.getMessage(), containsString("input buffer full")); - - // second buffer full on 11 "b" letters - Exception secondThrownException = assertThrows(IllegalStateException.class, () -> { - sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc")); - }); - assertThat(secondThrownException.getMessage(), containsString("input buffer full")); - - // now should resemble processing on c and d - RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n")); - assertEquals(List.of("ccccc", "ddd"), tokens); - } -} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java new file mode 100644 index 00000000000..4c2045618e7 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.common; + +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public final class BufferedTokenizerTest { + + private BufferedTokenizer sut; + + static List toList(Iterable iter) { + List acc = new ArrayList<>(); + iter.forEach(acc::add); + return acc; + } + + @Before + public void setUp() { + sut = new BufferedTokenizer(); + } + + @Test + public void shouldTokenizeASingleToken() { + List tokens = toList(sut.extract("foo\n")); + + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldMergeMultipleToken() { + List tokens = toList(sut.extract("foo")); + assertTrue(tokens.isEmpty()); + + tokens = toList(sut.extract("bar\n")); + assertEquals(List.of("foobar"), tokens); + } + + @Test + public void shouldTokenizeMultipleToken() { + List tokens = toList(sut.extract("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + List tokens = toList(sut.extract("")); + assertTrue(tokens.isEmpty()); + + tokens = toList(sut.extract("foo\nbar")); + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldTokenizeEmptyPayloadWithNewline() { + List tokens = toList(sut.extract("\n")); + assertEquals(List.of(""), tokens); + + tokens = toList(sut.extract("\n\n\n")); + assertEquals(List.of("", "", ""), tokens); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithDelimiterTest.java similarity index 56% rename from logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java rename to logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithDelimiterTest.java index 19872e66c3c..2b4c38c7e43 100644 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithDelimiterTest.java @@ -19,48 +19,37 @@ package org.logstash.common; -import org.jruby.RubyArray; -import org.jruby.RubyString; -import org.jruby.runtime.ThreadContext; -import org.jruby.runtime.builtin.IRubyObject; import org.junit.Before; import org.junit.Test; -import org.logstash.RubyTestBase; -import org.logstash.RubyUtil; import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.logstash.RubyUtil.RUBY; +import static org.logstash.common.BufferedTokenizerTest.toList; -@SuppressWarnings("unchecked") -public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase { +public final class BufferedTokenizerWithDelimiterTest { - private BufferedTokenizerExt sut; - private ThreadContext context; + private BufferedTokenizer sut; @Before public void setUp() { - sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); - context = RUBY.getCurrentContext(); - IRubyObject[] args = {RubyUtil.RUBY.newString("||")}; - sut.init(context, args); + sut = new BufferedTokenizer("||"); } @Test public void shouldTokenizeMultipleToken() { - RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||")); + List tokens = toList(sut.extract("foo||b|r||")); assertEquals(List.of("foo", "b|r"), tokens); } @Test public void shouldIgnoreEmptyPayload() { - RubyArray tokens = (RubyArray) 
sut.extract(context, RubyUtil.RUBY.newString("")); + List tokens = toList(sut.extract("")); assertTrue(tokens.isEmpty()); - tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||bar")); + tokens = toList(sut.extract("foo||bar")); assertEquals(List.of("foo"), tokens); } } diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java new file mode 100644 index 00000000000..e24ca2077ca --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Iterator; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; +import static org.logstash.common.BufferedTokenizerTest.toList; + +public final class BufferedTokenizerWithSizeLimitTest { + + private BufferedTokenizer sut; + + @Before + public void setUp() { + sut = new BufferedTokenizer("\n", 10); + } + + @Test + public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() { + List tokens = toList(sut.extract("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract("this_is_longer_than_10\nkaboom").forEach(s -> {}); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + } + + @Test + public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract("this_is_longer_than_10\nkaboom").forEach(s -> {}); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + List tokens = toList(sut.extract("\nanother")); + assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens); + } + + @Test + public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + sut.extract("aaaa"); + + // it goes to 11 on a sizeLimit of 10, but doesn't trigger the exception till the next separator is reached + sut.extract("aaaaaaa").forEach(s -> {}); + + Iterable tokenIterable = sut.extract("aa\nbbbb\nccc"); + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + // now when querying and the next delimiter is present, the error is raised 
+            tokenIterable.forEach(s -> {});
+        });
+        assertThat(thrownException.getMessage(), containsString("input buffer full"));
+
+        // the iteration over the tokens can then proceed
+        List tokens = toList(tokenIterable);
+        assertEquals(List.of("bbbb"), tokens);
+    }
+
+    @Test
+    public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() {
+        sut.extract("aaaa");
+
+        // it goes to 11 on a sizeLimit of 10, but doesn't trigger the exception till the next separator is reached
+        sut.extract("aaaaaaa").forEach(s -> {});
+
+        Iterable tokenIterable = sut.extract("aa\nbbbbbbbbbbb\ncc");
+
+        // first buffer full on 13 "a" letters
+        Exception thrownException = assertThrows(IllegalStateException.class, () -> {
+            tokenIterable.forEach(s -> {});
+        });
+        assertThat(thrownException.getMessage(), containsString("input buffer full"));
+
+        // second buffer full on 11 "b" letters
+        Exception secondThrownException = assertThrows(IllegalStateException.class, () -> {
+            tokenIterable.forEach(s -> {});
+        });
+        assertThat(secondThrownException.getMessage(), containsString("input buffer full"));
+
+        // now processing should resume on the c and d tokens
+        List tokens = toList(sut.extract("ccc\nddd\n"));
+        assertEquals(List.of("ccccc", "ddd"), tokens);
+    }
+
+    @Test
+    public void givenFragmentThatHasTheSecondTokenOverrunsSizeLimitThenAnErrorIsThrown() {
+        Iterable tokensIterable = sut.extract("aaaa\nbbbbbbbbbbb\nccc\n");
+        Iterator tokensIterator = tokensIterable.iterator();
+
+        // first token length = 4, it's ok
+        assertEquals("aaaa", tokensIterator.next());
+
+        // second token is an overrun, length = 11
+        Exception exception = assertThrows(IllegalStateException.class, () -> {
+            tokensIterator.next();
+        });
+        assertThat(exception.getMessage(), containsString("input buffer full"));
+
+        // third token resumes
+        assertEquals("ccc", tokensIterator.next());
+    }
+
+    @Test
+    public void givenSequenceOfFragmentsWithoutSeparatorThenDoesntGenerateOutOfMemory() {
+        final String neverEndingData = generate(8, "a");
+        for (int i = 0; i < 10; i++) {
+            // the iterator has to be engaged for the size check to run
+            boolean hasNext = sut.extract(neverEndingData).iterator().hasNext();
+            assertFalse(hasNext);
+        }
+
+        // with the second fragment passed to extract, the accumulator overruns the sizeLimit, so the tokenizer
+        // drops everything from the third fragment onward
+        assertThat("Accumulator includes only a part of an exploding payload", sut.flush().length(), is(lessThan(neverEndingData.length() * 3)));
+
+        Iterable tokensIterable = sut.extract("\nbbb\n");
+        Iterator tokensIterator = tokensIterable.iterator();
+        // send a token delimiter and check an error is raised
+        Exception exception = assertThrows(IllegalStateException.class, () -> {
+            tokensIterator.next();
+        });
+        assertThat(exception.getMessage(), containsString("input buffer full"));
+    }
+
+    private static String generate(int length, String fillChar) {
+        return fillChar.repeat(length);
+    }
+}
\ No newline at end of file
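
Usage sketch (not part of the patch): the following minimal, self-contained example shows how a caller is expected to consume the iterator-based API added in BufferedTokenizer.java above. The chunk values, class name and printing are illustrative placeholders; only the BufferedTokenizer constructors, extract(), flush() and the lazily thrown IllegalStateException come from this diff.

package org.logstash.common;

import java.util.List;

// Illustrative consumer of the new BufferedTokenizer API (sketch only).
public class BufferedTokenizerUsageSketch {
    public static void main(String[] args) {
        // Tokens are produced lazily: extract() only appends the fragment to the internal
        // accumulator, and a size-limit overrun surfaces as an IllegalStateException while
        // iterating, not when extract() is called.
        BufferedTokenizer tokenizer = new BufferedTokenizer("\n", 64 * 1024);

        for (String chunk : List.of("foo\nba", "r\nbaz\n")) { // stand-in for successive socket reads
            try {
                for (String token : tokenizer.extract(chunk)) {
                    System.out.println("token: " + token); // prints "foo", then "bar" and "baz"
                }
            } catch (IllegalStateException e) {
                // a single token exceeded sizeLimit; tokenization resumes after the next separator
                System.err.println(e.getMessage());
            }
        }

        System.out.println("leftover: [" + tokenizer.flush() + "]"); // unterminated tail, empty here
    }
}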
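One behavioral change worth calling out, inferred from the size-limit tests above rather than stated anywhere in the diff: the old implementation threw the "input buffer full" error from extract() itself, while the new one never raises it there; the error is thrown by the returned iterator once the oversized token is actually consumed (via next() or forEach). A compact sketch of that timing, using the same limit of 10 as the tests; names are illustrative.

package org.logstash.common;

import java.util.Iterator;

// Sketch of the new error timing for the size-limited tokenizer (not part of this PR).
public class SizeLimitTimingSketch {
    public static void main(String[] args) {
        BufferedTokenizer sut = new BufferedTokenizer("\n", 10);

        // extract() only buffers the fragment; nothing is thrown yet
        Iterator<String> it = sut.extract("this_is_longer_than_10\nok\n").iterator();

        try {
            it.next(); // the oversized token is consumed here and triggers the error
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage()); // "input buffer full, consumed token which exceeded the sizeLimit 10"
        }

        System.out.println(it.next()); // prints "ok": tokenization resumes after the separator
    }
}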