10 changes: 9 additions & 1 deletion CHANGELOG.adoc
@@ -1,11 +1,19 @@
= Change Log

== v0.2.0.3

* Fixes
** https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)] - gracefully drop BitSet or RunLength encoding as an option if the offset difference is too large (short overflow)
*** A new serialisation format will be added in the next version - see https://github.com/confluentinc/parallel-consumer/issues/37[Support BitSet encoding lengths longer than Short.MAX_VALUE #37]
** Gracefully drops encoding attempts if they can't be run
** Fixes a bug in the logic that drops the offset payload when it can't fit in the offset metadata payload

== v0.2.0.2

* Fixes
** Turns back on the https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)]

== v0.2.0.1 DO NOT USE
== v0.2.0.1 DO NOT USE - has critical bug

* Fixes
** Incorrectly turns off an overflow check in https://github.com/confluentinc/parallel-consumer/issues/35[offset serialisation system (#35)]
9 changes: 9 additions & 0 deletions BitSetEncodingNotSupportedException.java
@@ -0,0 +1,9 @@
package io.confluent.parallelconsumer;

public class BitSetEncodingNotSupportedException extends EncodingNotSupportedException {

public BitSetEncodingNotSupportedException(String msg) {
super(msg);
}

}
92 changes: 92 additions & 0 deletions BitsetEncoder.java
@@ -0,0 +1,92 @@
package io.confluent.parallelconsumer;

import io.confluent.csid.utils.StringUtils;

import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Optional;

import static io.confluent.parallelconsumer.OffsetEncoding.BitSetCompressed;

/**
* Encodes a range of offsets from an incompletes collection into a BitSet.
* <p>
* Highly efficient when the completion status is random.
* <p>
* Highly inefficient when the completion status is in large blocks ({@link RunLengthEncoder} is much better)
* <p>
* Because our system works by manipulating INCOMPLETE offsets, it doesn't matter whether the offset range we're
* encoding is sequential or not: records are always seen in commit order, so once we've seen a range of offsets, we
* know we've seen all that exist within that range. If offset 8 is missing from the partition, we will encode it as
* having been completed (when in fact it doesn't exist), because we only compare against known incompletes and assume
* all others are complete.
* <p>
* When we deserialise, the INCOMPLETES collection is restored, and that's what's used to decide whether a record
* should be skipped or not. If record 8 is recorded as completed, it will be absent from the restored INCOMPLETES
* list, and we are assured we will never see record 8.
*
* @see RunLengthEncoder
* @see OffsetBitSet
*/
class BitsetEncoder extends OffsetEncoder {

public static final Short MAX_LENGTH_ENCODABLE = Short.MAX_VALUE;

private final ByteBuffer wrappedBitsetBytesBuffer;
private final BitSet bitSet;

private Optional<byte[]> encodedBytes = Optional.empty();

public BitsetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) throws BitSetEncodingNotSupportedException {
super(offsetSimultaneousEncoder);
if (length > MAX_LENGTH_ENCODABLE) {
// need to upgrade to using Integer for the bitset length, but can't change serialisation format in-place
throw new BitSetEncodingNotSupportedException(StringUtils.msg("Bitset too long to encode, as length overflows Short.MAX_VALUE. Length: {}. (max: {})", length, Short.MAX_VALUE));
}
// prep bit set buffer
this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Short.BYTES + ((length / 8) + 1));
// bitset doesn't serialise its set capacity, so we have to, as the unused capacity actually means something
this.wrappedBitsetBytesBuffer.putShort((short) length);
bitSet = new BitSet(length);
}

@Override
protected OffsetEncoding getEncodingType() {
return OffsetEncoding.BitSet;
}

@Override
protected OffsetEncoding getEncodingTypeCompressed() {
return BitSetCompressed;
}

@Override
public void encodeIncompleteOffset(final int index) {
// noop - bitset defaults to 0's (`unset`)
}

@Override
public void encodeCompletedOffset(final int index) {
bitSet.set(index);
}

@Override
public byte[] serialise() {
final byte[] bitSetArray = this.bitSet.toByteArray();
this.wrappedBitsetBytesBuffer.put(bitSetArray);
final byte[] array = this.wrappedBitsetBytesBuffer.array();
this.encodedBytes = Optional.of(array);
return array;
}

@Override
public int getEncodedSize() {
return this.encodedBytes.get().length;
}

@Override
protected byte[] getEncodedBytes() {
return this.encodedBytes.get();
}

}
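
To make the new wire format concrete, here is a minimal, self-contained sketch of what BitsetEncoder writes: a two-byte length header followed by the BitSet's backing bytes, with set bits marking completed offsets. The class and method names below are illustrative only, not part of this PR.

import java.nio.ByteBuffer;
import java.util.BitSet;

// Sketch of the [short length][bitset bytes] format produced by BitsetEncoder.
public class BitsetFormatSketch {

    static byte[] encode(int length, int[] completedIndexes) {
        if (length > Short.MAX_VALUE) // same guard as the encoder: the length header is a short
            throw new IllegalArgumentException("length overflows Short.MAX_VALUE: " + length);
        BitSet bitSet = new BitSet(length);
        for (int i : completedIndexes)
            bitSet.set(i); // completed offsets become set bits; incompletes stay unset
        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + (length / 8) + 1);
        buffer.putShort((short) length); // BitSet doesn't serialise its capacity, so write it explicitly
        buffer.put(bitSet.toByteArray());
        return buffer.array();
    }

    public static void main(String[] args) {
        // a range of 3 offsets where indexes 0 and 2 are complete and index 1 is incomplete
        byte[] wire = encode(3, new int[]{0, 2});
        System.out.println(wire.length); // 3: two header bytes plus one byte of bitset data
    }
}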
4 changes: 2 additions & 2 deletions BrokerPollSystem.java
@@ -19,7 +19,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static io.confluent.csid.utils.BackportUtils.toSeconds;
import static io.confluent.parallelconsumer.ParallelEoSStreamProcessor.State.closed;
@@ -82,7 +81,7 @@ public void supervise() {
try {
booleanFuture.get();
} catch (Exception e) {
throw new InternalError("Error in " + BrokerPollSystem.class.getSimpleName() + " system.", e);
throw new InternalRuntimeError("Error in " + BrokerPollSystem.class.getSimpleName() + " system.", e);
}
}
}
@@ -162,6 +161,7 @@ private boolean isResponsibleForCommits() {

private ConsumerRecords<K, V> pollBrokerForRecords() {
managePauseOfSubscription();
log.debug("Subscriptions are paused: {}", paused);

Duration thisLongPollTimeout = (state == ParallelEoSStreamProcessor.State.running) ? BrokerPollSystem.longPollTimeout : Duration.ofMillis(1); // Can't use Duration.ZERO - this causes Object#wait to wait forever

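The Duration.ofMillis(1) fallback above sidesteps the JDK behaviour the inline comment refers to: a timeout of zero passed to Object#wait means "no timeout" (block until notified), not "return immediately". A minimal illustration of just that JDK semantic, unrelated to the PR's own classes:

// Object#wait(0) means "wait indefinitely", not "don't wait".
public class ZeroWaitGotcha {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();
        synchronized (lock) {
            lock.wait(1);   // waits at most ~1 ms, then returns
            // lock.wait(0); // would block forever unless another thread calls lock.notify()
        }
        System.out.println("returned after the 1 ms wait");
    }
}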
52 changes: 52 additions & 0 deletions ByteBufferEncoder.java
@@ -0,0 +1,52 @@
package io.confluent.parallelconsumer;

import java.nio.ByteBuffer;

import static io.confluent.parallelconsumer.OffsetEncoding.ByteArray;
import static io.confluent.parallelconsumer.OffsetEncoding.ByteArrayCompressed;

class ByteBufferEncoder extends OffsetEncoder {

private final ByteBuffer bytesBuffer;

public ByteBufferEncoder(final int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
super(offsetSimultaneousEncoder);
this.bytesBuffer = ByteBuffer.allocate(1 + length);
}

@Override
protected OffsetEncoding getEncodingType() {
return ByteArray;
}

@Override
protected OffsetEncoding getEncodingTypeCompressed() {
return ByteArrayCompressed;
}

@Override
public void encodeIncompleteOffset(final int rangeIndex) {
this.bytesBuffer.put((byte) 0);
}

@Override
public void encodeCompletedOffset(final int rangeIndex) {
this.bytesBuffer.put((byte) 1);
}

@Override
public byte[] serialise() {
return this.bytesBuffer.array();
}

@Override
public int getEncodedSize() {
return this.bytesBuffer.capacity();
}

@Override
protected byte[] getEncodedBytes() {
return this.bytesBuffer.array();
}

}
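
For contrast with the bitset format, here is a hedged sketch of the byte-per-offset layout this encoder produces: one flag byte per offset, so the encoded size is linear in the range length no matter how many offsets are complete. The names below are hypothetical.

import java.nio.ByteBuffer;

// Sketch of ByteBufferEncoder's layout: one byte per offset, 0 = incomplete, 1 = complete.
public class ByteArrayFormatSketch {

    static byte[] encode(boolean[] completed) {
        ByteBuffer buffer = ByteBuffer.allocate(1 + completed.length); // mirrors the encoder's 1 + length allocation
        for (boolean done : completed)
            buffer.put(done ? (byte) 1 : (byte) 0);
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] wire = encode(new boolean[]{true, false, true});
        System.out.println(wire.length); // 4: one byte per offset, plus the single extra allocated byte
    }
}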
7 changes: 7 additions & 0 deletions EncodingNotSupportedException.java
@@ -0,0 +1,7 @@
package io.confluent.parallelconsumer;

public class EncodingNotSupportedException extends Exception {
public EncodingNotSupportedException(final String message) {
super(message);
}
}

This file was deleted.

20 changes: 20 additions & 0 deletions InternalRuntimeError.java
@@ -0,0 +1,20 @@
package io.confluent.parallelconsumer;

public class InternalRuntimeError extends RuntimeException {

public InternalRuntimeError(final String message) {
super(message);
}

public InternalRuntimeError(final String message, final Throwable cause) {
super(message, cause);
}

public InternalRuntimeError(final Throwable cause) {
super(cause);
}

public InternalRuntimeError(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
10 changes: 7 additions & 3 deletions OffsetBitSet.java
@@ -4,6 +4,7 @@
* Copyright (C) 2020 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumer.Tuple;
import lombok.extern.slf4j.Slf4j;

import java.nio.ByteBuffer;
@@ -13,6 +14,9 @@

import static io.confluent.csid.utils.Range.range;

/**
* @see BitsetEncoder
*/
@Slf4j
public class OffsetBitSet {

@@ -38,18 +42,18 @@ static String deserialiseBitSet(int originalBitsetSize, ByteBuffer s) {
return result.toString();
}

static ParallelConsumer.Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
static Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
wrap.rewind();
short originalBitsetSize = wrap.getShort();
ByteBuffer slice = wrap.slice();
Set<Long> incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice);
long highwaterMark = baseOffset + originalBitsetSize;
return ParallelConsumer.Tuple.pairOf(highwaterMark, incompletes);
return Tuple.pairOf(highwaterMark, incompletes);
}

static Set<Long> deserialiseBitSetToIncompletes(long baseOffset, int originalBitsetSize, ByteBuffer inputBuffer) {
BitSet bitSet = BitSet.valueOf(inputBuffer);
var incompletes = new HashSet<Long>(1); // can't know how big this needs to be yet
var incompletes = new HashSet<Long>(); // can't know how big this needs to be yet
for (var relativeOffset : range(originalBitsetSize)) {
long offset = baseOffset + relativeOffset;
if (bitSet.get(relativeOffset)) {
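Decoding reverses the BitsetEncoder format: read the two-byte size header, rebuild the BitSet from the remaining bytes, and collect every unset bit as an incomplete offset (the highwater mark being baseOffset plus the stored size). A self-contained sketch with hypothetical names, assuming unset bits mark incompletes as the BitsetEncoder javadoc describes:

import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

// Sketch of decoding [short length][bitset bytes] back into incomplete offsets.
public class BitsetDecodeSketch {

    static Set<Long> decodeIncompletes(long baseOffset, ByteBuffer wire) {
        wire.rewind();
        short originalBitsetSize = wire.getShort();   // capacity header written by the encoder
        BitSet bitSet = BitSet.valueOf(wire.slice()); // remaining bytes are the bitset payload
        Set<Long> incompletes = new HashSet<>();
        for (int i = 0; i < originalBitsetSize; i++) {
            if (!bitSet.get(i))                       // unset bit = never completed
                incompletes.add(baseOffset + i);
        }
        return incompletes;
    }

    public static void main(String[] args) {
        // wire bytes for a range of 3 where bits 0 and 2 are set (offset 1 incomplete)
        ByteBuffer wire = ByteBuffer.allocate(3).putShort((short) 3).put((byte) 0b101);
        System.out.println(decodeIncompletes(100L, wire)); // prints [101]
    }
}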
7 changes: 7 additions & 0 deletions OffsetDecodingError.java
@@ -0,0 +1,7 @@
package io.confluent.parallelconsumer;

public class OffsetDecodingError extends Exception {
public OffsetDecodingError(final String s, final IllegalArgumentException a) {
super(s, a);
}
}
55 changes: 55 additions & 0 deletions OffsetEncoder.java
@@ -0,0 +1,55 @@
package io.confluent.parallelconsumer;

import lombok.SneakyThrows;

import java.io.IOException;
import java.nio.ByteBuffer;

abstract class OffsetEncoder {

private final OffsetSimultaneousEncoder offsetSimultaneousEncoder;

public OffsetEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
this.offsetSimultaneousEncoder = offsetSimultaneousEncoder;
}

protected abstract OffsetEncoding getEncodingType();

protected abstract OffsetEncoding getEncodingTypeCompressed();

abstract void encodeIncompleteOffset(final int rangeIndex);

abstract void encodeCompletedOffset(final int rangeIndex);

abstract byte[] serialise() throws EncodingNotSupportedException;

abstract int getEncodedSize();

boolean quiteSmall() {
return this.getEncodedSize() < OffsetSimultaneousEncoder.LARGE_INPUT_MAP_SIZE_THRESHOLD;
}

byte[] compress() throws IOException {
return OffsetSimpleSerialisation.compressZstd(this.getEncodedBytes());
}

void register() throws EncodingNotSupportedException {
final byte[] bytes = this.serialise();
final OffsetEncoding encodingType = this.getEncodingType();
this.register(encodingType, bytes);
}

private void register(final OffsetEncoding type, final byte[] bytes) {
offsetSimultaneousEncoder.sortedEncodings.add(new EncodedOffsetPair(type, ByteBuffer.wrap(bytes)));
offsetSimultaneousEncoder.encodingMap.put(type, bytes);
}

@SneakyThrows
void registerCompressed() {
final byte[] compressed = compress();
final OffsetEncoding encodingType = this.getEncodingTypeCompressed();
this.register(encodingType, compressed);
}

protected abstract byte[] getEncodedBytes();
}