
Commit d0993f1

fix: Gracefully drop Bitset encoding if offset difference too large (short range)
RunLength encoding will still be used. See #37 (Support BitSet encoding lengths longer than Short.MAX_VALUE). A test did try to cover this, but the offset difference wasn't large enough.
1 parent c8ad274 commit d0993f1
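
In essence (a condensed view of the OffsetSimultaneousEncoder#initEncoders change in the diff below), the BitSet encoder is now merely attempted, and skipped with a warning when the offset range overflows its short-based length header, leaving the RunLength encoder in play:

    try {
        BitsetEncoder bitsetEncoder = new BitsetEncoder(length, this);
        encoders.add(bitsetEncoder);
    } catch (BitSetEncodingNotSupportedException a) {
        log.warn("Cannot use {} encoder", BitsetEncoder.class.getSimpleName(), a);
    }

    encoders.add(new RunLengthEncoder(this));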

15 files changed: +390 -121 lines changed

CHANGELOG.adoc

Lines changed: 6 additions & 1 deletion
@@ -1,11 +1,16 @@
 = Change Log
 
+== v0.2.0.3
+
+* Fixes
+** https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)] - gracefully drop BitSet encoding as an option if offset different too large
+
 == v0.2.0.2
 
 * Fixes
 ** Turns back on the https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)]
 
-== v0.2.0.1 DO NOT USE
+== v0.2.0.1 DO NOT USE - has critical bug
 
 * Fixes
 ** Incorrectly turns off an over-flow check in https://github.com/confluentinc/parallel-consumer/issues/35[offset serialisation system (#35)]
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BitSetEncodingNotSupportedException.java

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+package io.confluent.parallelconsumer;
+
+public class BitSetEncodingNotSupportedException extends Exception {
+
+    public BitSetEncodingNotSupportedException(String msg) {
+        super(msg);
+    }
+
+}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BitsetEncoder.java

Lines changed: 33 additions & 9 deletions
@@ -1,26 +1,50 @@
 package io.confluent.parallelconsumer;
 
+import io.confluent.csid.utils.StringUtils;
+
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Optional;
 
 import static io.confluent.parallelconsumer.OffsetEncoding.BitSetCompressed;
 
+/**
+ * Encodes a range of offsets, from an incompletes collection into a BitSet.
+ * <p>
+ * Highly efficient when the completion status is random.
+ * <p>
+ * Highly inefficient when the completion status is in large blocks ({@link RunLengthEncoder} is much better)
+ * <p>
+ * Because our system works on manipulating INCOMPLETE offsets, it doesn't matter if the offset range we're encoding is
+ * Sequential or not. Because as records are always in commit order, if we've seen a range of offsets, we know we've
+ * seen all that exist (within said range). So if offset 8 is missing from the partition, we will encode it as having
+ * been completed (when in fact it doesn't exist), because we only compare against known incompletes, and assume all
+ * others are complete.
+ * <p>
+ * So, when we deserialize, the INCOMPLETES collection is then restored, and that's what's used to compare to see if a
+ * record should be skipped or not. So if record 8 is recorded as completed, it will be absent from the restored
+ * INCOMPLETES list, and we are assured we will never see record 8.
+ *
+ * @see RunLengthEncoder
+ * @see OffsetBitSet
+ */
 class BitsetEncoder extends OffsetEncoder {
 
+    public static final Short MAX_LENGTH_ENCODABLE = Short.MAX_VALUE;
+
     private final ByteBuffer wrappedBitsetBytesBuffer;
     private final BitSet bitSet;
 
     private Optional<byte[]> encodedBytes = Optional.empty();
 
-    public BitsetEncoder(final int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
+    public BitsetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) throws BitSetEncodingNotSupportedException {
         super(offsetSimultaneousEncoder);
-        // prep bit set buffer
-        this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Short.BYTES + ((length / 8) + 1));
-        if (length > Short.MAX_VALUE) {
+        if (length > MAX_LENGTH_ENCODABLE) {
             // need to upgrade to using Integer for the bitset length, but can't change serialisation format in-place
-            throw new RuntimeException("Bitset too long to encode, bitset length overflows Short.MAX_VALUE: " + length + ". (max: " + Short.MAX_VALUE + ")");
+            throw new BitSetEncodingNotSupportedException(StringUtils.msg("Bitset too long to encode, as length overflows Short.MAX_VALUE. Length: {}. (max: {})", length, Short.MAX_VALUE));
         }
+        // prep bit set buffer
+        this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Short.BYTES + ((length / 8) + 1));
         // bitset doesn't serialise it's set capacity, so we have to as the unused capacity actually means something
         this.wrappedBitsetBytesBuffer.putShort((short) length);
         bitSet = new BitSet(length);
@@ -37,13 +61,13 @@ protected OffsetEncoding getEncodingTypeCompressed() {
     }
 
     @Override
-    public void containsIndex(final int rangeIndex) {
-        //noop
+    public void encodeIncompleteOffset(final int index) {
+        // noop - bitset defaults to 0's (`unset`)
     }
 
     @Override
-    public void doesNotContainIndex(final int rangeIndex) {
-        bitSet.set(rangeIndex);
+    public void encodeCompletedOffset(final int index) {
+        bitSet.set(index);
     }
 
     @Override

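The class javadoc above carries the key idea: only bits for offsets known to be completed are set, and every unset bit is restored as an incomplete. A minimal standalone sketch of that round trip, assuming the wire layout implied by this diff (a 2-byte length header followed by the BitSet's bytes); it is an illustration, not the repository's actual encoder:

    import java.nio.ByteBuffer;
    import java.util.BitSet;
    import java.util.HashSet;
    import java.util.Set;

    class BitsetRoundTripSketch {

        // Encode the completion state of the range [baseOffset, nextExpectedOffset):
        // set bit = completed, unset bit = still incomplete.
        static byte[] encode(long baseOffset, long nextExpectedOffset, Set<Long> incompleteOffsets) {
            int length = (int) (nextExpectedOffset - baseOffset);
            if (length > Short.MAX_VALUE)
                throw new IllegalArgumentException("range too long for a short length header: " + length);
            BitSet bitSet = new BitSet(length);
            for (int i = 0; i < length; i++) {
                if (!incompleteOffsets.contains(baseOffset + i))
                    bitSet.set(i); // anything not known to be incomplete is treated as complete
            }
            ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + (length / 8) + 1);
            buffer.putShort((short) length); // BitSet doesn't serialise its capacity, so store it explicitly
            buffer.put(bitSet.toByteArray());
            return buffer.array();
        }

        // Restore the incompletes collection: every unset bit within the stored length is incomplete.
        static Set<Long> decodeIncompletes(long baseOffset, byte[] bytes) {
            ByteBuffer wrap = ByteBuffer.wrap(bytes);
            short originalSize = wrap.getShort();
            BitSet bitSet = BitSet.valueOf(wrap.slice());
            Set<Long> incompletes = new HashSet<>();
            for (int i = 0; i < originalSize; i++) {
                if (!bitSet.get(i))
                    incompletes.add(baseOffset + i);
            }
            return incompletes;
        }
    }

For example, with baseOffset 100, nextExpectedOffset 110 and incompletes {101, 103}, encode() produces a 10-bit set with bits 1 and 3 unset, and decodeIncompletes() restores exactly {101, 103}.
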
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BrokerPollSystem.java

Lines changed: 1 addition & 0 deletions
@@ -162,6 +162,7 @@ private boolean isResponsibleForCommits() {
 
     private ConsumerRecords<K, V> pollBrokerForRecords() {
         managePauseOfSubscription();
+        log.debug("Subscriptions are paused: {}", paused);
 
         Duration thisLongPollTimeout = (state == ParallelEoSStreamProcessor.State.running) ? BrokerPollSystem.longPollTimeout : Duration.ofMillis(1); // Can't use Duration.ZERO - this causes Object#wait to wait forever
 

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ByteBufferEncoder.java

Lines changed: 2 additions & 2 deletions
@@ -25,12 +25,12 @@ protected OffsetEncoding getEncodingTypeCompressed() {
     }
 
     @Override
-    public void containsIndex(final int rangeIndex) {
+    public void encodeIncompleteOffset(final int rangeIndex) {
         this.bytesBuffer.put((byte) 0);
     }
 
     @Override
-    public void doesNotContainIndex(final int rangeIndex) {
+    public void encodeCompletedOffset(final int rangeIndex) {
         this.bytesBuffer.put((byte) 1);
     }
 

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetBitSet.java

Lines changed: 7 additions & 3 deletions
@@ -4,6 +4,7 @@
  * Copyright (C) 2020 Confluent, Inc.
  */
 
+import io.confluent.parallelconsumer.ParallelConsumer.Tuple;
 import lombok.extern.slf4j.Slf4j;
 
 import java.nio.ByteBuffer;
@@ -13,6 +14,9 @@
 
 import static io.confluent.csid.utils.Range.range;
 
+/**
+ * @see BitsetEncoder
+ */
 @Slf4j
 public class OffsetBitSet {
 
@@ -38,18 +42,18 @@ static String deserialiseBitSet(int originalBitsetSize, ByteBuffer s) {
         return result.toString();
     }
 
-    static ParallelConsumer.Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
+    static Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
         wrap.rewind();
         short originalBitsetSize = wrap.getShort();
         ByteBuffer slice = wrap.slice();
         Set<Long> incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice);
         long highwaterMark = baseOffset + originalBitsetSize;
-        return ParallelConsumer.Tuple.pairOf(highwaterMark, incompletes);
+        return Tuple.pairOf(highwaterMark, incompletes);
     }
 
     static Set<Long> deserialiseBitSetToIncompletes(long baseOffset, int originalBitsetSize, ByteBuffer inputBuffer) {
         BitSet bitSet = BitSet.valueOf(inputBuffer);
-        var incompletes = new HashSet<Long>(1); // can't know how big this needs to be yet
+        var incompletes = new HashSet<Long>(); // can't know how big this needs to be yet
         for (var relativeOffset : range(originalBitsetSize)) {
             long offset = baseOffset + relativeOffset;
             if (bitSet.get(relativeOffset)) {

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetEncoder.java

Lines changed: 2 additions & 2 deletions
@@ -17,9 +17,9 @@ public OffsetEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
 
     protected abstract OffsetEncoding getEncodingTypeCompressed();
 
-    abstract void containsIndex(final int rangeIndex);
+    abstract void encodeIncompleteOffset(final int rangeIndex);
 
-    abstract void doesNotContainIndex(final int rangeIndex);
+    abstract void encodeCompletedOffset(final int rangeIndex);
 
     abstract byte[] serialise();
 

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetMapCodecManager.java

Lines changed: 15 additions & 3 deletions
@@ -10,6 +10,7 @@
 import org.apache.kafka.common.TopicPartition;
 import pl.tlinkowski.unij.api.UniSets;
 
+import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.*;
 
@@ -47,6 +48,11 @@ public class OffsetMapCodecManager<K, V> {
 
     org.apache.kafka.clients.consumer.Consumer<K, V> consumer;
 
+    /**
+     * Forces the use of a specific codec, instead of choosing the most efficient one. Useful for testing.
+     */
+    static Optional<OffsetEncoding> forcedCodec = Optional.empty();
+
     public OffsetMapCodecManager(final WorkManager<K, V> wm, final org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
         this.wm = wm;
         this.consumer = consumer;
@@ -111,9 +117,15 @@ String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, TopicP
     byte[] encodeOffsetsCompressed(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) {
         Long nextExpectedOffset = wm.partitionOffsetHighWaterMarks.get(tp) + 1;
         OffsetSimultaneousEncoder simultaneousEncoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, nextExpectedOffset, incompleteOffsets).invoke();
-        byte[] result = simultaneousEncoder.packSmallest();
-
-        return result;
+        if (forcedCodec.isPresent()) {
+            OffsetEncoding forcedOffsetEncoding = forcedCodec.get();
+            log.warn("Forcing use of {}", forcedOffsetEncoding);
+            Map<OffsetEncoding, byte[]> encodingMap = simultaneousEncoder.getEncodingMap();
+            byte[] bytes = encodingMap.get(forcedOffsetEncoding);
+            return simultaneousEncoder.packEncoding(new EncodedOffsetPair(forcedOffsetEncoding, ByteBuffer.wrap(bytes)));
+        } else {
+            return simultaneousEncoder.packSmallest();
+        }
     }
 
     /**

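The new forcedCodec hook lets a test pin the codec instead of letting packSmallest() pick the winner. A hedged sketch of test-side usage (the helper method is an assumption, not part of this commit; the field is package-private, so this would live in io.confluent.parallelconsumer):

    // Hypothetical test helper: force one encoding for the duration of some work, then reset.
    static void withForcedEncoding(OffsetEncoding encoding, Runnable commitTriggeringWork) {
        OffsetMapCodecManager.forcedCodec = Optional.of(encoding);
        try {
            commitTriggeringWork.run(); // anything that drives encodeOffsetsCompressed() via a commit
        } finally {
            OffsetMapCodecManager.forcedCodec = Optional.empty(); // static state - reset so it can't leak between tests
        }
    }

Note that forcing only makes sense for an encoding the simultaneous encoder actually produced; otherwise the encodingMap.get(...) call above yields null.
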
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetSimultaneousEncoder.java

Lines changed: 67 additions & 20 deletions
@@ -5,7 +5,6 @@
  */
 
 import lombok.Getter;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
 import java.nio.ByteBuffer;
@@ -17,6 +16,8 @@
  * Encode with multiple strategies at the same time.
  * <p>
  * Have results in an accessible structure, easily selecting the highest compression.
+ *
+ * @see #invoke()
  */
 @Slf4j
 class OffsetSimultaneousEncoder {
@@ -42,6 +43,11 @@ class OffsetSimultaneousEncoder {
      */
     private final long nextExpectedOffset;
 
+    /**
+     * The difference between the base offset (the offset to be committed) and the highest seen offset
+     */
+    private final int length;
+
     /**
      * Map of different encoding types for the same offset data, used for retrieving the data for the encoding type
      */
@@ -56,10 +62,50 @@ class OffsetSimultaneousEncoder {
     @Getter
     TreeSet<EncodedOffsetPair> sortedEncodings = new TreeSet<>();
 
+    /**
+     * Force the encoder to also add the compressed versions. Useful for testing.
+     * <p>
+     * Visible for testing.
+     */
+    boolean compressionForced = false;
+
+    /**
+     * The encoders to run
+     */
+    private final Set<OffsetEncoder> encoders = new HashSet<>();
+
     public OffsetSimultaneousEncoder(long lowWaterMark, Long nextExpectedOffset, Set<Long> incompleteOffsets) {
         this.lowWaterMark = lowWaterMark;
         this.nextExpectedOffset = nextExpectedOffset;
         this.incompleteOffsets = incompleteOffsets;
+
+        length = (int) (this.nextExpectedOffset - this.lowWaterMark);
+
+        initEncoders();
+    }
+
+    private void initEncoders() {
+        if (length > LARGE_INPUT_MAP_SIZE_THRESHOLD) {
+            log.debug("~Large input map size: {} (start: {} end: {})", length, lowWaterMark, nextExpectedOffset);
+        }
+
+        try {
+            BitsetEncoder bitsetEncoder = new BitsetEncoder(length, this);
+            encoders.add(bitsetEncoder);
+        } catch (BitSetEncodingNotSupportedException a) {
+            log.warn("Cannot use {} encoder", BitsetEncoder.class.getSimpleName(), a);
+        }
+
+        encoders.add(new RunLengthEncoder(this));
+    }
+
+    /**
+     * Not enabled as byte buffer seems to always be beaten by BitSet, which makes sense
+     * <p>
+     * Visible for testing
+     */
+    void addByteBufferEncoder() {
+        encoders.add(new ByteBufferEncoder(length, this));
     }
 
     /**
@@ -84,38 +130,39 @@ public OffsetSimultaneousEncoder(long lowWaterMark, Long nextExpectedOffset, Set
      * TODO: optimisation - could double the run-length range from Short.MAX_VALUE (~33,000) to Short.MAX_VALUE * 2
      * (~66,000) by using unsigned shorts instead (higest representable relative offset is Short.MAX_VALUE because each
      * runlength entry is a Short)
+     * <p>
+     * TODO VERY large offests ranges are slow (Integer.MAX_VALUE) - encoding scans could be avoided if passing in map of incompletes which should already be known
      */
-    @SneakyThrows
     public OffsetSimultaneousEncoder invoke() {
-        log.trace("Starting encode of incompletes of {}, base offset is: {}", this.incompleteOffsets, lowWaterMark);
-
-        final int length = (int) (this.nextExpectedOffset - this.lowWaterMark);
-
-        if (length > LARGE_INPUT_MAP_SIZE_THRESHOLD) {
-            log.debug("~Large input map size: {}", length);
-        }
-
-        final Set<OffsetEncoder> encoders = new HashSet<>();
-        encoders.add(new BitsetEncoder(length, this));
-        encoders.add(new RunLengthEncoder(this));
-        // TODO: Remove? byte buffer seems to always be beaten by BitSet, which makes sense
-        // encoders.add(new ByteBufferEncoder(length));
+        log.debug("Starting encode of incompletes, base offset is: {}, end offset is: {}", lowWaterMark, nextExpectedOffset);
+        log.trace("Incompletes are: {}", this.incompleteOffsets);
 
         //
         log.debug("Encode loop offset start,end: [{},{}] length: {}", this.lowWaterMark, this.nextExpectedOffset, length);
+        /*
+         * todo refactor this loop into the encoders (or sequential vs non sequential encoders) as RunLength doesn't need
+         * to look at every offset in the range, only the ones that change from 0 to 1. BitSet however needs to iterate
+         * the entire range. So when BitSet can't be used, the encoding would be potentially a lot faster as RunLength
+         * didn't need the whole loop.
+         */
         range(length).forEach(rangeIndex -> {
             final long offset = this.lowWaterMark + rangeIndex;
+            List<OffsetEncoder> removeToBeRemoved = new ArrayList<>();
             if (this.incompleteOffsets.contains(offset)) {
                 log.trace("Found an incomplete offset {}", offset);
-                encoders.forEach(x -> x.containsIndex(rangeIndex));
+                encoders.forEach(x -> {
+                    x.encodeIncompleteOffset(rangeIndex);
+                });
             } else {
-                encoders.forEach(x -> x.doesNotContainIndex(rangeIndex));
+                encoders.forEach(x -> {
+                    x.encodeCompletedOffset(rangeIndex);
+                });
             }
+            encoders.removeAll(removeToBeRemoved);
         });
 
         registerEncodings(encoders);
 
-        // log.trace("Input: {}", inputString);
         log.debug("In order: {}", this.sortedEncodings);
 
         return this;
@@ -126,8 +173,8 @@ private void registerEncodings(final Set<? extends OffsetEncoder> encoders) {
 
         // compressed versions
         // sizes over LARGE_INPUT_MAP_SIZE_THRESHOLD bytes seem to benefit from compression
-        final boolean noEncodingsAreSmallEnough = encoders.stream().noneMatch(OffsetEncoder::quiteSmall);
-        if (noEncodingsAreSmallEnough) {
+        boolean noEncodingsAreSmallEnough = encoders.stream().noneMatch(OffsetEncoder::quiteSmall);
+        if (noEncodingsAreSmallEnough || compressionForced) {
            encoders.forEach(OffsetEncoder::registerCompressed);
         }
     }

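With initEncoders(), an offset range longer than Short.MAX_VALUE no longer aborts invoke(): the BitsetEncoder is simply skipped and the RunLengthEncoder carries on, as the commit message promises. As a standalone illustration of why run lengths cope with long, mostly-uniform ranges that the short-length BitSet header cannot represent (this sketch uses int run lengths and its own incomplete-first convention; it is not the repository's RunLengthEncoder):

    import java.util.ArrayList;
    import java.util.List;

    class RunLengthSketch {

        // Collapse a completion bitmap into alternating run lengths, starting with a
        // (possibly zero-length) run of incompletes. Large uniform blocks collapse to
        // a single entry, however long the underlying offset range is.
        static List<Integer> runLengths(boolean[] completed) {
            List<Integer> runs = new ArrayList<>();
            boolean current = false; // first run counts incompletes
            int runLength = 0;
            for (boolean value : completed) {
                if (value == current) {
                    runLength++;
                } else {
                    runs.add(runLength); // close the current run, start the next
                    current = value;
                    runLength = 1;
                }
            }
            runs.add(runLength);
            return runs;
        }
    }

For instance, 40,000 consecutive completed offsets followed by one incomplete collapse to just three entries ([0, 40000, 1] with this convention), whereas a BitSet over the same range would need 40,000 bits and a length field wider than a short.
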
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java

Lines changed: 2 additions & 2 deletions
@@ -435,7 +435,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
         log.debug("Shutting down execution pool...");
         List<Runnable> unfinished = workerPool.shutdownNow();
         if (!unfinished.isEmpty()) {
-            log.warn("Threads not done: {}", unfinished);
+            log.warn("Threads not done count: {}", unfinished.size());
         }
 
         log.trace("Awaiting worker pool termination...");
@@ -446,7 +446,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
             boolean terminationFinishedWithoutTimeout = workerPool.awaitTermination(toSeconds(DrainingCloseable.DEFAULT_TIMEOUT), SECONDS);
             interrupted = false;
             if (!terminationFinishedWithoutTimeout) {
-                log.warn("workerPool await timeout!");
+                log.warn("Thread execution pool termination await timeout! Were any processing jobs dead locked or otherwise stuck?");
                 boolean shutdown = workerPool.isShutdown();
                 boolean terminated = workerPool.isTerminated();
             }
