From 0cfaf31e37af4d282be939715e65108f4687abbd Mon Sep 17 00:00:00 2001
From: andsel
Date: Mon, 3 Mar 2025 09:37:17 +0100
Subject: [PATCH 01/16] First rough idea of reimplementation to return an
 iterator

---
 .../src/main/java/org/logstash/RubyUtil.java  |  12 +-
 .../org/logstash/common/CustomTokenizer.java  | 111 ++++++++++++++++++
 2 files changed, 117 insertions(+), 6 deletions(-)
 create mode 100644 logstash-core/src/main/java/org/logstash/common/CustomTokenizer.java

diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java
index 564f51bc085..c79708c8336 100644
--- a/logstash-core/src/main/java/org/logstash/RubyUtil.java
+++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java
@@ -33,7 +33,7 @@
 import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
 import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
 import org.logstash.common.AbstractDeadLetterQueueWriterExt;
-import org.logstash.common.BufferedTokenizerExt;
+//import org.logstash.common.BufferedTokenizerExt;
 import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
 import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
 import org.logstash.config.ir.compiler.FilterDelegatorExt;
@@ -152,7 +152,7 @@ public final class RubyUtil {
 
     public static final RubyClass OUTPUT_STRATEGY_SHARED;
 
-    public static final RubyClass BUFFERED_TOKENIZER;
+//    public static final RubyClass BUFFERED_TOKENIZER;
 
     public static final RubyClass ABSTRACT_METRIC_CLASS;
 
@@ -356,10 +356,10 @@ public final class RubyUtil {
             OutputStrategyExt.OutputStrategyRegistryExt::new,
             OutputStrategyExt.OutputStrategyRegistryExt.class
         );
-        BUFFERED_TOKENIZER = RUBY.getOrCreateModule("FileWatch").defineClassUnder(
-            "BufferedTokenizer", RUBY.getObject(), BufferedTokenizerExt::new
-        );
-        BUFFERED_TOKENIZER.defineAnnotatedMethods(BufferedTokenizerExt.class);
+//        BUFFERED_TOKENIZER = RUBY.getOrCreateModule("FileWatch").defineClassUnder(
+//            "BufferedTokenizer", RUBY.getObject(), BufferedTokenizerExt::new
+//        );
+//        BUFFERED_TOKENIZER.defineAnnotatedMethods(BufferedTokenizerExt.class);
         OUTPUT_DELEGATOR_STRATEGIES = RUBY.defineModuleUnder("OutputDelegatorStrategies", LOGSTASH_MODULE);
         OUTPUT_STRATEGY_ABSTRACT = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
diff --git a/logstash-core/src/main/java/org/logstash/common/CustomTokenizer.java b/logstash-core/src/main/java/org/logstash/common/CustomTokenizer.java
new file mode 100644
index 00000000000..333b34ce861
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/common/CustomTokenizer.java
@@ -0,0 +1,111 @@
+package org.logstash.common;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+public class CustomTokenizer {
+
+    private final DataSplitter dataSplitter;
+
+    static class ValueLimitIteratorDecorator implements Iterator<String> {
+        private final Iterator<String> iterator;
+        private final int limit = 10;
+
+        ValueLimitIteratorDecorator(Iterator<String> iterator) {
+            this.iterator = iterator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iterator.hasNext();
+        }
+
+        @Override
+        public String next() {
+            String value = iterator.next();
+            if (value.length() > limit) {
+                throw new IllegalArgumentException("Too long");
+            }
+            return value;
+        }
+    }
+
+    static class DataSplitter implements Iterator<String> {
+        private final String separator;
+        private int currentIdx = 0;
+        private final StringBuilder accumulator = new StringBuilder();
+
+        DataSplitter(String separator) {
+            this.separator = separator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            int nextIdx = accumulator.indexOf(separator, currentIdx);
+            if (nextIdx == -1) {
+                // not found next separator
+                System.out.println("hasNext return false because next token not found");
+                cleanupAccumulator();
+                return false;
+            } else {
+                return true;
+            }
+        }
+
+        @Override
+        public String next() {
+            int nextIdx = accumulator.indexOf(separator, currentIdx);
+            if (nextIdx == -1) {
+                // not found next separator
+                cleanupAccumulator();
+                throw new NoSuchElementException();
+            } else {
+                String token = accumulator.substring(currentIdx, nextIdx);
+                currentIdx = nextIdx + separator.length();
+                return token;
+            }
+        }
+
+        private void cleanupAccumulator() {
+            accumulator.delete(0, currentIdx);
+            currentIdx = 0;
+        }
+
+        public void append(String data) {
+            accumulator.append(data);
+        }
+
+        public String flush() {
+            return accumulator.toString();
+        }
+
+        @Override
+        public String toString() {
+            return "accumulator=" + accumulator + ", currentIdx=" + currentIdx;
+        }
+    }
+
+    public CustomTokenizer(String separator) {
+        this.dataSplitter = new DataSplitter(separator);
+    }
+
+    public Iterable<String> extract(String data) {
+        dataSplitter.append(data);
+
+        return new Iterable<String>() {
+            @Override
+            public Iterator<String> iterator() {
+                return new ValueLimitIteratorDecorator(dataSplitter);
+            }
+        };
+    }
+
+    public String flush() {
+        return dataSplitter.flush();
+    }
+
+    @Override
+    public String toString() {
+        return dataSplitter.toString();
+    }
+}
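
The core idea of the draft above: extract() only appends the incoming fragment to a StringBuilder accumulator and hands back a lazy Iterable; tokens are materialized one by one in DataSplitter.next() while iterating, and whatever trails the last separator simply stays buffered for the next call. A minimal usage sketch (plain Java, not part of the patch; it assumes only the CustomTokenizer class introduced above):

    CustomTokenizer tokenizer = new CustomTokenizer("\n");

    // "foo" is complete, "bar" has no trailing separator yet
    for (String token : tokenizer.extract("foo\nbar")) {
        System.out.println(token);              // prints "foo"
    }

    // the buffered "bar" is merged with the next fragment
    for (String token : tokenizer.extract("baz\n")) {
        System.out.println(token);              // prints "barbaz"
    }

    System.out.println(tokenizer.flush());      // whatever is still buffered, "" here

Note that at this stage the token size check is a hard-coded limit = 10 inside ValueLimitIteratorDecorator, and an oversized token surfaces as IllegalArgumentException("Too long") during iteration; both points are reworked in the next patch.
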
From 3eded20f405e6219080981b5b2e2fcd40c5de40f Mon Sep 17 00:00:00 2001
From: andsel
Date: Mon, 3 Mar 2025 14:57:31 +0100
Subject: [PATCH 02/16] Exposed Java BufferedTokenizer under FileWatch module
 and adapted tests to use it

---
 .../benchmark/BufferedTokenizerBenchmark.java |  73 ++++++++
 .../BufferedTokenizerExtBenchmark.java        |  83 ---------
 logstash-core/lib/logstash/file_watch.rb      |   3 +
 logstash-core/lib/logstash/util.rb            |   1 +
 .../spec/logstash/util/buftok_spec.rb         |  28 +--
 .../src/main/java/org/logstash/RubyUtil.java  |   1 +
 ...mTokenizer.java => BufferedTokenizer.java} |  35 +++-
 .../common/BufferedTokenizerExtTest.java      | 161 ------------------
 .../common/BufferedTokenizerTest.java         | 155 +++++++++++++++++
 ...> BufferedTokenizerWithDelimiterTest.java} |  25 +--
 ...> BufferedTokenizerWithSizeLimitTest.java} |  44 ++---
 11 files changed, 303 insertions(+), 306 deletions(-)
 create mode 100644 logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java
 delete mode 100644 logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerExtBenchmark.java
 create mode 100644 logstash-core/lib/logstash/file_watch.rb
 rename logstash-core/src/main/java/org/logstash/common/{CustomTokenizer.java => BufferedTokenizer.java} (71%)
 delete mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java
 create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java
 rename logstash-core/src/test/java/org/logstash/common/{BufferedTokenizerExtWithDelimiterTest.java => BufferedTokenizerWithDelimiterTest.java} (56%)
 rename logstash-core/src/test/java/org/logstash/common/{BufferedTokenizerExtWithSizeLimitTest.java => BufferedTokenizerWithSizeLimitTest.java} (63%)

diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java
new file mode 100644
index 00000000000..5366e0fa04c
--- /dev/null
+++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java
@@ -0,0 +1,73 @@
+package org.logstash.benchmark;
+
+import org.logstash.common.BufferedTokenizer;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.util.concurrent.TimeUnit;
+
+
+@Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS)
+@Measurement(iterations = 10, time = 100, timeUnit = TimeUnit.MILLISECONDS)
+@Fork(1)
+@BenchmarkMode(Mode.Throughput)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@State(Scope.Thread)
+public class BufferedTokenizerBenchmark {
+
+    private BufferedTokenizer sut;
+    private String singleTokenPerFragment;
+    private String multipleTokensPerFragment;
+    private String multipleTokensSpreadMultipleFragments_1;
+    private String multipleTokensSpreadMultipleFragments_2;
+    private String multipleTokensSpreadMultipleFragments_3;
+
+    @Setup(Level.Invocation)
+    public void setUp() {
+        sut = new BufferedTokenizer();
+        singleTokenPerFragment = "a".repeat(512) + "\n";
+
+        multipleTokensPerFragment = "a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(512) + "\n";
+
+        multipleTokensSpreadMultipleFragments_1 = "a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(256);
+        multipleTokensSpreadMultipleFragments_2 = "c".repeat(256) + "\n" + "d".repeat(512) + "\n" + "e".repeat(256);
+        multipleTokensSpreadMultipleFragments_3 = "f".repeat(256) + "\n" + "g".repeat(512) + "\n" + "h".repeat(512) + "\n";
+    }
+
+    @SuppressWarnings("unchecked")
+    @Benchmark
+    public final void onlyOneTokenPerFragment(Blackhole blackhole) {
+        Iterable<String> tokens = sut.extract(singleTokenPerFragment);
+        blackhole.consume(tokens);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Benchmark
+    public final void multipleTokenPerFragment(Blackhole blackhole) {
+        Iterable<String> tokens = sut.extract(multipleTokensPerFragment);
+        blackhole.consume(tokens);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Benchmark
+    public final void multipleTokensCrossingMultipleFragments(Blackhole blackhole) {
+        Iterable<String> tokens = sut.extract(multipleTokensSpreadMultipleFragments_1);
+        blackhole.consume(tokens);
+
+        tokens = sut.extract(multipleTokensSpreadMultipleFragments_2);
+        blackhole.consume(tokens);
+
+        tokens = sut.extract(multipleTokensSpreadMultipleFragments_3);
+        blackhole.consume(tokens);
+    }
+}
diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerExtBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerExtBenchmark.java
deleted file mode 100644
index 5b01cebb3e5..00000000000
--- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerExtBenchmark.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.logstash.benchmark;
-
-import org.jruby.RubyArray;
-import org.jruby.RubyString;
-import org.jruby.runtime.ThreadContext;
-import org.jruby.runtime.builtin.IRubyObject;
-import org.logstash.RubyUtil;
-import org.logstash.common.BufferedTokenizerExt;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
-import org.openjdk.jmh.annotations.Level;
-import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.Warmup;
-import org.openjdk.jmh.infra.Blackhole;
-
-import java.util.concurrent.TimeUnit;
-
-import static org.logstash.RubyUtil.RUBY;
-
-@Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS)
-@Measurement(iterations = 10, time = 100, timeUnit = TimeUnit.MILLISECONDS)
-@Fork(1)
-@BenchmarkMode(Mode.Throughput)
-@OutputTimeUnit(TimeUnit.NANOSECONDS)
-@State(Scope.Thread)
-public class BufferedTokenizerExtBenchmark {
-
-    private BufferedTokenizerExt sut;
-    private ThreadContext context;
-    private RubyString singleTokenPerFragment;
-    private RubyString multipleTokensPerFragment;
-    private RubyString multipleTokensSpreadMultipleFragments_1;
-    private RubyString multipleTokensSpreadMultipleFragments_2;
-    private RubyString multipleTokensSpreadMultipleFragments_3;
-
-    @Setup(Level.Invocation)
-    public void setUp() {
-        sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
-        context = RUBY.getCurrentContext();
-        IRubyObject[] args = {};
-        sut.init(context, args);
-        singleTokenPerFragment = RubyUtil.RUBY.newString("a".repeat(512) + "\n");
-
-        multipleTokensPerFragment = RubyUtil.RUBY.newString("a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(512) + "\n");
-
-        multipleTokensSpreadMultipleFragments_1 = RubyUtil.RUBY.newString("a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(256));
-        multipleTokensSpreadMultipleFragments_2 = RubyUtil.RUBY.newString("c".repeat(256) + "\n" + "d".repeat(512) + "\n" + "e".repeat(256));
-        multipleTokensSpreadMultipleFragments_3 = RubyUtil.RUBY.newString("f".repeat(256) + "\n" + "g".repeat(512) + "\n" + "h".repeat(512) + "\n");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Benchmark
-    public final void onlyOneTokenPerFragment(Blackhole blackhole) {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, singleTokenPerFragment);
-        blackhole.consume(tokens);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Benchmark
-    public final void multipleTokenPerFragment(Blackhole blackhole) {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, multipleTokensPerFragment);
-        blackhole.consume(tokens);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Benchmark
-    public final void multipleTokensCrossingMultipleFragments(Blackhole blackhole) {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, multipleTokensSpreadMultipleFragments_1);
-        blackhole.consume(tokens);
-
-        tokens = (RubyArray<RubyString>) sut.extract(context, multipleTokensSpreadMultipleFragments_2);
-        blackhole.consume(tokens);
-
-        tokens = (RubyArray<RubyString>) sut.extract(context, multipleTokensSpreadMultipleFragments_3);
-        blackhole.consume(tokens);
-    }
-}
diff --git a/logstash-core/lib/logstash/file_watch.rb b/logstash-core/lib/logstash/file_watch.rb
new file mode 100644
index 00000000000..f1a73f55926
--- /dev/null
+++ b/logstash-core/lib/logstash/file_watch.rb
@@ -0,0 +1,3 @@
+module FileWatch
+  java_import org.logstash.common.BufferedTokenizer
+end
\ No newline at end of file
diff --git a/logstash-core/lib/logstash/util.rb b/logstash-core/lib/logstash/util.rb
index 9eac95e1a27..5e1f760f018 100644
--- a/logstash-core/lib/logstash/util.rb
+++ b/logstash-core/lib/logstash/util.rb
@@ -16,6 +16,7 @@
 # under the License.
 
 require "logstash/environment"
+require "logstash/file_watch"
 
 module LogStash::Util
   UNAME = case RbConfig::CONFIG["host_os"]
diff --git a/logstash-core/spec/logstash/util/buftok_spec.rb b/logstash-core/spec/logstash/util/buftok_spec.rb
index 3d7b4b1990f..4eceb20014e 100644
--- a/logstash-core/spec/logstash/util/buftok_spec.rb
+++ b/logstash-core/spec/logstash/util/buftok_spec.rb
@@ -20,27 +20,33 @@
 describe FileWatch::BufferedTokenizer do
   subject { FileWatch::BufferedTokenizer.new }
 
+  def to_list(iterator)
+    a = []
+    iterator.each { |v| a << v }
+    return a
+  end
+
   it "should tokenize a single token" do
-    expect(subject.extract("foo\n")).to eq(["foo"])
+    expect(to_list(subject.extract("foo\n"))).to eq(["foo"])
   end
 
   it "should merge multiple token" do
-    expect(subject.extract("foo")).to eq([])
-    expect(subject.extract("bar\n")).to eq(["foobar"])
+    expect(to_list(subject.extract("foo"))).to eq([])
+    expect(to_list(subject.extract("bar\n"))).to eq(["foobar"])
  end
 
   it "should tokenize multiple token" do
-    expect(subject.extract("foo\nbar\n")).to eq(["foo", "bar"])
+    expect(to_list(subject.extract("foo\nbar\n"))).to eq(["foo", "bar"])
   end
 
   it "should ignore empty payload" do
-    expect(subject.extract("")).to eq([])
-    expect(subject.extract("foo\nbar")).to eq(["foo"])
+    expect(to_list(subject.extract(""))).to eq([])
+    expect(to_list(subject.extract("foo\nbar"))).to eq(["foo"])
   end
 
   it "should tokenize empty payload with newline" do
-    expect(subject.extract("\n")).to eq([""])
-    expect(subject.extract("\n\n\n")).to eq(["", "", ""])
+    expect(to_list(subject.extract("\n"))).to eq([""])
+    expect(to_list(subject.extract("\n\n\n"))).to eq(["", "", ""])
   end
 
   describe 'flush' do
@@ -83,12 +89,12 @@
     let(:delimiter) { "||" }
 
     it "should tokenize multiple token" do
-      expect(subject.extract("foo||b|r||")).to eq(["foo", "b|r"])
+      expect(to_list(subject.extract("foo||b|r||"))).to eq(["foo", "b|r"])
     end
 
     it "should ignore empty payload" do
-      expect(subject.extract("")).to eq([])
-      expect(subject.extract("foo||bar")).to eq(["foo"])
+      expect(to_list(subject.extract(""))).to eq([])
+      expect(to_list(subject.extract("foo||bar"))).to eq(["foo"])
     end
   end
 end
diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java
index c79708c8336..c3446053c96 100644
--- a/logstash-core/src/main/java/org/logstash/RubyUtil.java
+++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java
@@ -34,6 +34,7 @@
 import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
 import org.logstash.common.AbstractDeadLetterQueueWriterExt;
 //import org.logstash.common.BufferedTokenizerExt;
+import org.logstash.common.BufferedTokenizer;
 import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
 import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
 import org.logstash.config.ir.compiler.FilterDelegatorExt;
diff --git a/logstash-core/src/main/java/org/logstash/common/CustomTokenizer.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
similarity index 71%
rename from logstash-core/src/main/java/org/logstash/common/CustomTokenizer.java
rename to logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
index 333b34ce861..1576283aa95 100644
--- a/logstash-core/src/main/java/org/logstash/common/CustomTokenizer.java
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
@@ -3,16 +3,18 @@
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-public class CustomTokenizer {
+public class BufferedTokenizer {
 
     private final DataSplitter dataSplitter;
+    private Integer sizeLimit;
 
     static class ValueLimitIteratorDecorator implements Iterator<String> {
         private final Iterator<String> iterator;
-        private final int limit = 10;
+        private final int limit;
 
-        ValueLimitIteratorDecorator(Iterator<String> iterator) {
+        ValueLimitIteratorDecorator(Iterator<String> iterator, int sizeLimit) {
             this.iterator = iterator;
+            this.limit = sizeLimit;
         }
 
         @Override
@@ -24,7 +26,7 @@ public boolean hasNext() {
         public String next() {
             String value = iterator.next();
             if (value.length() > limit) {
-                throw new IllegalArgumentException("Too long");
+                throw new IllegalStateException("input buffer full, consumed token which exceeded the sizeLimit " + limit);
             }
             return value;
         }
@@ -85,17 +87,34 @@ public String toString() {
         }
     }
 
-    public CustomTokenizer(String separator) {
+    public BufferedTokenizer() {
+        this("\n");
+    }
+
+    public BufferedTokenizer(String separator) {
         this.dataSplitter = new DataSplitter(separator);
     }
 
+    public BufferedTokenizer(String separator, int sizeLimit) {
+        if (sizeLimit <= 0) {
+            throw new IllegalArgumentException("Size limit must be positive");
+        }
+
+        this.dataSplitter = new DataSplitter(separator);
+        this.sizeLimit = sizeLimit;
+    }
+
     public Iterable<String> extract(String data) {
         dataSplitter.append(data);
 
         return new Iterable<String>() {
             @Override
             public Iterator<String> iterator() {
-                return new ValueLimitIteratorDecorator(dataSplitter);
+                Iterator<String> returnedIterator = dataSplitter;
+                if (sizeLimit != null) {
+                    returnedIterator = new ValueLimitIteratorDecorator(returnedIterator, sizeLimit);
+                }
+                return returnedIterator;
             }
         };
     }
@@ -108,4 +127,8 @@ public String flush() {
     @Override
     public String toString() {
         return dataSplitter.toString();
     }
+
+    public boolean isEmpty() {
+        return !dataSplitter.hasNext();
+    }
 }
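
With this patch the size limit is no longer hard-coded: it is threaded into ValueLimitIteratorDecorator, and the decorator is only wrapped around the splitter when a limit was actually configured. Because the check lives in next(), the error surfaces while iterating, not when extract() buffers the data, which is why the adapted tests force consumption with .forEach(s -> {}). A sketch of the resulting behavior (plain Java, using only this patch's API):

    BufferedTokenizer tokenizer = new BufferedTokenizer("\n", 10);
    Iterable<String> tokens = tokenizer.extract("this_is_longer_than_10\nok\n");

    try {
        for (String token : tokens) {
            System.out.println(token);
        }
    } catch (IllegalStateException e) {
        // "input buffer full, consumed token which exceeded the sizeLimit 10";
        // the oversized token was consumed by next(), so tokenization can resume
    }

    for (String token : tokenizer.extract("")) {
        System.out.println(token);              // prints "ok"
    }

The throwing next() has already advanced past the offending token's separator, so a subsequent iteration picks up at the next complete token, which is exactly what the recovery tests below assert.
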
diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java
deleted file mode 100644
index 524abb36ed5..00000000000
--- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to Elasticsearch B.V. under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch B.V. licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.logstash.common;
-
-import org.jruby.RubyArray;
-import org.jruby.RubyEncoding;
-import org.jruby.RubyString;
-import org.jruby.runtime.ThreadContext;
-import org.jruby.runtime.builtin.IRubyObject;
-import org.junit.Before;
-import org.junit.Test;
-import org.logstash.RubyTestBase;
-import org.logstash.RubyUtil;
-
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.logstash.RubyUtil.RUBY;
-
-@SuppressWarnings("unchecked")
-public final class BufferedTokenizerExtTest extends RubyTestBase {
-
-    private BufferedTokenizerExt sut;
-    private ThreadContext context;
-
-    @Before
-    public void setUp() {
-        sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
-        context = RUBY.getCurrentContext();
-        IRubyObject[] args = {};
-        sut.init(context, args);
-    }
-
-    @Test
-    public void shouldTokenizeASingleToken() {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\n"));
-
-        assertEquals(List.of("foo"), tokens);
-    }
-
-    @Test
-    public void shouldMergeMultipleToken() {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo"));
-        assertTrue(tokens.isEmpty());
-
-        tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("bar\n"));
-        assertEquals(List.of("foobar"), tokens);
-    }
-
-    @Test
-    public void shouldTokenizeMultipleToken() {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n"));
-
-        assertEquals(List.of("foo", "bar"), tokens);
-    }
-
-    @Test
-    public void shouldIgnoreEmptyPayload() {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
-        assertTrue(tokens.isEmpty());
-
-        tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar"));
-        assertEquals(List.of("foo"), tokens);
-    }
-
-    @Test
-    public void shouldTokenizeEmptyPayloadWithNewline() {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n"));
-        assertEquals(List.of(""), tokens);
-
-        tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n"));
-        assertEquals(List.of("", "", ""), tokens);
-    }
-
-    @Test
-    public void shouldNotChangeEncodingOfTokensAfterPartitioning() {
-        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
-        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, rubyInput);
-
-        // read the first token, the £ string
-        IRubyObject firstToken = tokens.shift(context);
-        assertEquals("£", firstToken.toString());
-
-        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
-        RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding");
-        assertEquals("ISO-8859-1", encoding.toString());
-    }
-
-    @Test
-    public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() {
-        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character
-        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
-        sut.extract(context, rubyInput);
-        IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41})
-                .force_encoding(context, RUBY.newString("ISO8859-1"));
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, capitalAInLatin1);
-        assertTrue(tokens.isEmpty());
-
-        tokens = (RubyArray<RubyString>) sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A}));
-
-        // read the first token, the £ string
-        IRubyObject firstToken = tokens.shift(context);
-        assertEquals("£A", firstToken.toString());
-
-        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
-        RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding");
-        assertEquals("ISO-8859-1", encoding.toString());
-    }
-
-    @Test
-    public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() {
-        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
-        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, rubyInput);
-
-        // read the first token, the £ string
-        IRubyObject firstToken = tokens.shift(context);
-        assertEquals("£", firstToken.toString());
-
-        // flush and check that the remaining A is still encoded in ISO8859-1
-        IRubyObject lastToken = sut.flush(context);
-        assertEquals("A", lastToken.toString());
-
-        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
-        RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding");
-        assertEquals("ISO-8859-1", encoding.toString());
-    }
-
-    @Test
-    public void givenDirectFlushInvocationUTF8EncodingIsApplied() {
-        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A
-        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
-
-        // flush and check that the remaining A is still encoded in ISO8859-1
-        IRubyObject lastToken = sut.flush(context);
-        assertEquals("", lastToken.toString());
-
-        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
-        RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding");
-        assertEquals("UTF-8", encoding.toString());
-    }
-}
\ No newline at end of file
diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java
new file mode 100644
index 00000000000..43532f53946
--- /dev/null
+++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to Elasticsearch B.V. under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.logstash.common;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public final class BufferedTokenizerTest {
+
+    private BufferedTokenizer sut;
+
+    static List<String> toList(Iterable<String> iter) {
+        List<String> acc = new ArrayList<>();
+        iter.forEach(acc::add);
+        return acc;
+    }
+
+    @Before
+    public void setUp() {
+        sut = new BufferedTokenizer();
+    }
+
+    @Test
+    public void shouldTokenizeASingleToken() {
+        List<String> tokens = toList(sut.extract("foo\n"));
+
+        assertEquals(List.of("foo"), tokens);
+    }
+
+    @Test
+    public void shouldMergeMultipleToken() {
+        List<String> tokens = toList(sut.extract("foo"));
+        assertTrue(tokens.isEmpty());
+
+        tokens = toList(sut.extract("bar\n"));
+        assertEquals(List.of("foobar"), tokens);
+    }
+
+    @Test
+    public void shouldTokenizeMultipleToken() {
+        List<String> tokens = toList(sut.extract("foo\nbar\n"));
+
+        assertEquals(List.of("foo", "bar"), tokens);
+    }
+
+    @Test
+    public void shouldIgnoreEmptyPayload() {
+        List<String> tokens = toList(sut.extract(""));
+        assertTrue(tokens.isEmpty());
+
+        tokens = toList(sut.extract("foo\nbar"));
+        assertEquals(List.of("foo"), tokens);
+    }
+
+    @Test
+    public void shouldTokenizeEmptyPayloadWithNewline() {
+        List<String> tokens = toList(sut.extract("\n"));
+        assertEquals(List.of(""), tokens);
+
+        tokens = toList(sut.extract("\n\n\n"));
+        assertEquals(List.of("", "", ""), tokens);
+    }
+
+//    @Test
+//    public void shouldNotChangeEncodingOfTokensAfterPartitioning() {
+//        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
+//        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
+//        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, rubyInput);
+//
+//        // read the first token, the £ string
+//        IRubyObject firstToken = tokens.shift(context);
+//        assertEquals("£", firstToken.toString());
+//
+//        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
+//        RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding");
+//        assertEquals("ISO-8859-1", encoding.toString());
+//    }
+//
+//    @Test
+//    public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() {
+//        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character
+//        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
+//        sut.extract(context, rubyInput);
+//        IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41})
+//                .force_encoding(context, RUBY.newString("ISO8859-1"));
+//        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, capitalAInLatin1);
+//        assertTrue(tokens.isEmpty());
+//
+//        tokens = (RubyArray<RubyString>) sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A}));
+//
+//        // read the first token, the £ string
+//        IRubyObject firstToken = tokens.shift(context);
+//        assertEquals("£A", firstToken.toString());
+//
+//        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
+//        RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding");
+//        assertEquals("ISO-8859-1", encoding.toString());
+//    }
+//
+//    @Test
+//    public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() {
+//        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
+//        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
+//        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, rubyInput);
+//
+//        // read the first token, the £ string
+//        IRubyObject firstToken = tokens.shift(context);
+//        assertEquals("£", firstToken.toString());
+//
+//        // flush and check that the remaining A is still encoded in ISO8859-1
+//        IRubyObject lastToken = sut.flush(context);
+//        assertEquals("A", lastToken.toString());
+//
+//        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
+//        RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding");
+//        assertEquals("ISO-8859-1", encoding.toString());
+//    }
+//
+//    @Test
+//    public void givenDirectFlushInvocationUTF8EncodingIsApplied() {
+//        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A
+//        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
+//
+//        // flush and check that the remaining A is still encoded in ISO8859-1
+//        IRubyObject lastToken = sut.flush(context);
+//        assertEquals("", lastToken.toString());
+//
+//        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
+//        RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding");
+//        assertEquals("UTF-8", encoding.toString());
+//    }
+}
\ No newline at end of file
diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithDelimiterTest.java
similarity index 56%
rename from logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java
rename to logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithDelimiterTest.java
index 19872e66c3c..2b4c38c7e43 100644
--- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java
+++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithDelimiterTest.java
@@ -19,48 +19,37 @@
 
 package org.logstash.common;
 
-import org.jruby.RubyArray;
-import org.jruby.RubyString;
-import org.jruby.runtime.ThreadContext;
-import org.jruby.runtime.builtin.IRubyObject;
 import org.junit.Before;
 import org.junit.Test;
-import org.logstash.RubyTestBase;
-import org.logstash.RubyUtil;
 
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.logstash.RubyUtil.RUBY;
+import static org.logstash.common.BufferedTokenizerTest.toList;
 
-@SuppressWarnings("unchecked")
-public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase {
+public final class BufferedTokenizerWithDelimiterTest {
 
-    private BufferedTokenizerExt sut;
-    private ThreadContext context;
+    private BufferedTokenizer sut;
 
     @Before
     public void setUp() {
-        sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
-        context = RUBY.getCurrentContext();
-        IRubyObject[] args = {RubyUtil.RUBY.newString("||")};
-        sut.init(context, args);
+        sut = new BufferedTokenizer("||");
     }
 
     @Test
     public void shouldTokenizeMultipleToken() {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||"));
+        List<String> tokens = toList(sut.extract("foo||b|r||"));
 
         assertEquals(List.of("foo", "b|r"), tokens);
     }
 
     @Test
     public void shouldIgnoreEmptyPayload() {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
+        List<String> tokens = toList(sut.extract(""));
         assertTrue(tokens.isEmpty());
 
-        tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||bar"));
+        tokens = toList(sut.extract("foo||bar"));
         assertEquals(List.of("foo"), tokens);
     }
 }
diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java
similarity index 63%
rename from logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java
rename to logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java
index 9a07242369d..74be48c8be9 100644
--- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java
+++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java
@@ -19,39 +19,29 @@
 
 package org.logstash.common;
 
-import org.jruby.RubyArray;
-import org.jruby.RubyString;
-import org.jruby.runtime.ThreadContext;
-import org.jruby.runtime.builtin.IRubyObject;
 import org.junit.Before;
 import org.junit.Test;
-import org.logstash.RubyTestBase;
-import org.logstash.RubyUtil;
 
 import java.util.List;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.*;
-import static org.logstash.RubyUtil.RUBY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.logstash.common.BufferedTokenizerTest.toList;
 
-@SuppressWarnings("unchecked")
-public final class BufferedTokenizerExtWithSizeLimitTest extends RubyTestBase {
+public final class BufferedTokenizerWithSizeLimitTest {
 
-    private BufferedTokenizerExt sut;
-    private ThreadContext context;
+    private BufferedTokenizer sut;
 
     @Before
     public void setUp() {
-        sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
-        context = RUBY.getCurrentContext();
-        IRubyObject[] args = {RubyUtil.RUBY.newString("\n"), RubyUtil.RUBY.newFixnum(10)};
-        sut.init(context, args);
+        sut = new BufferedTokenizer("\n", 10);
     }
 
     @Test
     public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() {
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n"));
+        List<String> tokens = toList(sut.extract("foo\nbar\n"));
 
         assertEquals(List.of("foo", "bar"), tokens);
     }
@@ -59,7 +49,7 @@ public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() {
     @Test
     public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() {
         Exception thrownException = assertThrows(IllegalStateException.class, () -> {
-            sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom"));
+            sut.extract("this_is_longer_than_10\nkaboom").forEach(s -> {});
         });
         assertThat(thrownException.getMessage(), containsString("input buffer full"));
     }
@@ -67,45 +57,45 @@ public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() {
     @Test
     public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() {
         Exception thrownException = assertThrows(IllegalStateException.class, () -> {
-            sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom"));
+            sut.extract("this_is_longer_than_10\nkaboom").forEach(s -> {});
         });
         assertThat(thrownException.getMessage(), containsString("input buffer full"));
 
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\nanother"));
+        List<String> tokens = toList(sut.extract("\nanother"));
         assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens);
     }
 
    @Test
    public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() {
-        sut.extract(context, RubyUtil.RUBY.newString("aaaa"));
+        sut.extract("aaaa");
 
        Exception thrownException = assertThrows(IllegalStateException.class, () -> {
-            sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa"));
+            sut.extract("aaaaaaa").forEach(s -> {});
        });
        assertThat(thrownException.getMessage(), containsString("input buffer full"));
 
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc"));
+        List<String> tokens = toList(sut.extract("aa\nbbbb\nccc"));
        assertEquals(List.of("bbbb"), tokens);
    }
 
     @Test
     public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() {
-        sut.extract(context, RubyUtil.RUBY.newString("aaaa"));
+        sut.extract("aaaa");
 
         //first buffer full on 13 "a" letters
         Exception thrownException = assertThrows(IllegalStateException.class, () -> {
-            sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa"));
+            sut.extract("aaaaaaa").forEach(s -> {});
         });
         assertThat(thrownException.getMessage(), containsString("input buffer full"));
 
         // second buffer full on 11 "b" letters
         Exception secondThrownException = assertThrows(IllegalStateException.class, () -> {
-            sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc"));
+            sut.extract("aa\nbbbbbbbbbbb\ncc").forEach(s -> {});
         });
         assertThat(secondThrownException.getMessage(), containsString("input buffer full"));
 
         // now should resume processing on c and d
-        RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n"));
+        List<String> tokens = toList(sut.extract("ccc\nddd\n"));
         assertEquals(List.of("ccccc", "ddd"), tokens);
     }
 }
\ No newline at end of file
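
One property of the new API worth keeping in mind when reading the adapted specs: extract() returns a view over the tokenizer's single internal DataSplitter, not an independent snapshot, so the Iterables handed out by successive calls share the same buffer and cursor. A small illustration (plain Java, not part of the patch):

    BufferedTokenizer tokenizer = new BufferedTokenizer();
    Iterable<String> first = tokenizer.extract("a\nb");
    Iterable<String> second = tokenizer.extract("\nc\n");

    // Both views delegate to the same accumulator and index, so iterating
    // `second` also drains the tokens that were pending when `first` was made.
    for (String token : second) {
        System.out.println(token);              // prints "a", "b", "c"
    }

This is why the spec's to_list helper (and BufferedTokenizerTest.toList on the Java side) materializes each batch into a list immediately: collecting the tokens before the next extract call keeps the expectations equivalent to the old eager, array-returning behavior.
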
From 5a98b0b3e379fa3382bead78044bb54f77379cd3 Mon Sep 17 00:00:00 2001
From: andsel
Date: Tue, 4 Mar 2025 16:11:37 +0100
Subject: [PATCH 03/16] Moved string encoding logic to outer Ruby extension
 BufferedTokenizerExt class

---
 .../benchmark/BufferedTokenizerBenchmark.java |   3 -
 logstash-core/lib/logstash/file_watch.rb      |   2 +-
 .../src/main/java/org/logstash/RubyUtil.java  |  12 +-
 .../logstash/common/BufferedTokenizer.java    |  18 +-
 .../logstash/common/BufferedTokenizerExt.java | 225 ++++++++++--------
 .../common/BufferedTokenizerExtTest.java      | 171 +++++++++++++
 .../common/BufferedTokenizerTest.java         |  71 +-----
 7 files changed, 322 insertions(+), 180 deletions(-)
 create mode 100644 logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java

diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java
index 5366e0fa04c..eb0c1df8ff6 100644
--- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java
+++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java
@@ -44,21 +44,18 @@ public void setUp() {
         multipleTokensSpreadMultipleFragments_3 = "f".repeat(256) + "\n" + "g".repeat(512) + "\n" + "h".repeat(512) + "\n";
     }
 
-    @SuppressWarnings("unchecked")
     @Benchmark
     public final void onlyOneTokenPerFragment(Blackhole blackhole) {
         Iterable<String> tokens = sut.extract(singleTokenPerFragment);
         blackhole.consume(tokens);
     }
 
-    @SuppressWarnings("unchecked")
     @Benchmark
     public final void multipleTokenPerFragment(Blackhole blackhole) {
         Iterable<String> tokens = sut.extract(multipleTokensPerFragment);
         blackhole.consume(tokens);
     }
 
-    @SuppressWarnings("unchecked")
     @Benchmark
     public final void multipleTokensCrossingMultipleFragments(Blackhole blackhole) {
         Iterable<String> tokens = sut.extract(multipleTokensSpreadMultipleFragments_1);
         blackhole.consume(tokens);
diff --git a/logstash-core/lib/logstash/file_watch.rb b/logstash-core/lib/logstash/file_watch.rb
index f1a73f55926..a05a8549b5c 100644
--- a/logstash-core/lib/logstash/file_watch.rb
+++ b/logstash-core/lib/logstash/file_watch.rb
@@ -1,3 +1,3 @@
 module FileWatch
-  java_import org.logstash.common.BufferedTokenizer
+  # java_import org.logstash.common.BufferedTokenizer
 end
\ No newline at end of file
diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java
index c3446053c96..50ee6d49442 100644
--- a/logstash-core/src/main/java/org/logstash/RubyUtil.java
+++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java
@@ -33,7 +33,7 @@
 import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
 import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
 import org.logstash.common.AbstractDeadLetterQueueWriterExt;
-//import org.logstash.common.BufferedTokenizerExt;
+import org.logstash.common.BufferedTokenizerExt;
 import org.logstash.common.BufferedTokenizer;
 import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
 import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
@@ -153,7 +153,7 @@ public final class RubyUtil {
 
     public static final RubyClass OUTPUT_STRATEGY_SHARED;
 
-//    public static final RubyClass BUFFERED_TOKENIZER;
+    public static final RubyClass BUFFERED_TOKENIZER;
 
     public static final RubyClass ABSTRACT_METRIC_CLASS;
 
@@ -357,10 +357,10 @@ public final class RubyUtil {
             OutputStrategyExt.OutputStrategyRegistryExt::new,
             OutputStrategyExt.OutputStrategyRegistryExt.class
         );
-//        BUFFERED_TOKENIZER = RUBY.getOrCreateModule("FileWatch").defineClassUnder(
-//            "BufferedTokenizer", RUBY.getObject(), BufferedTokenizerExt::new
-//        );
-//        BUFFERED_TOKENIZER.defineAnnotatedMethods(BufferedTokenizerExt.class);
+        BUFFERED_TOKENIZER = RUBY.getOrCreateModule("FileWatch").defineClassUnder(
+            "BufferedTokenizer", RUBY.getObject(), BufferedTokenizerExt::new
+        );
+        BUFFERED_TOKENIZER.defineAnnotatedMethods(BufferedTokenizerExt.class);
         OUTPUT_DELEGATOR_STRATEGIES = RUBY.defineModuleUnder("OutputDelegatorStrategies", LOGSTASH_MODULE);
         OUTPUT_STRATEGY_ABSTRACT = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
index 1576283aa95..f10650ea616 100644
--- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
@@ -8,19 +8,26 @@ public class BufferedTokenizer {
     private final DataSplitter dataSplitter;
     private Integer sizeLimit;
 
-    static class ValueLimitIteratorDecorator implements Iterator<String> {
-        private final Iterator<String> iterator;
-        private final int limit;
+    static abstract class IteratorDecorator<T> implements Iterator<T> {
+        protected final Iterator<String> iterator;
 
-        ValueLimitIteratorDecorator(Iterator<String> iterator, int sizeLimit) {
+        IteratorDecorator(Iterator<String> iterator) {
             this.iterator = iterator;
-            this.limit = sizeLimit;
         }
 
         @Override
         public boolean hasNext() {
             return iterator.hasNext();
         }
+    }
+
+    static class ValueLimitIteratorDecorator extends IteratorDecorator<String> {
+        private final int limit;
+
+        ValueLimitIteratorDecorator(Iterator<String> iterator, int sizeLimit) {
+            super(iterator);
+            this.limit = sizeLimit;
+        }
 
         @Override
         public String next() {
@@ -46,7 +53,6 @@ public boolean hasNext() {
             int nextIdx = accumulator.indexOf(separator, currentIdx);
             if (nextIdx == -1) {
                 // not found next separator
-                System.out.println("hasNext return false because next token not found");
                 cleanupAccumulator();
                 return false;
             } else {
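
A detail of the isEmpty() delegation that is easy to miss when reviewing this refactor: DataSplitter.hasNext() is not a pure query. When no further separator is found it calls cleanupAccumulator(), compacting the already-consumed prefix of the buffer, and it only reports whether a complete token is available. A buffered partial token therefore does not count as content, even though flush() would still return it. A small sketch of that behavior (plain Java, same assumptions as the sketches above):

    BufferedTokenizer tokenizer = new BufferedTokenizer();
    tokenizer.extract("partial");               // fragment without a separator

    // isEmpty() == !dataSplitter.hasNext(): true here, because no complete
    // token is ready, although the fragment is still buffered for flush()
    System.out.println(tokenizer.isEmpty());    // true
    System.out.println(tokenizer.flush());      // "partial"
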
diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
index e2c476520c1..4833f353a06 100644
--- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
@@ -34,23 +34,25 @@
 import org.logstash.RubyUtil;
 
 import java.nio.charset.Charset;
+import java.util.Iterator;
 
 @JRubyClass(name = "BufferedTokenizer")
 public class BufferedTokenizerExt extends RubyObject {
 
     private static final long serialVersionUID = 1L;
 
-    private static final RubyString NEW_LINE = (RubyString) RubyUtil.RUBY.newString("\n").
-            freeze(RubyUtil.RUBY.getCurrentContext());
+//    private static final RubyString NEW_LINE = (RubyString) RubyUtil.RUBY.newString("\n").
+//            freeze(RubyUtil.RUBY.getCurrentContext());
 
-    private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
-    private StringBuilder headToken = new StringBuilder();
-    private RubyString delimiter = NEW_LINE;
-    private int sizeLimit;
-    private boolean hasSizeLimit;
-    private int inputSize;
-    private boolean bufferFullErrorNotified = false;
+//    private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
+//    private StringBuilder headToken = new StringBuilder();
+//    private RubyString delimiter = NEW_LINE;
+//    private int sizeLimit;
+//    private boolean hasSizeLimit;
+//    private int inputSize;
+//    private boolean bufferFullErrorNotified = false;
     private String encodingName;
+    private transient BufferedTokenizer tokenizer;
 
     public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
         super(runtime, metaClass);
@@ -58,18 +60,30 @@ public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
 
     @JRubyMethod(name = "initialize", optional = 2)
     public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
+//        if (args.length >= 1) {
+//            this.delimiter = args[0].convertToString();
+//        }
+//        if (args.length == 2) {
+//            final int sizeLimit = args[1].convertToInteger().getIntValue();
+//            if (sizeLimit <= 0) {
+//                throw new IllegalArgumentException("Size limit must be positive");
+//            }
+//            this.sizeLimit = sizeLimit;
+//            this.hasSizeLimit = true;
+//        }
+//        this.inputSize = 0;
+
+        String delimiter = "\n";
         if (args.length >= 1) {
-            this.delimiter = args[0].convertToString();
+            delimiter = args[0].convertToString().asJavaString();
         }
         if (args.length == 2) {
             final int sizeLimit = args[1].convertToInteger().getIntValue();
-            if (sizeLimit <= 0) {
-                throw new IllegalArgumentException("Size limit must be positive");
-            }
-            this.sizeLimit = sizeLimit;
-            this.hasSizeLimit = true;
+            this.tokenizer = new BufferedTokenizer(delimiter, sizeLimit);
+        } else {
+            this.tokenizer = new BufferedTokenizer(delimiter);
         }
-        this.inputSize = 0;
+
         return this;
     }
 
@@ -86,70 +100,88 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
      */
     @JRubyMethod
     @SuppressWarnings("rawtypes")
-    public RubyArray extract(final ThreadContext context, IRubyObject data) {
+    public IRubyObject extract(final ThreadContext context, IRubyObject data) {
         RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context);
         encodingName = encoding.getEncoding().getCharsetName();
-        final RubyArray entities = data.convertToString().split(delimiter, -1);
-        if (!bufferFullErrorNotified) {
-            input.clear();
-            input.concat(entities);
-        } else {
-            // after a full buffer signal
-            if (input.isEmpty()) {
-                // after a buffer full error, the remaining part of the line, till next delimiter,
-                // has to be consumed, unless the input buffer doesn't still contain fragments of
-                // subsequent tokens.
-                entities.shift(context);
-                input.concat(entities);
-            } else {
-                // merge last of the input with first of incoming data segment
-                if (!entities.isEmpty()) {
-                    RubyString last = ((RubyString) input.pop(context));
-                    RubyString nextFirst = ((RubyString) entities.shift(context));
-                    entities.unshift(last.concat(nextFirst));
-                    input.concat(entities);
-                }
-            }
-        }
-
-        if (hasSizeLimit) {
-            if (bufferFullErrorNotified) {
-                bufferFullErrorNotified = false;
-                if (input.isEmpty()) {
-                    return RubyUtil.RUBY.newArray();
-                }
-            }
-            final int entitiesSize = ((RubyString) input.first()).size();
-            if (inputSize + entitiesSize > sizeLimit) {
-                bufferFullErrorNotified = true;
-                headToken = new StringBuilder();
-                String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize);
-                inputSize = 0;
-                input.shift(context); // consume the token fragment that generates the buffer full
-                throw new IllegalStateException(errorMessage);
-            }
-            this.inputSize = inputSize + entitiesSize;
-        }
-
-        if (input.getLength() < 2) {
-            // this is a specialization case which avoid adding and removing from input accumulator
-            // when it contains just one element
-            headToken.append(input.shift(context)); // remove head
-            return RubyUtil.RUBY.newArray();
-        }
-
-        if (headToken.length() > 0) {
-            // if there is a pending token part, merge it with the first token segment present
-            // in the accumulator, and clean the pending token part.
-            headToken.append(input.shift(context)); // append buffer to first element and
-            // create new RubyString with the data specified encoding
-            RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString());
-            input.unshift(encodedHeadToken); // reinsert it into the array
-            headToken = new StringBuilder();
-        }
-        headToken.append(input.pop(context)); // put the leftovers in headToken for later
-        inputSize = headToken.length();
-        return input;
+        Iterable<String> extractor = tokenizer.extract(data.asJavaString());
+
+        IRubyObject rubyIterable = RubyUtil.toRubyObject(new Iterable<CharSequence>() {
+            @Override
+            public Iterator<CharSequence> iterator() {
+                return new BufferedTokenizer.IteratorDecorator<>(extractor.iterator()) {
+                    @Override
+                    public CharSequence next() {
+                        return toEncodedRubyString(context, iterator.next());
+                    }
+                };
+            }
+        });
+
+        return rubyIterable;
+
+//        final RubyArray entities = data.convertToString().split(delimiter, -1);
+//        if (!bufferFullErrorNotified) {
+//            input.clear();
+//            input.concat(entities);
+//        } else {
+//            // after a full buffer signal
+//            if (input.isEmpty()) {
+//                // after a buffer full error, the remaining part of the line, till next delimiter,
+//                // has to be consumed, unless the input buffer doesn't still contain fragments of
+//                // subsequent tokens.
+//                entities.shift(context);
+//                input.concat(entities);
+//            } else {
+//                // merge last of the input with first of incoming data segment
+//                if (!entities.isEmpty()) {
+//                    RubyString last = ((RubyString) input.pop(context));
+//                    RubyString nextFirst = ((RubyString) entities.shift(context));
+//                    entities.unshift(last.concat(nextFirst));
+//                    input.concat(entities);
+//                }
+//            }
+//        }
+//
+//        if (hasSizeLimit) {
+//            if (bufferFullErrorNotified) {
+//                bufferFullErrorNotified = false;
+//                if (input.isEmpty()) {
+//                    return RubyUtil.RUBY.newArray();
+//                }
+//            }
+//            final int entitiesSize = ((RubyString) input.first()).size();
+//            if (inputSize + entitiesSize > sizeLimit) {
+//                bufferFullErrorNotified = true;
+//                headToken = new StringBuilder();
+//                String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize);
+//                inputSize = 0;
+//                input.shift(context); // consume the token fragment that generates the buffer full
+//                throw new IllegalStateException(errorMessage);
+//            }
+//            this.inputSize = inputSize + entitiesSize;
+//        }
+//
+//        if (input.getLength() < 2) {
+//            // this is a specialization case which avoid adding and removing from input accumulator
+//            // when it contains just one element
+//            headToken.append(input.shift(context)); // remove head
+//            return RubyUtil.RUBY.newArray();
+//        }
+//
+//        if (headToken.length() > 0) {
+//            // if there is a pending token part, merge it with the first token segment present
+//            // in the accumulator, and clean the pending token part.
+//            headToken.append(input.shift(context)); // append buffer to first element and
+//            // create new RubyString with the data specified encoding
+//            RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString());
+//            input.unshift(encodedHeadToken); // reinsert it into the array
+//            headToken = new StringBuilder();
+//        }
+//        headToken.append(input.pop(context)); // put the leftovers in headToken for later
+//        inputSize = headToken.length();
+//        return input;
     }
 
     private RubyString toEncodedRubyString(ThreadContext context, String input) {
@@ -168,30 +200,33 @@ private RubyString toEncodedRubyString(ThreadContext context, String input) {
      */
     @JRubyMethod
     public IRubyObject flush(final ThreadContext context) {
-        final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
-        headToken = new StringBuilder();
-        inputSize = 0;
-
-        // create new RubyString with the last data specified encoding, if exists
-        RubyString encodedHeadToken;
-        if (encodingName != null) {
-            encodedHeadToken = toEncodedRubyString(context, buffer.toString());
-        } else {
-            // When used with TCP input it could be that on socket connection the flush method
-            // is invoked while no invocation of extract, leaving the encoding name unassigned.
-            // In such case also the headToken must be empty
-            if (!buffer.toString().isEmpty()) {
-                throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen");
-            }
-            encodedHeadToken = (RubyString) buffer;
-        }
-
-        return encodedHeadToken;
+        String s = tokenizer.flush();
+        return RubyUtil.toRubyObject(s);
+//        final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
+//        headToken = new StringBuilder();
+//        inputSize = 0;
+//
+//        // create new RubyString with the last data specified encoding, if exists
+//        RubyString encodedHeadToken;
+//        if (encodingName != null) {
+//            encodedHeadToken = toEncodedRubyString(context, buffer.toString());
+//        } else {
+//            // When used with TCP input it could be that on socket connection the flush method
+//            // is invoked while no invocation of extract, leaving the encoding name unassigned.
+//            // In such case also the headToken must be empty
+//            if (!buffer.toString().isEmpty()) {
+//                throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen");
+//            }
+//            encodedHeadToken = (RubyString) buffer;
+//        }
+//
+//        return encodedHeadToken;
     }
 
     @JRubyMethod(name = "empty?")
     public IRubyObject isEmpty(final ThreadContext context) {
-        return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0));
+//        return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0));
+        return RubyUtil.RUBY.newBoolean(tokenizer.isEmpty());
     }
 }
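
The encoding handling now rides on the IteratorDecorator extracted in this same patch: extract() wraps the Java iterator and re-applies the source encoding token by token inside next(), so an encoded RubyString is only built when the pipeline actually consumes a token. The shape of that wrapping, reduced to a neutral post-processing step (a sketch only: IteratorDecorator is package-private, so outside org.logstash.common this is purely illustrative, and the real decorator maps tokens through toEncodedRubyString rather than uppercasing them):

    Iterator<String> tokens = List.of("foo", "bar").iterator();

    Iterator<String> decorated = new BufferedTokenizer.IteratorDecorator<String>(tokens) {
        @Override
        public String next() {
            return iterator.next().toUpperCase();   // stand-in for toEncodedRubyString(...)
        }
    };

    while (decorated.hasNext()) {
        System.out.println(decorated.next());       // prints FOO, then BAR
    }

Since hasNext() is inherited unchanged, the decorator adds no buffering of its own; the laziness of the underlying DataSplitter is preserved all the way up to the Ruby caller.
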
- // In such case also the headToken must be empty - if (!buffer.toString().isEmpty()) { - throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen"); - } - encodedHeadToken = (RubyString) buffer; - } - - return encodedHeadToken; + String s = tokenizer.flush(); + return RubyUtil.toRubyObject(s); +// final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); +// headToken = new StringBuilder(); +// inputSize = 0; +// +// // create new RubyString with the last data specified encoding, if exists +// RubyString encodedHeadToken; +// if (encodingName != null) { +// encodedHeadToken = toEncodedRubyString(context, buffer.toString()); +// } else { +// // When used with TCP input it could be that on socket connection the flush method +// // is invoked while no invocation of extract, leaving the encoding name unassigned. +// // In such case also the headToken must be empty +// if (!buffer.toString().isEmpty()) { +// throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen"); +// } +// encodedHeadToken = (RubyString) buffer; +// } +// +// return encodedHeadToken; } @JRubyMethod(name = "empty?") public IRubyObject isEmpty(final ThreadContext context) { - return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0)); +// return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0)); + return RubyUtil.RUBY.newBoolean(tokenizer.isEmpty()); } } diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java new file mode 100644 index 00000000000..c33042b4ce6 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.common; + +import org.jruby.RubyEncoding; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {}; + sut.init(context, args); + } + + private static List toList(Iterable it) { + List l = new ArrayList<>(); + it.forEach(e -> l.add(e.toString())); + return l; + } + + private static List toList(IRubyObject it) { + return toList(it.toJava(Iterable.class)); + } + + @Test + public void shouldTokenizeASingleToken() { + List tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("foo\n"))); + + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldMergeMultipleToken() { + List tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("foo"))); + assertTrue(tokens.isEmpty()); + + tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("bar\n"))); + assertEquals(List.of("foobar"), tokens); + } + + @Test + public void shouldTokenizeMultipleToken() { + List tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n"))); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + List tokens = toList(sut.extract(context, RubyUtil.RUBY.newString(""))); + assertTrue(tokens.isEmpty()); + + tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("foo\nbar"))); + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldTokenizeEmptyPayloadWithNewline() { + List tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("\n"))); + assertEquals(List.of(""), tokens); + + tokens = toList(sut.extract(context, RubyUtil.RUBY.newString("\n\n\n"))); + assertEquals(List.of("", "", ""), tokens); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioning() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + Iterable tokens = sut.extract(context, rubyInput).toJava(Iterable.class); + + // read the first token, the £ string + RubyString firstToken = tokens.iterator().next(); + assertEquals("£", firstToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + sut.extract(context, rubyInput); + IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41}) + .force_encoding(context, 
RUBY.newString("ISO8859-1"));
+        List tokensJava = toList(sut.extract(context, capitalAInLatin1));
+        assertTrue(tokensJava.isEmpty());
+
+        Iterable tokens = sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A})).toJava(Iterable.class);
+
+        // read the first token, the £ string
+        RubyString firstToken = tokens.iterator().next();
+        assertEquals("£A", firstToken.toString());
+
+        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
+        RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding");
+        assertEquals("ISO-8859-1", encoding.toString());
+    }
+
+    @Test
+    public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() {
+        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
+        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
+        Iterable tokens = sut.extract(context, rubyInput).toJava(Iterable.class);
+
+        // read the first token, the £ string
+        RubyString firstToken = tokens.iterator().next();
+        assertEquals("£", firstToken.toString());
+
+        // flush and check that the remaining A is still encoded in ISO8859-1
+        IRubyObject lastToken = sut.flush(context);
+        assertEquals("A", lastToken.toString());
+
+        // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
+        RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding");
+        assertEquals("ISO-8859-1", encoding.toString());
+    }
+
+    @Test
+    public void givenDirectFlushInvocationUTF8EncodingIsApplied() {
+        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A
+        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
+
+        // flush without any prior extract and check that an empty token is returned
+        IRubyObject lastToken = sut.flush(context);
+        assertEquals("", lastToken.toString());
+
+        // verify the default "UTF-8" encoding is applied in the Java to Ruby String conversion
+        RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding");
+        assertEquals("UTF-8", encoding.toString());
+    }
+}
\ No newline at end of file
diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java
index 43532f53946..6499393b000 100644
--- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java
+++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java
@@ -23,10 +23,12 @@
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public final class BufferedTokenizerTest {
 
@@ -83,73 +85,4 @@ public void shouldTokenizeEmptyPayloadWithNewline() {
         tokens = toList(sut.extract("\n\n\n"));
         assertEquals(List.of("", "", ""), tokens);
     }
-
-//    @Test
-//    public void shouldNotChangeEncodingOfTokensAfterPartitioning() {
-//        RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
-//        IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
-//        RubyArray tokens = (RubyArray)sut.extract(context, rubyInput);
-//
-//        // read the first token, the £ string
-//        IRubyObject firstToken = tokens.shift(context);
-//        assertEquals("£", firstToken.toString());
-//
-//        // verify 
encoding "ISO8859-1" is preserved in the Java to Ruby String conversion -// RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); -// assertEquals("ISO-8859-1", encoding.toString()); -// } -// -// @Test -// public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() { -// RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character -// IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); -// sut.extract(context, rubyInput); -// IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41}) -// .force_encoding(context, RUBY.newString("ISO8859-1")); -// RubyArray tokens = (RubyArray)sut.extract(context, capitalAInLatin1); -// assertTrue(tokens.isEmpty()); -// -// tokens = (RubyArray)sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A})); -// -// // read the first token, the £ string -// IRubyObject firstToken = tokens.shift(context); -// assertEquals("£A", firstToken.toString()); -// -// // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion -// RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); -// assertEquals("ISO-8859-1", encoding.toString()); -// } -// -// @Test -// public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() { -// RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A -// IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); -// RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); -// -// // read the first token, the £ string -// IRubyObject firstToken = tokens.shift(context); -// assertEquals("£", firstToken.toString()); -// -// // flush and check that the remaining A is still encoded in ISO8859-1 -// IRubyObject lastToken = sut.flush(context); -// assertEquals("A", lastToken.toString()); -// -// // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion -// RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); -// assertEquals("ISO-8859-1", encoding.toString()); -// } -// -// @Test -// public void givenDirectFlushInvocationUTF8EncodingIsApplied() { -// RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A -// IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); -// -// // flush and check that the remaining A is still encoded in ISO8859-1 -// IRubyObject lastToken = sut.flush(context); -// assertEquals("", lastToken.toString()); -// -// // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion -// RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); -// assertEquals("UTF-8", encoding.toString()); -// } } \ No newline at end of file From ca2c4ddbfe4e2eebaa5aaa22ad5f2e5cd2f54734 Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 4 Mar 2025 17:20:15 +0100 Subject: [PATCH 04/16] Fixed flush behavior --- .../org/logstash/common/BufferedTokenizer.java | 2 +- .../org/logstash/common/BufferedTokenizerExt.java | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java index f10650ea616..0f43348dbd9 100644 --- 
a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java @@ -84,7 +84,7 @@ public void append(String data) { } public String flush() { - return accumulator.toString(); + return accumulator.substring(currentIdx); } @Override diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index 4833f353a06..3363a81f329 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -201,7 +201,20 @@ private RubyString toEncodedRubyString(ThreadContext context, String input) { @JRubyMethod public IRubyObject flush(final ThreadContext context) { String s = tokenizer.flush(); - return RubyUtil.toRubyObject(s); + + // create new RubyString with the last data specified encoding, if exists + if (encodingName != null) { + return toEncodedRubyString(context, s); + } else { + // When used with TCP input it could be that on socket connection the flush method + // is invoked while no invocation of extract, leaving the encoding name unassigned. + // In such case also the headToken must be empty + if (!s.isEmpty()) { + throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen"); + } + return RubyUtil.toRubyObject(s); + } + // final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); // headToken = new StringBuilder(); // inputSize = 0; From dec76fe712201a071ee0c39c72a8704fa0a2de64 Mon Sep 17 00:00:00 2001 From: andsel Date: Tue, 4 Mar 2025 17:22:27 +0100 Subject: [PATCH 05/16] Minor, removed commented code --- .../logstash/common/BufferedTokenizerExt.java | 109 +----------------- 1 file changed, 1 insertion(+), 108 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index 3363a81f329..a6456bf3452 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -21,7 +21,6 @@ package org.logstash.common; import org.jruby.Ruby; -import org.jruby.RubyArray; import org.jruby.RubyClass; import org.jruby.RubyEncoding; import org.jruby.RubyObject; @@ -41,16 +40,6 @@ public class BufferedTokenizerExt extends RubyObject { private static final long serialVersionUID = 1L; -// private static final RubyString NEW_LINE = (RubyString) RubyUtil.RUBY.newString("\n"). 
-// freeze(RubyUtil.RUBY.getCurrentContext()); - -// private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray(); -// private StringBuilder headToken = new StringBuilder(); -// private RubyString delimiter = NEW_LINE; -// private int sizeLimit; -// private boolean hasSizeLimit; -// private int inputSize; -// private boolean bufferFullErrorNotified = false; private String encodingName; private transient BufferedTokenizer tokenizer; @@ -60,19 +49,6 @@ public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { @JRubyMethod(name = "initialize", optional = 2) public IRubyObject init(final ThreadContext context, IRubyObject[] args) { -// if (args.length >= 1) { -// this.delimiter = args[0].convertToString(); -// } -// if (args.length == 2) { -// final int sizeLimit = args[1].convertToInteger().getIntValue(); -// if (sizeLimit <= 0) { -// throw new IllegalArgumentException("Size limit must be positive"); -// } -// this.sizeLimit = sizeLimit; -// this.hasSizeLimit = true; -// } -// this.inputSize = 0; - String delimiter = "\n"; if (args.length >= 1) { delimiter = args[0].convertToString().asJavaString(); @@ -106,7 +82,7 @@ public IRubyObject extract(final ThreadContext context, IRubyObject data) { Iterable extractor = tokenizer.extract(data.asJavaString()); - + // return an iterator that does the encoding conversion IRubyObject rubyIterable = RubyUtil.toRubyObject(new Iterable() { @Override public Iterator iterator() { @@ -120,68 +96,6 @@ public CharSequence next() { }); return rubyIterable; - -// final RubyArray entities = data.convertToString().split(delimiter, -1); -// if (!bufferFullErrorNotified) { -// input.clear(); -// input.concat(entities); -// } else { -// // after a full buffer signal -// if (input.isEmpty()) { -// // after a buffer full error, the remaining part of the line, till next delimiter, -// // has to be consumed, unless the input buffer doesn't still contain fragments of -// // subsequent tokens. 
-// entities.shift(context); -// input.concat(entities); -// } else { -// // merge last of the input with first of incoming data segment -// if (!entities.isEmpty()) { -// RubyString last = ((RubyString) input.pop(context)); -// RubyString nextFirst = ((RubyString) entities.shift(context)); -// entities.unshift(last.concat(nextFirst)); -// input.concat(entities); -// } -// } -// } -// -// if (hasSizeLimit) { -// if (bufferFullErrorNotified) { -// bufferFullErrorNotified = false; -// if (input.isEmpty()) { -// return RubyUtil.RUBY.newArray(); -// } -// } -// final int entitiesSize = ((RubyString) input.first()).size(); -// if (inputSize + entitiesSize > sizeLimit) { -// bufferFullErrorNotified = true; -// headToken = new StringBuilder(); -// String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize); -// inputSize = 0; -// input.shift(context); // consume the token fragment that generates the buffer full -// throw new IllegalStateException(errorMessage); -// } -// this.inputSize = inputSize + entitiesSize; -// } -// -// if (input.getLength() < 2) { -// // this is a specialization case which avoid adding and removing from input accumulator -// // when it contains just one element -// headToken.append(input.shift(context)); // remove head -// return RubyUtil.RUBY.newArray(); -// } -// -// if (headToken.length() > 0) { -// // if there is a pending token part, merge it with the first token segment present -// // in the accumulator, and clean the pending token part. -// headToken.append(input.shift(context)); // append buffer to first element and -// // create new RubyString with the data specified encoding -// RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString()); -// input.unshift(encodedHeadToken); // reinsert it into the array -// headToken = new StringBuilder(); -// } -// headToken.append(input.pop(context)); // put the leftovers in headToken for later -// inputSize = headToken.length(); -// return input; } private RubyString toEncodedRubyString(ThreadContext context, String input) { @@ -214,31 +128,10 @@ public IRubyObject flush(final ThreadContext context) { } return RubyUtil.toRubyObject(s); } - -// final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); -// headToken = new StringBuilder(); -// inputSize = 0; -// -// // create new RubyString with the last data specified encoding, if exists -// RubyString encodedHeadToken; -// if (encodingName != null) { -// encodedHeadToken = toEncodedRubyString(context, buffer.toString()); -// } else { -// // When used with TCP input it could be that on socket connection the flush method -// // is invoked while no invocation of extract, leaving the encoding name unassigned. 
-//            // In such case also the headToken must be empty
-//            if (!buffer.toString().isEmpty()) {
-//                throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen");
-//            }
-//            encodedHeadToken = (RubyString) buffer;
-//        }
-//
-//        return encodedHeadToken;
     }
 
     @JRubyMethod(name = "empty?")
     public IRubyObject isEmpty(final ThreadContext context) {
-//        return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0));
         return RubyUtil.RUBY.newBoolean(tokenizer.isEmpty());
     }
 

From e5c16860e51ce78d31c861a226bfee31283d16dd Mon Sep 17 00:00:00 2001
From: andsel
Date: Wed, 5 Mar 2025 10:00:35 +0100
Subject: [PATCH 06/16] Moved creation of the iterable upfront in the
 constructor so it is executed just once

---
 .../main/java/org/logstash/common/BufferedTokenizer.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
index 0f43348dbd9..a8bd1f98f34 100644
--- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
@@ -6,6 +6,7 @@
 public class BufferedTokenizer {
 
     private final DataSplitter dataSplitter;
+    private final Iterable iterable;
     private Integer sizeLimit;
 
     static abstract class IteratorDecorator implements Iterator {
@@ -99,6 +100,7 @@ public BufferedTokenizer() {
 
     public BufferedTokenizer(String separator) {
         this.dataSplitter = new DataSplitter(separator);
+        this.iterable = setupIterable();
     }
 
     public BufferedTokenizer(String separator, int sizeLimit) {
@@ -108,17 +110,22 @@ public BufferedTokenizer(String separator, int sizeLimit) {
 
         this.dataSplitter = new DataSplitter(separator);
         this.sizeLimit = sizeLimit;
+        this.iterable = setupIterable();
     }
 
     public Iterable extract(String data) {
         dataSplitter.append(data);
 
+        return iterable;
+    }
+
+    private Iterable setupIterable() {
         return new Iterable() {
             @Override
             public Iterator iterator() {
                 Iterator returnedIterator = dataSplitter;
                 if (sizeLimit != null) {
-                    returnedIterator = new ValueLimitIteratorDecorator(returnedIterator, sizeLimit);
+                    returnedIterator = new ValueLimitIteratorDecorator(returnedIterator, sizeLimit);
                 }
                 return returnedIterator;
             }
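With the iterable built once in the constructor, each call to extract only appends the new fragment and returns the same lazy view over the accumulator; tokens materialize during iteration. A minimal sketch of the resulting call pattern (hypothetical consumer code written against the API above, not part of the series):

    BufferedTokenizer tokenizer = new BufferedTokenizer("\n");
    for (String token : tokenizer.extract("foo\nbar")) {
        System.out.println(token);  // prints "foo"; "bar" stays buffered
    }
    for (String token : tokenizer.extract("\n")) {
        System.out.println(token);  // prints "bar" once its separator arrives
    }

Since the same DataSplitter instance backs every returned iterable, resuming iteration after a later extract call simply continues from the buffered state, which is the behavior the next commit's tests pin down.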
From 12e7d68c3043fc9febe0f0144cf4a749a07ae58f Mon Sep 17 00:00:00 2001
From: andsel
Date: Wed, 5 Mar 2025 12:24:29 +0100
Subject: [PATCH 07/16] Fixed tests to catch the exceeded-size-limit failure
 during iteration, once the next separator is reached

---
 .../BufferedTokenizerWithSizeLimitTest.java   | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java
index 74be48c8be9..7bb0f89a863 100644
--- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java
+++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java
@@ -69,12 +69,18 @@ public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStarti
 public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() {
         sut.extract("aaaa");
 
+        // it grows to 11 chars against a sizeLimit of 10, but doesn't trigger the exception till the next separator is reached
+        sut.extract("aaaaaaa").forEach(s -> {});
+
+        Iterable tokenIterable = sut.extract("aa\nbbbb\nccc");
         Exception thrownException = assertThrows(IllegalStateException.class, () -> {
-            sut.extract("aaaaaaa").forEach(s -> {});
+            // now that the next delimiter is present, iterating raises the error
+            tokenIterable.forEach(s -> {});
         });
         assertThat(thrownException.getMessage(), containsString("input buffer full"));
 
-        List tokens = toList(sut.extract("aa\nbbbb\nccc"));
+        // iteration over the tokens can then proceed
+        List tokens = toList(tokenIterable);
         assertEquals(List.of("bbbb"), tokens);
     }
 
@@ -82,15 +88,20 @@ public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleT
     public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() {
         sut.extract("aaaa");
 
+        // it grows to 11 chars against a sizeLimit of 10, but doesn't trigger the exception till the next separator is reached
+        sut.extract("aaaaaaa").forEach(s -> {});
+
+        Iterable tokenIterable = sut.extract("aa\nbbbbbbbbbbb\ncc");
+
         //first buffer full on 13 "a" letters
         Exception thrownException = assertThrows(IllegalStateException.class, () -> {
-            sut.extract("aaaaaaa").forEach(s -> {});
+            tokenIterable.forEach(s -> {});
         });
         assertThat(thrownException.getMessage(), containsString("input buffer full"));
 
         // second buffer full on 11 "b" letters
         Exception secondThrownException = assertThrows(IllegalStateException.class, () -> {
-            sut.extract("aa\nbbbbbbbbbbb\ncc");
+            tokenIterable.forEach(s -> {});
         });
         assertThat(secondThrownException.getMessage(), containsString("input buffer full"));
 

From 72c2d616590a32247e12a22a1c8cf8199fd973a4 Mon Sep 17 00:00:00 2001
From: andsel
Date: Wed, 5 Mar 2025 12:46:46 +0100
Subject: [PATCH 08/16] Fixed license headers

---
 .../benchmark/BufferedTokenizerBenchmark.java | 19 ++++++++++++
 logstash-core/lib/logstash/file_watch.rb      |  3 --
 logstash-core/lib/logstash/util.rb            |  1 -
 .../logstash/common/BufferedTokenizer.java    | 31 ++++++++++++++-----
 .../logstash/common/BufferedTokenizerExt.java | 13 ++------
 .../common/BufferedTokenizerTest.java         |  2 --
 6 files changed, 45 insertions(+), 24 deletions(-)
 delete mode 100644 logstash-core/lib/logstash/file_watch.rb

diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java
index eb0c1df8ff6..b94df65ecad 100644
--- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java
+++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java
@@ -1,3 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch B.V. licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License. 
+ */ + package org.logstash.benchmark; import org.logstash.common.BufferedTokenizer; diff --git a/logstash-core/lib/logstash/file_watch.rb b/logstash-core/lib/logstash/file_watch.rb deleted file mode 100644 index a05a8549b5c..00000000000 --- a/logstash-core/lib/logstash/file_watch.rb +++ /dev/null @@ -1,3 +0,0 @@ -module FileWatch - # java_import org.logstash.common.BufferedTokenizer -end \ No newline at end of file diff --git a/logstash-core/lib/logstash/util.rb b/logstash-core/lib/logstash/util.rb index 5e1f760f018..9eac95e1a27 100644 --- a/logstash-core/lib/logstash/util.rb +++ b/logstash-core/lib/logstash/util.rb @@ -16,7 +16,6 @@ # under the License. require "logstash/environment" -require "logstash/file_watch" module LogStash::Util UNAME = case RbConfig::CONFIG["host_os"] diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java index a8bd1f98f34..9b39d973404 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java @@ -1,3 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ package org.logstash.common; import java.util.Iterator; @@ -120,15 +138,12 @@ public Iterable extract(String data) { } private Iterable setupIterable() { - return new Iterable() { - @Override - public Iterator iterator() { - Iterator returnedIterator = dataSplitter; - if (sizeLimit != null) { - returnedIterator = new ValueLimitIteratorDecorator(returnedIterator, sizeLimit); - } - return returnedIterator; + return () -> { + Iterator returnedIterator = dataSplitter; + if (sizeLimit != null) { + returnedIterator = new ValueLimitIteratorDecorator(returnedIterator, sizeLimit); } + return returnedIterator; }; } diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index a6456bf3452..9658ebe1bfd 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -83,19 +83,12 @@ public IRubyObject extract(final ThreadContext context, IRubyObject data) { Iterable extractor = tokenizer.extract(data.asJavaString()); // return an iterator that does the encoding conversion - IRubyObject rubyIterable = RubyUtil.toRubyObject(new Iterable() { + return RubyUtil.toRubyObject((Iterable) () -> new BufferedTokenizer.IteratorDecorator<>(extractor.iterator()) { @Override - public Iterator iterator() { - return new BufferedTokenizer.IteratorDecorator<>(extractor.iterator()) { - @Override - public CharSequence next() { - return toEncodedRubyString(context, iterator.next()); - } - }; + public CharSequence next() { + return toEncodedRubyString(context, iterator.next()); } }); - - return rubyIterable; } private RubyString toEncodedRubyString(ThreadContext context, String input) { diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java index 6499393b000..4c2045618e7 100644 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerTest.java @@ -23,12 +23,10 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public final class BufferedTokenizerTest { From 7801b53e0eee500695ac242765806708657f8256 Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 5 Mar 2025 15:56:16 +0100 Subject: [PATCH 09/16] [Test] added test to verify buffer full error is notified not only on the first token --- .../BufferedTokenizerWithSizeLimitTest.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java index 7bb0f89a863..47866b61099 100644 --- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java @@ -22,6 +22,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.Iterator; import java.util.List; import static org.hamcrest.MatcherAssert.assertThat; @@ -109,4 +110,22 @@ public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleT List tokens = toList(sut.extract("ccc\nddd\n")); assertEquals(List.of("ccccc", 
"ddd"), tokens); } + + @Test + public void givenFragmentThatHasTheSecondTokenOverrunsSizeLimitThenAnErrorIsThrown() { + Iterable tokensIterable = sut.extract("aaaa\nbbbbbbbbbbb\nccc\n"); + Iterator tokensIterator = tokensIterable.iterator(); + + // first token length = 4, it's ok + assertEquals("aaaa", tokensIterator.next()); + + // second token is an overrun, length = 11 + Exception exception = assertThrows(IllegalStateException.class, () -> { + tokensIterator.next(); + }); + assertThat(exception.getMessage(), containsString("input buffer full")); + + // third token resumes + assertEquals("ccc", tokensIterator.next()); + } } \ No newline at end of file From 99eca183792cb6e1e27edcae24e4c09ffc27665e Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 5 Mar 2025 16:42:22 +0100 Subject: [PATCH 10/16] Updated benchmark to consume effectively the iterator --- .../org/logstash/benchmark/BufferedTokenizerBenchmark.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java index b94df65ecad..3512fdcc04f 100644 --- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java +++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java @@ -37,7 +37,7 @@ @Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS) -@Measurement(iterations = 10, time = 100, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 3000, timeUnit = TimeUnit.MILLISECONDS) @Fork(1) @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.NANOSECONDS) @@ -66,24 +66,29 @@ public void setUp() { @Benchmark public final void onlyOneTokenPerFragment(Blackhole blackhole) { Iterable tokens = sut.extract(singleTokenPerFragment); + tokens.forEach(blackhole::consume); blackhole.consume(tokens); } @Benchmark public final void multipleTokenPerFragment(Blackhole blackhole) { Iterable tokens = sut.extract(multipleTokensPerFragment); + tokens.forEach(blackhole::consume); blackhole.consume(tokens); } @Benchmark public final void multipleTokensCrossingMultipleFragments(Blackhole blackhole) { Iterable tokens = sut.extract(multipleTokensSpreadMultipleFragments_1); + tokens.forEach(t -> {}); blackhole.consume(tokens); tokens = sut.extract(multipleTokensSpreadMultipleFragments_2); + tokens.forEach(t -> {}); blackhole.consume(tokens); tokens = sut.extract(multipleTokensSpreadMultipleFragments_3); + tokens.forEach(blackhole::consume); blackhole.consume(tokens); } } From cabd52c42771176e6634a80b7f9136f264c6bcc0 Mon Sep 17 00:00:00 2001 From: andsel Date: Thu, 6 Mar 2025 11:41:43 +0100 Subject: [PATCH 11/16] [Benchmark] JMH report in milliseconds instead of nanoseconds --- .../java/org/logstash/benchmark/BufferedTokenizerBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java index 3512fdcc04f..7eb6ea51055 100644 --- a/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java +++ b/logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerBenchmark.java @@ -40,7 +40,7 @@ @Measurement(iterations = 10, time = 3000, timeUnit = TimeUnit.MILLISECONDS) @Fork(1) 
@BenchmarkMode(Mode.Throughput)
-@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
 @State(Scope.Thread)
 public class BufferedTokenizerBenchmark {

From ef6ca8ee4dba523ed9715dd33a6a29b9de90f936 Mon Sep 17 00:00:00 2001
From: andsel
Date: Wed, 12 Mar 2025 14:42:29 +0100
Subject: [PATCH 12/16] Added an isEmpty method to the iterable returned by
 BufferedTokenizerExt because it's expected in some use cases, like:
 https://github.com/logstash-plugins/logstash-input-file/blob/55a4a7099f05f29351672417036c1342850c7adc/lib/filewatch/watched_file.rb#L250

---
 .../logstash/common/BufferedTokenizerExt.java | 24 +++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
index 9658ebe1bfd..2863c94069a 100644
--- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java
@@ -83,12 +83,32 @@ public IRubyObject extract(final ThreadContext context, IRubyObject data) {
 
         Iterable extractor = tokenizer.extract(data.asJavaString());
 
         // return an iterator that does the encoding conversion
-        return RubyUtil.toRubyObject((Iterable) () -> new BufferedTokenizer.IteratorDecorator<>(extractor.iterator()) {
+        Iterator rubyStringAdapterIterator = new BufferedTokenizer.IteratorDecorator<>(extractor.iterator()) {
             @Override
             public CharSequence next() {
                 return toEncodedRubyString(context, iterator.next());
             }
-        });
+        };
+
+        return RubyUtil.toRubyObject(new IterableAdapterWithEmptyCheck(rubyStringAdapterIterator));
+    }
+
+    // Iterator to Iterable adapter that also exposes the isEmpty method expected by Ruby callers
+    public static class IterableAdapterWithEmptyCheck implements Iterable {
+        private final Iterator origIterator;
+
+        public IterableAdapterWithEmptyCheck(Iterator origIterator) {
+            this.origIterator = origIterator;
+        }
+
+        @Override
+        public Iterator iterator() {
+            return origIterator;
+        }
+
+        public boolean isEmpty() {
+            return !origIterator.hasNext();
+        }
     }
 
     private RubyString toEncodedRubyString(ThreadContext context, String input) {
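The empty? probe must answer without consuming a token, so the adapter delegates to hasNext() on the decorated iterator instead of calling next(). A self-contained sketch of the contract the adapter is expected to honour (hypothetical example, not code from the series):

    import java.util.Iterator;
    import java.util.List;

    class EmptyCheckContract {
        public static void main(String[] args) {
            Iterator<String> one = List.of("token").iterator();
            Iterator<String> none = List.<String>of().iterator();
            // hasNext() only peeks, so probing emptiness leaves the token consumable
            System.out.println(!one.hasNext());  // false: a token is buffered
            System.out.println(!none.hasNext()); // true: nothing to iterate
            System.out.println(one.next());      // "token" is still available
        }
    }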
From 9a37f5617a0c992a3d1d6a9d3af03f9afbccfba4 Mon Sep 17 00:00:00 2001
From: andsel
Date: Mon, 10 Mar 2025 12:10:47 +0100
Subject: [PATCH 13/16] [Test] Added test to verify the avoidance of infinite
 accumulation

---
 .../BufferedTokenizerWithSizeLimitTest.java   | 31 +++++++++++++++++--
 1 file changed, 28 insertions(+), 3 deletions(-)

diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java
index 47866b61099..e24ca2077ca 100644
--- a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java
+++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerWithSizeLimitTest.java
@@ -26,9 +26,8 @@
 import java.util.List;
 
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.*;
 import static org.logstash.common.BufferedTokenizerTest.toList;
 
 public final class BufferedTokenizerWithSizeLimitTest {
@@ -128,4 +127,30 @@ public void givenFragmentThatHasTheSecondTokenOverrunsSizeLimitThenAnErrorIsThro
         // third token resumes
         assertEquals("ccc", tokensIterator.next());
     }
+
+    @Test
+    public void givenSequenceOfFragmentsWithoutSeparatorThenDoesntGenerateOutOfMemory() {
+        final String neverEndingData = generate(8, "a");
+        for (int i = 0; i < 10; i++) {
+            // iterator has to be engaged
+            boolean hasNext = sut.extract(neverEndingData).iterator().hasNext();
+            assertFalse(hasNext);
+        }
+
+        // the second fragment passed to extract overruns the sizeLimit, so the tokenizer
+        // drops fragments starting from the third one
+        assertThat("Accumulator includes only a part of an exploding payload", sut.flush().length(), is(lessThan(neverEndingData.length() * 3)));
+
+        Iterable tokensIterable = sut.extract("\nbbb\n");
+        Iterator tokensIterator = tokensIterable.iterator();
+        // send a token delimiter and check an error is raised
+        Exception exception = assertThrows(IllegalStateException.class, () -> {
+            tokensIterator.next();
+        });
+        assertThat(exception.getMessage(), containsString("input buffer full"));
+    }
+
+    private static String generate(int length, String fillChar) {
+        return fillChar.repeat(length);
+    }
 }
\ No newline at end of file
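The commit below changes the accumulation policy this test relies on: once the buffered remainder outgrows the limit, separator-free fragments are discarded until one carrying a separator shows up. A simplified sketch of that rule (hypothetical names; the actual change wires it into DataSplitter):

    // simplified drop rule over a single buffer with a fixed size limit
    final class DropRuleSketch {
        private final StringBuilder buffer = new StringBuilder();
        private boolean dropping = false;

        void append(String data, String separator, int sizeLimit) {
            if (dropping && !data.contains(separator)) {
                return; // still inside an overrun token: discard the fragment
            }
            dropping = false;
            buffer.append(data);
            // no separator in sight and the remainder already too big: start dropping
            if (buffer.indexOf(separator) == -1 && buffer.length() > sizeLimit) {
                dropping = true;
            }
        }
    }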
From 040db570e382f57c3cb3c5013f526685a19ab64e Mon Sep 17 00:00:00 2001
From: andsel
Date: Mon, 10 Mar 2025 14:25:24 +0100
Subject: [PATCH 14/16] Changed the token accumulator behavior so that it
 starts dropping fragments belonging to an already overrun token till the
 next separator is found

---
 .../logstash/common/BufferedTokenizer.java    | 92 ++++++++++++-------
 1 file changed, 61 insertions(+), 31 deletions(-)

diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
index 9b39d973404..4e4b1c14cc0 100644
--- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
@@ -20,12 +20,14 @@
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.function.BiFunction;
+import java.util.function.IntPredicate;
 
 public class BufferedTokenizer {
 
     private final DataSplitter dataSplitter;
     private final Iterable iterable;
-    private Integer sizeLimit;
+//    private Integer sizeLimit;
 
     static abstract class IteratorDecorator implements Iterator {
         protected final Iterator iterator;
@@ -40,31 +42,45 @@ public boolean hasNext() {
         }
     }
 
-    static class ValueLimitIteratorDecorator extends IteratorDecorator {
-        private final int limit;
-
-        ValueLimitIteratorDecorator(Iterator iterator, int sizeLimit) {
-            super(iterator);
-            this.limit = sizeLimit;
-        }
-
-        @Override
-        public String next() {
-            String value = iterator.next();
-            if (value.length() > limit) {
-                throw new IllegalStateException("input buffer full, consumed token which exceeded the sizeLimit " + limit);
-            }
-            return value;
-        }
-    }
+//    static class ValueLimitIteratorDecorator extends IteratorDecorator {
+//        private final int limit;
+//
+//        ValueLimitIteratorDecorator(Iterator iterator, int sizeLimit) {
+//            super(iterator);
+//            this.limit = sizeLimit;
+//        }
+//
+//        @Override
+//        public String next() {
+//            String value = iterator.next();
+//            if (value.length() > limit) {
+//                throw new IllegalStateException("input buffer full, consumed token which exceeded the sizeLimit " + limit);
+//            }
+//            return value;
+//        }
+//    }
 
     static class DataSplitter implements Iterator {
         private final String separator;
+        private final IntPredicate sizeChecker;
         private int currentIdx = 0;
         private final StringBuilder accumulator = new StringBuilder();
+        private boolean dropNextPartialFragments = false;
 
         DataSplitter(String separator) {
             this.separator = separator;
+            this.sizeChecker = value -> false;
+        }
+
+        /**
+         * @param separator
+         *      is the token separator string.
+         * @param sizeChecker
+         *      function that verifies if the token size is bigger than a limit
+         * */
+        DataSplitter(String separator, IntPredicate sizeChecker) {
+            this.separator = separator;
+            this.sizeChecker = sizeChecker;
         }
 
         @Override
@@ -73,6 +89,11 @@ public boolean hasNext() {
             if (nextIdx == -1) {
                 // not found next separator
                 cleanupAccumulator();
+                // if the remainder is already bigger than the admitted size, start dropping the next fragments that
+                // don't contain any separator
+                if (sizeChecker.test(accumulator.length())) {
+                    dropNextPartialFragments = true;
+                }
                 return false;
             } else {
                 return true;
@@ -89,6 +110,9 @@ public String next() {
             } else {
                 String token = accumulator.substring(currentIdx, nextIdx);
                 currentIdx = nextIdx + separator.length();
+                if (sizeChecker.test(token.length())) {
+                    throw new IllegalStateException("input buffer full, consumed token which exceeded the sizeLimit "); // TODO + sizeLimit
+                }
                 return token;
             }
         }
@@ -99,6 +123,10 @@ private void cleanupAccumulator() {
         }
 
         public void append(String data) {
+            if (!data.contains(separator) && dropNextPartialFragments) {
+                return;
+            }
+            dropNextPartialFragments = false;
             accumulator.append(data);
         }
 
@@ -118,7 +146,8 @@ public BufferedTokenizer() {
 
     public BufferedTokenizer(String separator) {
         this.dataSplitter = new DataSplitter(separator);
-        this.iterable = setupIterable();
+        this.iterable = () -> dataSplitter;
+//        this.iterable = setupIterable();
     }
 
     public BufferedTokenizer(String separator, int sizeLimit) {
@@ -126,9 +155,10 @@ public BufferedTokenizer(String separator, int sizeLimit) {
             throw new IllegalArgumentException("Size limit must be positive");
         }
 
-        this.dataSplitter = new DataSplitter(separator);
-        this.sizeLimit = sizeLimit;
-        this.iterable = setupIterable();
+        this.dataSplitter = new DataSplitter(separator, tokenSize -> tokenSize > sizeLimit);
+//        this.sizeLimit = sizeLimit;
+        this.iterable = () -> dataSplitter;
+//        this.iterable = setupIterable();
     }
 
     public Iterable extract(String data) {
@@ -137,15 +167,15 @@ public Iterable extract(String data) {
         return iterable;
     }
 
-    private Iterable setupIterable() {
-        return () -> {
-            Iterator returnedIterator = dataSplitter;
-            if (sizeLimit != null) {
-                returnedIterator = new ValueLimitIteratorDecorator(returnedIterator, sizeLimit);
-            }
-            return returnedIterator;
-        };
-    }
+//    private Iterable setupIterable() {
+//        return () -> {
+//            Iterator returnedIterator = dataSplitter;
+//            if (sizeLimit != null) {
+//                returnedIterator = new ValueLimitIteratorDecorator(returnedIterator, sizeLimit);
+//            }
+//            return returnedIterator;
+//        };
+//    }
 
     public String flush() {
         return dataSplitter.flush();

From 325ce9a62d916ec3e9ca11027c2c5e09c236f487 Mon Sep 17 00:00:00 2001
From: andsel
Date: Mon, 10 Mar 2025 14:27:42 +0100
Subject: [PATCH 15/16] Commented code cleanup

---
 .../logstash/common/BufferedTokenizer.java    | 35 +------------------
 1 file changed, 1 insertion(+), 34 deletions(-)

diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
index 4e4b1c14cc0..595b3b1e4a7 100644
--- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
+++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java
@@ -20,14 +20,12 @@
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import 
java.util.function.BiFunction; import java.util.function.IntPredicate; public class BufferedTokenizer { private final DataSplitter dataSplitter; private final Iterable iterable; -// private Integer sizeLimit; static abstract class IteratorDecorator implements Iterator { protected final Iterator iterator; @@ -42,29 +40,11 @@ public boolean hasNext() { } } -// static class ValueLimitIteratorDecorator extends IteratorDecorator { -// private final int limit; -// -// ValueLimitIteratorDecorator(Iterator iterator, int sizeLimit) { -// super(iterator); -// this.limit = sizeLimit; -// } -// -// @Override -// public String next() { -// String value = iterator.next(); -// if (value.length() > limit) { -// throw new IllegalStateException("input buffer full, consumed token which exceeded the sizeLimit " + limit); -// } -// return value; -// } -// } - static class DataSplitter implements Iterator { private final String separator; private final IntPredicate sizeChecker; - private int currentIdx = 0; private final StringBuilder accumulator = new StringBuilder(); + private int currentIdx = 0; private boolean dropNextPartialFragments = false; DataSplitter(String separator) { @@ -147,7 +127,6 @@ public BufferedTokenizer() { public BufferedTokenizer(String separator) { this.dataSplitter = new DataSplitter(separator); this.iterable = () -> dataSplitter; -// this.iterable = setupIterable(); } public BufferedTokenizer(String separator, int sizeLimit) { @@ -156,9 +135,7 @@ public BufferedTokenizer(String separator, int sizeLimit) { } this.dataSplitter = new DataSplitter(separator, tokenSize -> tokenSize > sizeLimit); -// this.sizeLimit = sizeLimit; this.iterable = () -> dataSplitter; -// this.iterable = setupIterable(); } public Iterable extract(String data) { @@ -167,16 +144,6 @@ public Iterable extract(String data) { return iterable; } -// private Iterable setupIterable() { -// return () -> { -// Iterator returnedIterator = dataSplitter; -// if (sizeLimit != null) { -// returnedIterator = new ValueLimitIteratorDecorator(returnedIterator, sizeLimit); -// } -// return returnedIterator; -// }; -// } - public String flush() { return dataSplitter.flush(); } From 71daf70a05e1e1ddaf13a90d4ef7898f75c923ec Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 10 Mar 2025 14:59:44 +0100 Subject: [PATCH 16/16] Inlined sizeLimit test without resorting to support predicate instance --- .../logstash/common/BufferedTokenizer.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java index 595b3b1e4a7..04cf4fad220 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizer.java @@ -20,7 +20,6 @@ import java.util.Iterator; import java.util.NoSuchElementException; -import java.util.function.IntPredicate; public class BufferedTokenizer { @@ -42,25 +41,29 @@ public boolean hasNext() { static class DataSplitter implements Iterator { private final String separator; - private final IntPredicate sizeChecker; private final StringBuilder accumulator = new StringBuilder(); private int currentIdx = 0; private boolean dropNextPartialFragments = false; + private final int sizeLimit; + /** + * @param separator + * is the token separator string. 
+         * */
         DataSplitter(String separator) {
             this.separator = separator;
-            this.sizeChecker = value -> false;
+            this.sizeLimit = Integer.MIN_VALUE;
         }
 
         /**
          * @param separator
         *      is the token separator string.
-         * @param sizeChecker
-         *      function that verifies if the token size is bigger than a limit
+         * @param sizeLimit
+         *      maximum token size length.
          * */
-        DataSplitter(String separator, IntPredicate sizeChecker) {
+        DataSplitter(String separator, int sizeLimit) {
             this.separator = separator;
-            this.sizeChecker = sizeChecker;
+            this.sizeLimit = sizeLimit;
         }
 
         @Override
@@ -71,7 +74,7 @@ public boolean hasNext() {
                 cleanupAccumulator();
                 // if the remainder is already bigger than the admitted size, start dropping the next fragments that
                 // don't contain any separator
-                if (sizeChecker.test(accumulator.length())) {
+                if (sizeLimit != Integer.MIN_VALUE && accumulator.length() > sizeLimit) {
                     dropNextPartialFragments = true;
                 }
                 return false;
@@ -90,8 +93,8 @@ public String next() {
             } else {
                 String token = accumulator.substring(currentIdx, nextIdx);
                 currentIdx = nextIdx + separator.length();
-                if (sizeChecker.test(token.length())) {
-                    throw new IllegalStateException("input buffer full, consumed token which exceeded the sizeLimit "); // TODO + sizeLimit
+                if (sizeLimit != Integer.MIN_VALUE && token.length() > sizeLimit) {
+                    throw new IllegalStateException("input buffer full, consumed token which exceeded the sizeLimit " + sizeLimit);
                 }
                 return token;
             }
@@ -134,7 +137,7 @@ public BufferedTokenizer(String separator, int sizeLimit) {
             throw new IllegalArgumentException("Size limit must be positive");
         }
 
-        this.dataSplitter = new DataSplitter(separator, tokenSize -> tokenSize > sizeLimit);
+        this.dataSplitter = new DataSplitter(separator, sizeLimit);
         this.iterable = () -> dataSplitter;
     }
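Taken together, the series leaves BufferedTokenizer with a lazy, size-guarded tokenization API: the size-limit error surfaces while iterating, past-overrun input is dropped until the next separator, and tokenization then resumes. A minimal end-to-end usage sketch (based on the constructors and methods shown in the patches above; the inputs are illustrative):

    BufferedTokenizer tokenizer = new BufferedTokenizer("\n", 10);
    try {
        for (String token : tokenizer.extract("short\na-very-long-overrun-token\nok\n")) {
            System.out.println(token);  // prints "short", then next() throws
        }
    } catch (IllegalStateException e) {
        // "input buffer full, consumed token which exceeded the sizeLimit 10"
    }
    for (String token : tokenizer.extract("")) {
        System.out.println(token);      // prints "ok": tokenization resumed past the bad token
    }
    System.out.println(tokenizer.isEmpty());  // true: nothing buffered after the last separator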