diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/DeserializedBatch.java b/logstash-core/src/main/java/org/logstash/ackedqueue/DeserializedBatch.java deleted file mode 100644 index 851c772cc55..00000000000 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/DeserializedBatch.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.logstash.ackedqueue; - -import java.util.List; - -public class DeserializedBatch { - private final List elements; - private final long firstSeqNum; - private final Queue queue; - - public DeserializedBatch(List elements, long firstSeqNum, Queue queue) { - this.elements = elements; - this.firstSeqNum = firstSeqNum; - this.queue = queue; - } - - public Batch deserialize() { - return new Batch(elements, firstSeqNum, queue); - } -} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 8c9173793b6..85dded608d4 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -590,18 +590,18 @@ public void ensurePersistedUpto(long seqNum) throws IOException{ * @throws IOException if an IO error occurs */ public synchronized Batch nonBlockReadBatch(int limit) throws IOException { - DeserializedBatch deserializedBatch; + final SerializedBatchHolder serializedBatchHolder; lock.lock(); try { Page p = nextReadPage(); if (isHeadPage(p) && p.isFullyRead()) { return null; } - deserializedBatch = readPageBatch(p, limit, 0L); + serializedBatchHolder = readPageBatch(p, limit, 0L); } finally { lock.unlock(); } - return deserializedBatch.deserialize(); + return serializedBatchHolder.deserialize(); } /** @@ -616,7 +616,7 @@ public Batch readBatch(int limit, long timeout) throws IOException { return readDeserializedBatch(limit, timeout).deserialize(); } - public synchronized DeserializedBatch readDeserializedBatch(int limit, long timeout) throws IOException { + private synchronized SerializedBatchHolder readDeserializedBatch(int limit, long timeout) throws IOException { lock.lock(); try { @@ -635,7 +635,7 @@ public synchronized DeserializedBatch readDeserializedBatch(int limit, long time * @return {@link Batch} with read elements or null if nothing was read * @throws IOException if an IO error occurs */ - private DeserializedBatch readPageBatch(Page p, int limit, long timeout) throws IOException { + private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) throws IOException { int left = limit; final List elements = new ArrayList<>(limit); @@ -687,7 +687,7 @@ private DeserializedBatch readPageBatch(Page p, int limit, long timeout) throws removeUnreadPage(p); } - return new DeserializedBatch(elements, firstSeqNum, this); + return new SerializedBatchHolder(elements, firstSeqNum); } /** @@ -903,4 +903,18 @@ private static boolean containsSeq(final Page page, final long seqNum) { final long pMaxSeq = pMinSeq + (long) page.getElementCount(); return seqNum >= pMinSeq && seqNum < pMaxSeq; } + + class SerializedBatchHolder { + private final List elements; + private final long firstSeqNum; + + private SerializedBatchHolder(List elements, long firstSeqNum) { + this.elements = elements; + this.firstSeqNum = firstSeqNum; + } + + private Batch deserialize() { + return new Batch(elements, firstSeqNum, Queue.this); + } + } }