@@ -9,10 +9,12 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Pattern;

// tag::javadoc[]

/**
* Asynchronous / concurrent message consumer for Kafka.
* <p>
@@ -54,6 +56,20 @@ public interface ParallelConsumer<K, V> extends DrainingCloseable {
*/
void poll(Consumer<ConsumerRecord<K, V>> usersVoidConsumptionFunction);

/**
* Register a function to be applied to a batch of messages.
* <p>
* The system treats the batch as a single unit of work: if the user function throws an error, all messages in the
* batch will be marked as failed and retried (note that when they are retried, there is no guarantee they will all
* be in the same batch again). If you are going to process messages individually, do not use this function.
* <p>
* Likewise, if you are going to process messages in sub-sets of this batch, it is better to adjust
* {@link ParallelConsumerOptions#getBatchSize()} to the actual desired size, and process each batch as a whole.
*
* @see ParallelConsumerOptions#getBatchSize()
*/
void pollBatch(Consumer<List<ConsumerRecord<K, V>>> usersVoidConsumptionFunction);

/**
* A simple tuple structure.
*
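For orientation, a minimal usage sketch of the new batch entry point (the topic name, the pre-built kafkaConsumer, and the process() helper are illustrative assumptions, not part of this change):

var options = ParallelConsumerOptions.builder()
        .consumer(kafkaConsumer) // an existing KafkaConsumer, with auto-commit disabled
        .batchSize(5)            // must be set for pollBatch(...) to be usable
        .build();

ParallelEoSStreamProcessor<String, String> processor = new ParallelEoSStreamProcessor<>(options);
processor.subscribe(UniLists.of("my-topic")); // hypothetical topic name

processor.pollBatch(records -> {
    // up to 5 records per invocation; an exception here fails and retries the whole batch
    for (ConsumerRecord<String, String> record : records) {
        process(record); // user processing logic (assumed)
    }
});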
@@ -12,6 +12,7 @@

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;

import static io.confluent.csid.utils.StringUtils.msg;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;
@@ -131,6 +132,27 @@ public enum CommitMode {
@Builder.Default
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);

/**
* The maximum number of messages to attempt to pass into the {@link ParallelConsumer#pollBatch} user function.
* Batch sizes may be less than this size, but will never be more.
* <p>
* Note that there is no relationship between the Consumer setting {@code max.poll.records} and this configured
* batch size, as this library introduces a large layer of indirection between the managed consumer and the managed
* queues we use. This indirection effectively disconnects the processing of messages from "polling" them from the
* managed client, as we do not wait for them to be processed before calling poll again. We simply call poll as often
* as we need to in order to keep our queues full with enough work to satisfy demand - and once we have enough, we
* actively manage pausing our subscription so that we can continue calling {@code poll} without pulling in even more
* messages.
*/
private final Integer batchSize;

/**
* @see #batchSize
*/
public Optional<Integer> getBatchSize() {
return Optional.ofNullable(batchSize);
}

public void validate() {
Objects.requireNonNull(consumer, "A consumer must be supplied");

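To make the new option concrete, a small sketch of the accessor's behaviour (a sketch assuming the Lombok-generated builder exposes batchSize; values and the kafkaConsumer are illustrative):

// batchSize left unset: getBatchSize() is empty, so pollBatch(...) will be rejected
var plain = ParallelConsumerOptions.builder()
        .consumer(kafkaConsumer)
        .build();
assert !plain.getBatchSize().isPresent(); // pollBatch(...) would throw IllegalArgumentException

// batchSize set: pollBatch(...) is allowed, and the single-record flows are rejected instead
var batched = ParallelConsumerOptions.builder()
        .consumer(kafkaConsumer)
        .batchSize(10)
        .build();
assert batched.getBatchSize().isPresent();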
@@ -25,6 +25,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static io.confluent.parallelconsumer.ParallelEoSStreamProcessor.State.*;
import static io.confluent.csid.utils.BackportUtils.isEmpty;
@@ -333,7 +334,13 @@ private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consume

@Override
public void poll(Consumer<ConsumerRecord<K, V>> usersVoidConsumptionFunction) {
Function<ConsumerRecord<K, V>, List<Object>> wrappedUserFunc = (record) -> {
validateNonBatch();

Function<List<ConsumerRecord<K, V>>, List<Object>> wrappedUserFunc = (recordList) -> {
if (recordList.size() != 1) {
throw new IllegalArgumentException("Bug: Function only takes a single element");
}
var record = recordList.get(0); // will always only have one
log.trace("asyncPoll - Consumed a record ({}), executing void function...", record.offset());

carefullyRun(usersVoidConsumptionFunction, record);
@@ -349,14 +356,16 @@ public void poll(Consumer<ConsumerRecord<K, V>> usersVoidConsumptionFunction) {
@SneakyThrows
public void pollAndProduceMany(Function<ConsumerRecord<K, V>, List<ProducerRecord<K, V>>> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback) {
validateNonBatch();

// todo refactor out the producer system to a sub class
if (!options.isProducerSupplied()) {
throw new IllegalArgumentException("To use the produce flows you must supply a Producer in the options");
}

// wrap user func to add produce function
Function<ConsumerRecord<K, V>, List<ConsumeProduceResult<K, V, K, V>>> wrappedUserFunc = (consumedRecord) -> {

Function<List<ConsumerRecord<K, V>>, List<ConsumeProduceResult<K, V, K, V>>> wrappedUserFunc = (consumedRecordList) -> {
var consumedRecord = consumedRecordList.get(0); // will always only have one
List<ProducerRecord<K, V>> recordListToProduce = carefullyRun(userFunction, consumedRecord);

if (recordListToProduce.isEmpty()) {
@@ -574,17 +583,17 @@ private boolean areMyThreadsDone() {
}

/**
* Supervisor loop for the main loop.
*
* @see #supervisorLoop(Function, Consumer)
*/
/**
* Optioanl ID of this instance. Useful for testing.
* Optional ID of this instance. Useful for testing.
*/
@Setter
Optional<String> myId = Optional.empty();

protected <R> void supervisorLoop(Function<ConsumerRecord<K, V>, List<R>> userFunction,
/**
* Supervisor loop for the main loop.
*
* @see #supervisorLoop(Function, Consumer)
*/
protected <R> void supervisorLoop(Function<List<ConsumerRecord<K, V>>, List<R>> userFunction,
Consumer<R> callback) {
log.info("Control loop starting up...");

@@ -633,7 +642,7 @@ private void addInstanceMDC() {
/**
* Main control loop
*/
private <R> void controlLoop(Function<ConsumerRecord<K, V>, List<R>> userFunction,
private <R> void controlLoop(Function<List<ConsumerRecord<K, V>>, List<R>> userFunction,
Consumer<R> callback) throws TimeoutException, ExecutionException, InterruptedException {

//
@@ -688,7 +697,7 @@ private <R> void controlLoop(Function<ConsumerRecord<K, V>, List<R>> userFunctio
wm.getTotalWorkWaitingProcessing(), wm.getNumberOfEntriesInPartitionQueues(), wm.getNumberRecordsOutForProcessing(), state);
}

private <R> int handleWork(final Function<ConsumerRecord<K, V>, List<R>> userFunction, final Consumer<R> callback) {
private <R> int handleWork(final Function<List<ConsumerRecord<K, V>>, List<R>> userFunction, final Consumer<R> callback) {
// check queue pressure first before addressing it
checkPressure();

@@ -835,6 +844,31 @@ private void processWorkCompleteMailBox() {
}
}

@Override
public void pollBatch(Consumer<List<ConsumerRecord<K, V>>> usersVoidConsumptionFunction) {
validateBatch();

Function<List<ConsumerRecord<K, V>>, List<Object>> wrappedUserFunc = (recordList) -> {
log.trace("asyncPoll - Consumed set of records ({}), executing void function...", recordList.size());
usersVoidConsumptionFunction.accept(recordList);
return UniLists.of(); // user function returns no produce records, so we satisfy our api
};
Consumer<Object> voidCallBack = (ignore) -> log.trace("Void callback applied.");
supervisorLoop(wrappedUserFunc, voidCallBack);
}

private void validateBatch() {
if (!options.getBatchSize().isPresent()) {
throw new IllegalArgumentException("Using batching function, but no batch size specified in options");
}
}

private void validateNonBatch() {
if (options.getBatchSize().isPresent()) {
throw new IllegalArgumentException("Batch size specified, but not using batch function");
}
}

/**
* Conditionally commit offsets to broker
*/
@@ -921,64 +955,125 @@ private void commitOffsetsThatAreReady() {
*
* @param workToProcess the polled records to process
*/
private <R> void submitWorkToPool(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
private <R> void submitWorkToPool(Function<List<ConsumerRecord<K, V>>, List<R>> usersFunction,
Consumer<R> callback,
List<WorkContainer<K, V>> workToProcess) {

if (!workToProcess.isEmpty()) {
log.debug("New work incoming: {}, Pool stats: {}", workToProcess.size(), workerPool);
for (var work : workToProcess) {
// for each record, construct dispatch to the executor and capture a Future
log.trace("Sending work ({}) to pool", work);
Future outputRecordFuture = workerPool.submit(() -> {
addInstanceMDC();
return userFunctionRunner(usersFunction, callback, work);
});
work.setFuture(outputRecordFuture);

if (options.getBatchSize().isPresent()) {
// perf: could inline makeBatches
var batches = makeBatches(workToProcess);
for (var batch : batches) {
submitWorkToPoolInner(usersFunction, callback, batch);
}
} else {
for (var batch : workToProcess) {
submitWorkToPoolInner(usersFunction, callback, UniLists.of(batch));
}
}
}
}

private <R> void submitWorkToPoolInner(final Function<List<ConsumerRecord<K, V>>, List<R>> usersFunction, final Consumer<R> callback, final List<WorkContainer<K, V>> batch) {
// for each record, construct dispatch to the executor and capture a Future
log.trace("Sending work ({}) to pool", batch);
Future outputRecordFuture = workerPool.submit(() -> {
addInstanceMDC();
return runUserFunction(usersFunction, callback, batch);
});
// for a batch, each message in the batch shares the same result
for (final WorkContainer<K, V> workContainer : batch) {
workContainer.setFuture(outputRecordFuture);
}
}

private List<List<WorkContainer<K, V>>> makeBatches(List<WorkContainer<K, V>> workToProcess) {
return partition(workToProcess, (int) options.getBatchSize().get());
}

private static <T> List<List<T>> partition(Collection<T> sourceCollection, int maxBatchSize) {
List<List<T>> listOfBatches = new ArrayList<>();
List<T> batchInConstruction = new ArrayList<>();

//
for (T item : sourceCollection) {
batchInConstruction.add(item);

//
if (batchInConstruction.size() == maxBatchSize) {
listOfBatches.add(batchInConstruction);
batchInConstruction = new ArrayList<>();
}
}

// add partial tail
if (!batchInConstruction.isEmpty()) {
listOfBatches.add(batchInConstruction);
}

return listOfBatches;
}
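// Illustrative trace of partition(...) above (hypothetical records r1..r7, maxBatchSize = 3):
//   partition([r1, r2, r3, r4, r5, r6, r7], 3) -> [[r1, r2, r3], [r4, r5, r6], [r7]]
// The partial tail is kept as its own, smaller batch - matching the batchSize javadoc's
// "may be less than this size, but will never be more" guarantee.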

/**
* Run the supplied function.
*/
protected <R> List<Tuple<ConsumerRecord<K, V>, R>> userFunctionRunner(Function<ConsumerRecord<K, V>, List<R>> usersFunction,
Consumer<R> callback,
WorkContainer<K, V> wc) {
protected <R> List<Tuple<ConsumerRecord<K, V>, R>> runUserFunction(Function<List<ConsumerRecord<K, V>>, List<R>> usersFunction,
Consumer<R> callback,
List<WorkContainer<K, V>> workContainerBatch) {
// call the user's function
List<R> resultsFromUserFunction;
try {
MDC.put("offset", wc.toString());
if (log.isDebugEnabled()) {
// toString can be heavy, especially for large batch sizes
MDC.put("offset", workContainerBatch.toString());
}
log.trace("Pool received: {}", workContainerBatch);

//
List<ConsumerRecord<K, V>> records = workContainerBatch.stream()
.map(WorkContainer::getCr)
.collect(Collectors.toList());

//
boolean epochIsStale = wm.checkEpochIsStale(wc);
boolean epochIsStale = wm.checkEpochIsStale(workContainerBatch);
if (epochIsStale) {
// when epochs change, we can't remove work from the executor pool queue, so we just skip stale work when we find it
log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", wc);
log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", workContainerBatch);
return null;
}

log.trace("Pool received: {}", wc);

ConsumerRecord<K, V> rec = wc.getCr();
resultsFromUserFunction = usersFunction.apply(rec);
// execute
resultsFromUserFunction = usersFunction.apply(records);

onUserFunctionSuccess(wc, resultsFromUserFunction);
// results
for (final WorkContainer<K, V> kvWorkContainer : workContainerBatch) {
onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction);
}

// capture each result, against the input record
var intermediateResults = new ArrayList<Tuple<ConsumerRecord<K, V>, R>>();
for (R result : resultsFromUserFunction) {
log.trace("Running users call back...");
callback.accept(result);
}
log.trace("User function future registered");

// fail or succeed, either way we're done
addToMailBoxOnUserFunctionSuccess(wc, resultsFromUserFunction);
for (final WorkContainer<K, V> kvWorkContainer : workContainerBatch) {
addToMailBoxOnUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction);
}
log.trace("User function future registered");

//
return intermediateResults;
} catch (Exception e) {
// handle fail
log.error("Error processing record", e);
wc.onUserFunctionFailure();
addToMailbox(wc); // always add on error
for (final WorkContainer<K, V> wc : workContainerBatch) {
wc.onUserFunctionFailure();
addToMailbox(wc); // always add on error
}
throw e; // throw again to mark the future as failed
}
}
@@ -414,6 +414,7 @@ public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
// this state should never happen
log.debug("Work is in queue with stale epoch. Will remove now. Was it not removed properly on revoke? Or are we in a race state? {}", workContainer);
staleWorkToRemove.add(workContainer);
continue; // skip
}
}

@@ -732,12 +733,24 @@ public void onOffsetCommitSuccess(Map<TopicPartition, OffsetAndMetadata> offsets
});
}

/**
* Have our partitions been revoked? A batch may contain messages with different epochs, so the batch is considered
* stale if any one of its messages is stale.
*
* @return true if any epoch is stale, false if not
*/
boolean checkEpochIsStale(final List<WorkContainer<K, V>> workContainers) {
for (final WorkContainer<K, V> workContainer : workContainers) {
if (checkEpochIsStale(workContainer)) return true;
}
return false;
}

/**
* Have our partitions been revoked?
*
* @return true if epoch doesn't match, false if ok
* @return true if epoch is stale, false if not
*/
boolean checkEpochIsStale(final WorkContainer<K, V> workContainer) {
private boolean checkEpochIsStale(final WorkContainer<K, V> workContainer) {
TopicPartition topicPartitionKey = workContainer.getTopicPartition();

Integer currentPartitionEpoch = partitionsAssignmentEpochs.get(topicPartitionKey);
@@ -261,4 +261,10 @@ public static void completeWork(final WorkManager<String, String> wmm, final Wor
wmm.onSuccess(wc);
assertThat(wc.isUserFunctionComplete()).isTrue();
}

public List<ConsumerRecord<String, String>> sendRecords(final int i) {
List<ConsumerRecord<String, String>> consumerRecords = generateRecords(i);
send(consumerSpy, consumerRecords);
return consumerRecords;
}
}