Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,8 @@ public class Batch implements Closeable {
private final Queue queue;
private final AtomicBoolean closed;

public Batch(SequencedList<byte[]> serialized, Queue q) {
this(
serialized.getElements(),
serialized.getSeqNums().size() == 0 ? -1L : serialized.getSeqNums().get(0), q
);
}

public Batch(List<byte[]> elements, long firstSeqNum, Queue q) {
this.elements = deserializeElements(elements, q);
Batch(List<Queueable> elements, long firstSeqNum, Queue q) {
this.elements = elements;
this.firstSeqNum = elements.isEmpty() ? -1L : firstSeqNum;
this.queue = q;
this.closed = new AtomicBoolean(false);
Expand Down Expand Up @@ -90,4 +83,10 @@ private static List<Queueable> deserializeElements(List<byte[]> serialized, Queu
}
return deserialized;
}

private static long minSeqNum(SequencedList<?> sequencedList) {
return sequencedList.getMinSeqNum();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.logstash.ackedqueue;

import java.util.function.Function;

/**
* In its simplest form a {@code BoxedQueueable} is a box around a {@link Queueable},
* which can be useful for passing a mixture of live references and to-be-deserialized
* byte arrays. it is an internal implementation detail of the acked queue.
*/
interface BoxedQueueable {
Queueable unbox();

static BoxedQueueable fromLiveReference(final Queueable queueable) {
return new LiveReference(queueable);
}

static BoxedQueueable fromSerializedBytes(final byte[] bytes, Function<byte[], Queueable> deserializer) {
return new SerializedBytes(bytes, deserializer);
}

/**
* A {@code BoxedQueueable.LiveReference} is an implementation of {@link BoxedQueueable} that
* wraps a live object
*/
class LiveReference implements BoxedQueueable {
private final Queueable boxed;

public LiveReference(Queueable boxed) {
this.boxed = boxed;
}

@Override
public Queueable unbox() {
return this.boxed;
}
}

/**
* A {@code BoxedQueueable.SerializedBytes} is an implementation of {@link BoxedQueueable} that
* wraps bytes and a deserializer
*/
class SerializedBytes implements BoxedQueueable {
private final byte[] bytes;
private final Function<byte[], Queueable> deserializer;

public SerializedBytes(byte[] bytes, Function<byte[], Queueable> deserializer) {
this.bytes = bytes;
this.deserializer = deserializer;
}

@Override
public Queueable unbox() {
return deserializer.apply(bytes);
}
}
}
117 changes: 109 additions & 8 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Page.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
import com.google.common.primitives.Ints;
import java.io.Closeable;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.codehaus.commons.nullanalysis.NotNull;
import org.logstash.ackedqueue.io.CheckpointIO;
import org.logstash.ackedqueue.io.PageIO;
Expand All @@ -41,6 +46,8 @@ public final class Page implements Closeable {
protected PageIO pageIO;
private boolean writable;

private final ElementCache<Queueable> elementCache;

// bit 0 is minSeqNum
// TODO: go steal LocalCheckpointService in feature/seq_no from ES
// TODO: https://github.com/elastic/elasticsearch/blob/feature/seq_no/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java
Expand All @@ -59,6 +66,8 @@ public Page(int pageNum, Queue queue, long minSeqNum, int elementCount, long fir
this.pageIO = pageIO;
this.writable = writable;

this.elementCache = writable ? new ElementCache<>() : null;

assert this.pageIO != null : "invalid null pageIO";
}

Expand All @@ -71,30 +80,55 @@ public String toString() {
* @return {@link SequencedList} collection of serialized elements read
* @throws IOException if an IO error occurs
*/
public SequencedList<byte[]> read(int limit) throws IOException {
SequencedList<BoxedQueueable> read(int limit) throws IOException {
// first make sure this page is activated, activating previously activated is harmless
this.pageIO.activate();

SequencedList<byte[]> serialized = this.pageIO.read(this.firstUnreadSeqNum, limit);
assert serialized.getSeqNums().get(0) == this.firstUnreadSeqNum :
String.format("firstUnreadSeqNum=%d != first result seqNum=%d", this.firstUnreadSeqNum, serialized.getSeqNums().get(0));
// rescope limit
limit = Math.max(0, Math.min(limit, elementCount - Math.toIntExact(firstUnreadSeqNum-minSeqNum)));

final ArrayList<BoxedQueueable> elements = new ArrayList<>(limit);

int pageOffset = Math.toIntExact(this.firstUnreadSeqNum - this.minSeqNum);

// first attempt to read from the event cache; as soon as we are unable
// to read events directly from the cache, fall through to the pageIO
if (this.elementCache != null) {
for (Queueable cachedElement : elementCache.releaseMany(this.firstUnreadSeqNum, limit)) {
elements.add(BoxedQueueable.fromLiveReference(cachedElement));
}
}

int limitRemaining = limit - elements.size();
if (limitRemaining > 0 && pageOffset < this.elementCount) {
SequencedList<byte[]> serialized = this.pageIO.read(this.firstUnreadSeqNum, limitRemaining);
assert serialized.getMinSeqNum() == this.firstUnreadSeqNum + elements.size() :
String.format("firstUnreadSeqNum=%d != first result seqNum=%d + cached=%d", this.firstUnreadSeqNum, serialized.getMinSeqNum(), elements.size());
for (SequencedList.Entry<byte[]> entry : serialized.entries()) {
elements.add(BoxedQueueable.fromSerializedBytes(entry.element, this.queue::deserialize));
}
}

final SequencedList<BoxedQueueable> result = new SequencedList<>(elements, this.firstUnreadSeqNum);

this.firstUnreadSeqNum += serialized.getElements().size();
this.firstUnreadSeqNum += elements.size();

return serialized;
return result;
}

public void write(byte[] bytes, long seqNum, int checkpointMaxWrites) throws IOException {
public void write(Queueable element, byte[] bytes, long seqNum, int checkpointMaxWrites) throws IOException {
if (! this.writable) {
throw new IllegalStateException(String.format("page=%d is not writable", this.pageNum));
}

this.pageIO.write(bytes, seqNum);
this.elementCache.stash(element, seqNum);

if (this.minSeqNum <= 0) {
this.minSeqNum = seqNum;
this.firstUnreadSeqNum = seqNum;
}

this.elementCount++;

// force a checkpoint if we wrote checkpointMaxWrites elements since last checkpoint
Expand Down Expand Up @@ -154,7 +188,7 @@ public boolean ack(long firstSeqNum, int count, int checkpointMaxAcks) throws IO
this.minSeqNum, this.elementCount, this.minSeqNum + this.elementCount
);
final int offset = Ints.checkedCast(firstSeqNum - this.minSeqNum);
ackedSeqNums.flip(offset, offset + count);
ackedSeqNums.set(offset, offset + count);
// checkpoint if totally acked or we acked more than checkpointMaxAcks elements in this page since last checkpoint
// note that fully acked pages cleanup is done at queue level in Queue.ack()
final long firstUnackedSeqNum = firstUnackedSeqNum();
Expand Down Expand Up @@ -250,6 +284,9 @@ public void behead() throws IOException {
*/
public void deactivate() throws IOException {
this.getPageIO().deactivate();
if (this.elementCache != null) {
this.elementCache.releaseAll();
}
}

public boolean hasSpace(int byteSize) {
Expand Down Expand Up @@ -300,4 +337,68 @@ protected long firstUnackedSeqNum() {
return this.ackedSeqNums.nextClearBit(0) + this.minSeqNum;
}

static class ReleaseableSoftReference<T extends Queueable> extends SoftReference<T> {
public ReleaseableSoftReference(T referent) {
super(referent);
}
public T getAndReleaseElement() {
T element = super.get();
super.clear();
return element;
}
}

static class ElementCache<T extends Queueable> {
private long minSeqNum = Long.MIN_VALUE;
private int elementCount = 0;
private ArrayList<ReleaseableSoftReference<T>> elementCache;

private static final AtomicLong ID_GENERATOR = new AtomicLong();
private final long id = ID_GENERATOR.incrementAndGet();

public void stash(T element, long seqNum) {
final int offset;
if (minSeqNum < 0) {
minSeqNum = seqNum;
elementCache = new ArrayList<>();
offset = 0;
} else {
offset = Math.toIntExact(seqNum - minSeqNum);
}

elementCache.add(offset, new ReleaseableSoftReference<>(element));
elementCount++;
}

public T release(long seqNum) {
if (elementCount <= 0 || seqNum < minSeqNum) {
return null;
}
final int offset = Math.toIntExact(seqNum - minSeqNum);
if (offset >= elementCount) {
return null;
}
final ReleaseableSoftReference<T> releaseableSoftReference = elementCache.get(offset);
if (releaseableSoftReference == null) {
return null;
}
return releaseableSoftReference.getAndReleaseElement();
}

public List<T> releaseMany(long seqNum, int limit) {
final List<T> elements = new ArrayList<>(limit);
for (; elements.size() < limit; seqNum++) {
T released = release(seqNum);
if (released == null) { break; }
elements.add(released);
}
return elements;
}

public void releaseAll() {
this.minSeqNum = Long.MIN_VALUE;
this.elementCache = null;
this.elementCount = 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static Page newHeadPage(Checkpoint checkpoint, Queue queue, PageIO pageIO

// this page ackedSeqNums bitset is a new empty bitset, if we have some acked elements, set them in the bitset
if (checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) {
p.ackedSeqNums.flip(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum()));
p.ackedSeqNums.set(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum()));
}

return p;
Expand Down Expand Up @@ -97,7 +97,7 @@ public static Page newTailPage(Checkpoint checkpoint, Queue queue, PageIO pageIO
try {
// this page ackedSeqNums bitset is a new empty bitset, if we have some acked elements, set them in the bitset
if (checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) {
p.ackedSeqNums.flip(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum()));
p.ackedSeqNums.set(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum()));
}

return p;
Expand Down
Loading