diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 149aabb18..a792e6cc6 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -7,6 +7,20 @@ toc::[] +== Next Version + +=== Features + +* PollContext API - provides central access to result set with various convenience methods as well as metadata about records, such as failure count +* Batching - all API methods now support batching. +See the Options class set batch size for more information. + +=== Fixes and Improvements + +* Event system - better CPU usage in control thread +* Concurrency stability improvements +* Update dependencies + == v0.4.0.0 // https://github.com/confluentinc/parallel-consumer/releases/tag/0.4.0.0 diff --git a/README.adoc b/README.adoc index fa70991b0..a6ca973d9 100644 --- a/README.adoc +++ b/README.adoc @@ -235,12 +235,12 @@ without operational burden or harming the cluster's performance * Solution for the https://en.wikipedia.org/wiki/Head-of-line_blocking["head of line"] blocking problem where continued failure of a single message, prevents progress for messages behind it in the queue * Per `key` concurrent processing, per partition and unordered message processing * Offsets committed correctly, in order, of only processed messages, regardless of concurrency level or retries -* Batch version fo the API to process batches of messages in parallel instead of single messages. +* Batch support in all versions of the API to process batches of messages in parallel instead of single messages. +** Particularly useful for when your processing function can work with more than a single record at a time - e.g. sending records to an API which has a batch version like Elasticsearch * Vert.x and Reactor.io non-blocking library integration ** Non-blocking I/O work management ** Vert.x's WebClient and general Vert.x Future support ** Reactor.io Publisher (Mono/Flux) and Java's CompletableFuture (through `Mono#fromFuture`) -* Reactor non-blocking library integration * Fair partition traversal * Zero~ dependencies (`Slf4j` and `Lombok`) for the core module * Java 8 compatibility @@ -490,9 +490,10 @@ You can also optionally provide a callback function to be run after the message( .Usage - print message content out to the console in parallel [source,java,indent=0] - parallelConsumer.pollAndProduce(record -> { - var result = processBrokerRecord(record); - return new ProducerRecord<>(outputTopic, record.key(), result.payload); + parallelConsumer.pollAndProduce(context -> { + var consumerRecord = context.getSingleRecord().getConsumerRecord(); + var result = processBrokerRecord(consumerRecord); + return new ProducerRecord<>(outputTopic, consumerRecord.key(), result.payload); }, consumeProduceResult -> { log.debug("Message {} saved to broker at offset {}", consumeProduceResult.getOut(), @@ -512,10 +513,12 @@ In future versions, we plan to look at supporting other streaming systems like h [[batching]] === Batching -The library also support a batch version of the API. -Using this, you can process batches of messages at once. +The library also supports sending a batch or records as input to the users processing function in parallel. +Using this, you can process several records in your function at once. -To use it, use one of the `batch` versions instead. +To use it, set a `batch size` in the options class. + +There are then various access methods for the batch of records - see the `PollContext` object for more information. 
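+
+For illustration, a minimal sketch of a couple of the `PollContext` access methods - `processBatchPayload` here is just a placeholder for your own handler, as in the example further below:
+
+.Illustrative access to the batch via `PollContext`
+[source,java,indent=0]
+----
+    parallelConsumer.poll(context -> {
+        // inspect per-record metadata through the RecordContext wrappers
+        context.forEach(recordContext ->
+                log.debug("Offset {} has failed {} times so far",
+                        recordContext.offset(), recordContext.getNumberOfFailedAttempts()));
+
+        // or take the plain ConsumerRecords, flattened across partitions, and process them together
+        processBatchPayload(context.getConsumerRecordsFlattened());
+    });
+----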
IMPORTANT: If an exception is thrown while processing the batch, all messages in the batch will be returned to the queue, to be retried with the standard retry system. There is no guarantee that the messages will be retried again in the same batch. @@ -530,10 +533,10 @@ There is no guarantee that the messages will be retried again in the same batch. .maxConcurrency(100) .batchSize(5) // <1> .build()); - parallelConsumer.pollBatch(batchOfRecords -> { + parallelConsumer.poll(context -> { // convert the batch into the payload for our processing - List payload = batchOfRecords.stream() - .map(this::pareparePayload) + List payload = context.stream() + .map(this::preparePayload) .collect(Collectors.toList()); // process the entire batch payload at once processBatchPayload(payload); @@ -553,9 +556,10 @@ There is no guarantee that the messages will be retried again in the same batch. .Call an HTTP endpoint for each message usage [source,java,indent=0] ---- - var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> { - log.info("Concurrently constructing and returning RequestInfo from record: {}", record); - Map params = UniMaps.of("recordKey", record.key(), "payload", record.value()); + var resultStream = parallelConsumer.vertxHttpReqInfoStream(context -> { + var consumerRecord = context.getSingleConsumerRecord(); + log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord); + Map params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value()); return new RequestInfo("localhost", port, "/api", params); // <1> }); ---- @@ -632,6 +636,14 @@ From the `Clients` view, get the connection information customized to your clust .. Use these settings presented to https://docs.confluent.io/clients-kafka-java/current/overview.html[configure your clients]. . Use these clients for steps outlined in the <> section. +[[upgrading]] +== Upgrading + +=== From 0.4 to 0.5 + +This version has a breaking change in the API - instead of passing in `ConsumerRecord` instances, it passes in a `PollContext` object which has extra information and utility methods. +See the `PollContext` class for more information. + [[ordering-guarantees]] == Ordering Guarantees @@ -733,8 +745,8 @@ You can access the retry count of a record through it's wrapped `WorkContainer` final int baseDelaySecond = 1; ParallelConsumerOptions.builder() - .retryDelayProvider(workContainer -> { - int numberOfFailedAttempts = workContainer.getNumberOfFailedAttempts(); + .retryDelayProvider(recordContext -> { + int numberOfFailedAttempts = recordContext.getNumberOfFailedAttempts(); long delayMillis = (long) (baseDelaySecond * Math.pow(multiplier, numberOfFailedAttempts) * 1000); return Duration.ofMillis(delayMillis); }); @@ -756,7 +768,8 @@ Implementing a https://github.com/confluentinc/parallel-consumer/issues/196[max final int maxRetries = 10; final Map, Long> retriesCount = new ConcurrentHashMap<>(); - pc.poll(consumerRecord -> { + pc.poll(context -> { + var consumerRecord = context.getSingleRecord().getConsumerRecord(); Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L); if (retryCount < maxRetries) { processRecord(consumerRecord); @@ -782,7 +795,8 @@ This will put the message back in the queue. 
---- final Map upMap = new ConcurrentHashMap<>(); - pc.poll(consumerRecord -> { + pc.poll(context -> { + var consumerRecord = context.getSingleRecord().getConsumerRecord(); String serverId = extractServerId(consumerRecord); boolean up = upMap.computeIfAbsent(serverId, ignore -> true); diff --git a/parallel-consumer-core/pom.xml b/parallel-consumer-core/pom.xml index d92f54b1b..d7bab1097 100644 --- a/parallel-consumer-core/pom.xml +++ b/parallel-consumer-core/pom.xml @@ -100,6 +100,12 @@ com.google.flogger flogger-slf4j-backend + + org.threeten + threeten-extra + 1.7.0 + test + diff --git a/parallel-consumer-core/src/main/java/io/confluent/csid/utils/TimeUtils.java b/parallel-consumer-core/src/main/java/io/confluent/csid/utils/TimeUtils.java index cace57470..4ca936816 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/csid/utils/TimeUtils.java +++ b/parallel-consumer-core/src/main/java/io/confluent/csid/utils/TimeUtils.java @@ -1,13 +1,14 @@ package io.confluent.csid.utils; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. */ import lombok.SneakyThrows; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; +import java.time.Clock; import java.time.Duration; import java.util.concurrent.Callable; @@ -15,6 +16,10 @@ @UtilityClass public class TimeUtils { + public Clock getClock() { + return Clock.systemUTC(); + } + @SneakyThrows public static RESULT time(final Callable func) { long start = System.currentTimeMillis(); diff --git a/parallel-consumer-core/src/main/java/io/confluent/csid/utils/WallClock.java b/parallel-consumer-core/src/main/java/io/confluent/csid/utils/WallClock.java deleted file mode 100644 index a42dc0d57..000000000 --- a/parallel-consumer-core/src/main/java/io/confluent/csid/utils/WallClock.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.confluent.csid.utils; - -/*- - * Copyright (C) 2020 Confluent, Inc. - */ - -import java.time.Instant; - -public class WallClock { - - public Instant getNow() { - return Instant.now(); - } - -} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessor.java index 6bbd552b3..3b7568915 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessor.java @@ -1,12 +1,11 @@ package io.confluent.parallelconsumer; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. 
*/ import io.confluent.csid.utils.Java8StreamUtils; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.List; @@ -21,7 +20,7 @@ public class JStreamParallelEoSStreamProcessor extends ParallelEoSStreamPr private final ConcurrentLinkedDeque> userProcessResultsStream; - public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions parallelConsumerOptions) { + public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions parallelConsumerOptions) { super(parallelConsumerOptions); this.userProcessResultsStream = new ConcurrentLinkedDeque<>(); @@ -30,7 +29,7 @@ public JStreamParallelEoSStreamProcessor(ParallelConsumerOptions parallelConsume } @Override - public Stream> pollProduceAndStream(Function, List>> userFunction) { + public Stream> pollProduceAndStream(Function, List>> userFunction) { super.pollAndProduceMany(userFunction, (result) -> { log.trace("Wrapper callback applied, sending result to stream. Input: {}", result); this.userProcessResultsStream.add(result); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelStreamProcessor.java index fbae551a4..d95e4ff65 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/JStreamParallelStreamProcessor.java @@ -1,11 +1,11 @@ package io.confluent.parallelconsumer; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. */ + import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import io.confluent.parallelconsumer.internal.DrainingCloseable; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.List; @@ -14,7 +14,7 @@ public interface JStreamParallelStreamProcessor extends DrainingCloseable { - static JStreamParallelStreamProcessor createJStreamEosStreamProcessor(ParallelConsumerOptions options) { + static JStreamParallelStreamProcessor createJStreamEosStreamProcessor(ParallelConsumerOptions options) { return new JStreamParallelEoSStreamProcessor<>(options); } @@ -24,6 +24,7 @@ static JStreamParallelStreamProcessor createJStreamEosStreamPro * * @return a stream of results of applying the function to the polled records */ - Stream> pollProduceAndStream(Function, - List>> userFunction); + Stream> pollProduceAndStream( + Function, + List>> userFunction); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 88a597531..771e3e52d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -15,7 +15,6 @@ import java.time.Duration; import java.util.Objects; -import java.util.Optional; import java.util.function.Function; import static io.confluent.csid.utils.StringUtils.msg; @@ -145,6 +144,7 @@ public enum CommitMode { */ @Builder.Default private final int maxConcurrency = DEFAULT_MAX_CONCURRENCY; + public static final int DEFAULT_MAX_CONCURRENCY = 16; /** @@ -159,14 +159,7 @@ public enum CommitMode { *

* Overrides {@link #defaultMessageRetryDelay}, even if it's set. */ - @Builder.Default - private final Function retryDelayProvider; - - /** - * Dirty global access to the {@link #retryDelayProvider}. - */ - // TODO remove need for writeable global access - public static Function retryDelayProviderStatic; + private final Function, Duration> retryDelayProvider; /** * Controls how long to block while waiting for the {@link Producer#send} to complete for any ProducerRecords @@ -189,8 +182,16 @@ public enum CommitMode { private final Duration offsetCommitTimeout = Duration.ofSeconds(10); /** - * The maximum number of messages to attempt to pass into the {@code batch} versions of the user function. Batch - * sizes may sometimes be less than this size, but will never be more. + * The maximum number of messages to attempt to pass into the user functions. + *

+ * Batch sizes may sometimes be less than this size, but will never be more. + *

+ * The system will treat the messages as a set, so if an error is thrown by the user code, then all messages will be + * marked as failed and be retried (Note that when they are retried, there is no guarantee they will all be in the + * same batch again). So if you're going to process messages individually, then don't set a batch size. + *

+ * Otherwise, if you're going to process messages in subsets from this batch, it's better to adjust the + * {@link ParallelConsumerOptions#getBatchSize()} to the actual desired size, and process them as a whole. *

* Note that there is no relationship between the {@link ConsumerConfig} setting of {@link * ConsumerConfig#MAX_POLL_RECORDS_CONFIG} and this configured batch size, as this library introduces a large layer @@ -202,8 +203,12 @@ public enum CommitMode { *

* If we have enough, then we actively manage pausing our subscription so that we can continue calling {@code poll} * without pulling in even more messages. + *
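+ * For example (an illustrative combination, not a recommendation): {@code batchSize(50)} together with
+ * {@code maxConcurrency(10)} targets up to 500 records in flight at once - see {@link #getTargetAmountOfRecordsInFlight()}.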

+ * + * @see ParallelConsumerOptions#getBatchSize() */ - private final Integer batchSize; + @Builder.Default + private final Integer batchSize = 1; /** * Configure the amount of delay a record experiences, before a warning is logged. @@ -211,22 +216,18 @@ public enum CommitMode { @Builder.Default private final Duration thresholdForTimeSpendInQueueWarning = Duration.ofSeconds(10); - /** - * @see #batchSize - */ - public Optional getBatchSize() { - return Optional.ofNullable(batchSize); - } - public boolean isUsingBatching() { - return this.getBatchSize().isPresent(); + return getBatchSize() > 1; } + @Builder.Default + private final int maxFailureHistory = 10; + /** * @return the combined target of the desired concurrency by the configured batch size */ public int getTargetAmountOfRecordsInFlight() { - return getMaxConcurrency() * getBatchSize().orElse(1); + return getMaxConcurrency() * getBatchSize(); } public void validate() { @@ -239,7 +240,6 @@ public void validate() { // WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay()); - ParallelConsumerOptions.retryDelayProviderStatic = getRetryDelayProvider(); } public boolean isUsingTransactionalProducer() { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java index 9a85c68a3..e9486aab7 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java @@ -7,7 +7,6 @@ import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import pl.tlinkowski.unij.api.UniLists; @@ -17,8 +16,6 @@ import java.util.function.Consumer; import java.util.function.Function; -import static io.confluent.csid.utils.StringUtils.msg; -import static io.confluent.parallelconsumer.internal.Documentation.getLinkHtmlToDocSection; import static io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun; @Slf4j @@ -29,74 +26,50 @@ public class ParallelEoSStreamProcessor extends AbstractParallelEoSStreamP * Construct the AsyncConsumer by wrapping this passed in conusmer and producer, which can be configured any which * way as per normal. 
* - * @param newOptions * @see ParallelConsumerOptions */ - public ParallelEoSStreamProcessor(final ParallelConsumerOptions newOptions) { + public ParallelEoSStreamProcessor(final ParallelConsumerOptions newOptions) { super(newOptions); } @Override - public void poll(Consumer> usersVoidConsumptionFunction) { - validateNonBatch(); + public void poll(Consumer> usersVoidConsumptionFunction) { + Function, List> wrappedUserFunc = (context) -> { + log.trace("asyncPoll - Consumed a consumerRecord ({}), executing void function...", context); - Function>, List> wrappedUserFunc = (recordList) -> { - if (recordList.size() != 1) { - throw new IllegalArgumentException("Bug: Function only takes a single element"); - } - var record = recordList.get(0); // will always only have one - log.trace("asyncPoll - Consumed a record ({}), executing void function...", record.offset()); - - carefullyRun(usersVoidConsumptionFunction, record); + carefullyRun(usersVoidConsumptionFunction, context.getPollContext()); log.trace("asyncPoll - user function finished ok."); return UniLists.of(); // user function returns no produce records, so we satisfy our api }; - Consumer voidCallBack = (ignore) -> log.trace("Void callback applied."); + Consumer voidCallBack = ignore -> log.trace("Void callback applied."); supervisorLoop(wrappedUserFunc, voidCallBack); } - - private void validateBatch() { - //noinspection SimplifyOptionalCallChains - only in 11 - if (!options.getBatchSize().isPresent()) { - throw new IllegalArgumentException(msg("Using batching version of the API, but no batch size specified in the `options`. See {}", getLinkHtmlToDocSection("#batching"))); - } - } - - private void validateNonBatch() { - if (options.getBatchSize().isPresent()) { - throw new IllegalArgumentException(msg("A 'batch size' has been specified in `options`, so you must use the `batch` versions of the polling methods. 
See {}", getLinkHtmlToDocSection("#batching"))); - } - } - @Override @SneakyThrows - public void pollAndProduceMany(Function, List>> userFunction, + public void pollAndProduceMany(Function, List>> userFunction, Consumer> callback) { - validateNonBatch(); - // todo refactor out the producer system to a sub class if (!getOptions().isProducerSupplied()) { throw new IllegalArgumentException("To use the produce flows you must supply a Producer in the options"); } // wrap user func to add produce function - Function>, List>> wrappedUserFunc = (consumedRecordList) -> { - var consumedRecord = consumedRecordList.get(0); // will always only have one - List> recordListToProduce = carefullyRun(userFunction, consumedRecord); + Function, List>> wrappedUserFunc = context -> { + List> recordListToProduce = carefullyRun(userFunction, context.getPollContext()); if (recordListToProduce.isEmpty()) { log.debug("No result returned from function to send."); } - log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", consumedRecord, recordListToProduce); + log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", context, recordListToProduce); List> results = new ArrayList<>(); log.trace("Producing {} messages in result...", recordListToProduce.size()); for (ProducerRecord toProduce : recordListToProduce) { log.trace("Producing {}", toProduce); RecordMetadata produceResultMeta = super.getProducerManager().get().produceMessage(toProduce); - var result = new ConsumeProduceResult<>(consumedRecord, toProduce, produceResultMeta); + var result = new ConsumeProduceResult<>(context.getPollContext(), toProduce, produceResultMeta); results.add(result); } return results; @@ -107,8 +80,8 @@ public void pollAndProduceMany(Function, List, List>> userFunction) { - pollAndProduceMany(userFunction, (record) -> { + public void pollAndProduceMany(Function, List>> userFunction) { + pollAndProduceMany(userFunction, consumerRecord -> { // no op call back log.trace("No-op user callback"); }); @@ -116,8 +89,8 @@ public void pollAndProduceMany(Function, List, ProducerRecord> userFunction) { - pollAndProduce(userFunction, (record) -> { + public void pollAndProduce(Function, ProducerRecord> userFunction) { + pollAndProduce(userFunction, consumerRecord -> { // no op call back log.trace("No-op user callback"); }); @@ -125,22 +98,9 @@ public void pollAndProduce(Function, ProducerRecord> @Override @SneakyThrows - public void pollAndProduce(Function, ProducerRecord> userFunction, + public void pollAndProduce(Function, ProducerRecord> userFunction, Consumer> callback) { - pollAndProduceMany((record) -> UniLists.of(userFunction.apply(record)), callback); - } - - @Override - public void pollBatch(Consumer>> usersVoidConsumptionFunction) { - validateBatch(); - - Function>, List> wrappedUserFunc = (recordList) -> { - log.trace("asyncPoll - Consumed set of records ({}), executing void function...", recordList.size()); - usersVoidConsumptionFunction.accept(recordList); - return UniLists.of(); // user function returns no produce records, so we satisfy our api - }; - Consumer voidCallBack = (ignore) -> log.trace("Void callback applied."); - supervisorLoop(wrappedUserFunc, voidCallBack); + pollAndProduceMany(consumerRecord -> UniLists.of(userFunction.apply(consumerRecord)), callback); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java 
b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java index af9ac45c0..e4568145f 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelStreamProcessor.java @@ -3,10 +3,9 @@ /*- * Copyright (C) 2020-2022 Confluent, Inc. */ + import io.confluent.parallelconsumer.internal.DrainingCloseable; import lombok.Data; -import lombok.SneakyThrows; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -32,7 +31,7 @@ static ParallelStreamProcessor createEosStreamProcessor(Paralle * * @param usersVoidConsumptionFunction the function */ - void poll(Consumer> usersVoidConsumptionFunction); + void poll(Consumer> usersVoidConsumptionFunction); /** @@ -41,23 +40,20 @@ static ParallelStreamProcessor createEosStreamProcessor(Paralle * * @param callback applied after the produced message is acknowledged by kafka */ - @SneakyThrows - void pollAndProduceMany(Function, List>> userFunction, + void pollAndProduceMany(Function, List>> userFunction, Consumer> callback); /** * Register a function to be applied in parallel to each received message, which in turn returns one or many {@link * ProducerRecord}s to be sent back to the broker. */ - @SneakyThrows - void pollAndProduceMany(Function, List>> userFunction); + void pollAndProduceMany(Function, List>> userFunction); /** * Register a function to be applied in parallel to each received message, which in turn returns a {@link * ProducerRecord} to be sent back to the broker. */ - @SneakyThrows - void pollAndProduce(Function, ProducerRecord> userFunction); + void pollAndProduce(Function, ProducerRecord> userFunction); /** * Register a function to be applied in parallel to each received message, which in turn returns a {@link @@ -65,24 +61,9 @@ void pollAndProduceMany(Function, List * * @param callback applied after the produced message is acknowledged by kafka */ - @SneakyThrows - void pollAndProduce(Function, ProducerRecord> userFunction, + void pollAndProduce(Function, ProducerRecord> userFunction, Consumer> callback); - /** - * Register a function to be applied to a batch of messages. - *

- * The system will treat the messages as a set, so if an error is thrown by the user code, then all messages will be - * marked as failed and be retried (Note that when they are retried, there is no guarantee they will all be in the - * same batch again). So if you're going to process messages individually, then don't use this function. - *

- * Otherwise, if you're going to process messages in sub sets from this batch, it's better to instead adjust the - * {@link ParallelConsumerOptions#getBatchSize()} instead to the actual desired size, and process them as a whole. - * - * @see ParallelConsumerOptions#getBatchSize() - */ - void pollBatch(Consumer>> usersVoidConsumptionFunction); - /** * A simple triple structure to capture the set of coinciding data. * @@ -99,7 +80,7 @@ void pollAndProduce(Function, ProducerRecord> userFun */ @Data class ConsumeProduceResult { - private final ConsumerRecord in; + private final PollContext in; private final ProducerRecord out; private final RecordMetadata meta; } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContext.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContext.java new file mode 100644 index 000000000..2b7c75cf0 --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContext.java @@ -0,0 +1,234 @@ +package io.confluent.parallelconsumer; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.state.WorkContainer; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.FieldDefaults; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.util.*; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static io.confluent.csid.utils.StringUtils.msg; +import static io.confluent.parallelconsumer.internal.Documentation.getLinkHtmlToDocSection; + + +/** + * Context object used to pass messages to process to users processing functions. + *

+ * Result sets can be iterated in a variety of ways. Explore the different methods available. + *

+ * You can access the {@link ConsumerRecord}s directly, or you can get the {@link RecordContext} wrappers, which provide + * extra information about the specific records, such as {@link RecordContext#getNumberOfFailedAttempts()}. + *
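+ * For example (illustrative only): {@code context.streamConsumerRecords()} gives the plain records, while
+ * {@code context.stream().filter(rc -> rc.getNumberOfFailedAttempts() > 0)} selects just the records that have failed before.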

+ * Note that if you are not setting a {@link ParallelConsumerOptions#batchSize}, then you can use the {@link + * #getSingleRecord()}, and it's convenience accessors ({@link #value()}, {@link #offset()}, {@link #key()} {@link + * #getSingleConsumerRecord()}). But if you have configured batching, they will all throw an {@link + * IllegalArgumentException}, as it's not valid to have batches of messages and yet tread the batch input as a single + * record. + */ +@AllArgsConstructor +@FieldDefaults(makeFinal = true, level = AccessLevel.PRIVATE) +@ToString +@EqualsAndHashCode +public class PollContext implements Iterable> { + + protected Map>> records = new HashMap<>(); + + PollContext(List> workContainers) { + for (var wc : workContainers) { + TopicPartition topicPartition = wc.getTopicPartition(); + var recordSet = records.computeIfAbsent(topicPartition, ignore -> new HashSet<>()); + recordSet.add(new RecordContextInternal<>(wc)); + } + } + + /** + * @return a flat {@link Stream} of {@link RecordContext}s, which wrap the {@link ConsumerRecord}s in this result + * set + */ + public Stream> streamInternal() { + return this.records.values().stream().flatMap(Collection::stream); + } + + /** + * @return a flat {@link Stream} of {@link RecordContext}s, which wrap the {@link ConsumerRecord}s in this result + * set + */ + public Stream> stream() { + return getByTopicPartitionMap().values().stream().flatMap(Collection::stream); + } + + /** + * @return a flat {@link Stream} of {@link ConsumerRecord} in this poll set + */ + public Stream> streamConsumerRecords() { + return stream().map(RecordContext::getConsumerRecord); + } + + /** + * Must not be using batching ({@link ParallelConsumerOptions#batchSize}). + * + * @return the single {@link RecordContext} entry in this poll set + * @throws IllegalArgumentException if a {@link ParallelConsumerOptions#getBatchSize()} has been set. + */ + public RecordContext getSingleRecord() { + // instead of calling Options#isUsingBatch - this way we don't need to access to the options class, and this is effectively the same thing + if (size() != 1) { + throw new IllegalArgumentException(msg("A 'batch size' has been specified in `options`, so you must use the `batch` versions of the polling methods. See {}", getLinkHtmlToDocSection("#batching"))); + } + //noinspection OptionalGetWithoutIsPresent + return stream().findFirst().get(); // NOSONAR + } + + /** + * Must not be using batching ({@link ParallelConsumerOptions#batchSize}). + * + * @return the single {@link ConsumerRecord} entry in this poll set + * @see #getSingleRecord() + */ + public ConsumerRecord getSingleConsumerRecord() { + return getSingleRecord().getConsumerRecord(); + } + + /** + * For backwards compatibility with {@link ConsumerRecord#value()}. + *

+ * Must not be using batching ({@link ParallelConsumerOptions#batchSize}). + * + * @return the single {@link ConsumerRecord#value()} entry in this poll set + * @see #getSingleRecord() + */ + public V value() { + return getSingleConsumerRecord().value(); + } + + /** + * For backwards compatibility with {@link ConsumerRecord#key()}. + *

+ * Must not be using batching ({@link ParallelConsumerOptions#batchSize}). + * + * @return the single {@link ConsumerRecord#key()} entry in this poll set + * @see #getSingleRecord() + */ + public K key() { + return getSingleConsumerRecord().key(); + } + + /** + * For backwards compatibility with {@link ConsumerRecord#offset()}. + *

+ * Must not be using batching ({@link ParallelConsumerOptions#batchSize}). + * + * @return the single {@link ConsumerRecord#offset()} entry in this poll set + * @see #getSingleRecord() + */ + public long offset() { + return getSingleConsumerRecord().offset(); + } + + /** + * @return a flat {@link List} of {@link RecordContext}s, which wrap the {@link ConsumerRecord}s in this result set + */ + public List> getContextsFlattened() { + return records.values().stream() + .flatMap(Collection::stream) + .map(RecordContextInternal::getRecordContext) + .collect(Collectors.toList()); + } + + /** + * @return a flat {@link List} of {@link ConsumerRecord}s in this result set + */ + public List> getConsumerRecordsFlattened() { + return streamConsumerRecords().collect(Collectors.toList()); + } + + /** + * @return a flat {@link Iterator} of the {@link RecordContext}s, which wrap the {@link ConsumerRecord}s in this + * result set + */ + @Override + public Iterator> iterator() { + return stream().iterator(); + } + + /** + * @param action to perform on the {@link RecordContext}s, which wrap the {@link ConsumerRecord}s in this result + * set + */ + @Override + public void forEach(Consumer> action) { + Iterable.super.forEach(action); + } + + /** + * @return a flat {@link Spliterator} of the {@link RecordContext}s, which wrap the {@link ConsumerRecord}s in this + * result set + */ + @Override + public Spliterator> spliterator() { + return Iterable.super.spliterator(); + } + + /** + * @return a {@link Map} of {@link TopicPartition} to {@link RecordContext} {@link Set}, which wrap the {@link + * ConsumerRecord}s in this result set + */ + public Map>> getByTopicPartitionMap() { + return this.records.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, + set -> set.getValue().stream() + .map(RecordContextInternal::getRecordContext) + .collect(Collectors.toSet()) + ) + ); + } + + /** + * @return the total count of records in this result set + */ + public long size() { + return stream().count(); + } + + /** + * Get all the offsets for the records in this result set. + *

+ * Note that this flattens the result, so if there are records from multiple {@link TopicPartition}s, the partition + * they belong to will be lost. If you want that information as well, try {@link #getOffsets()}. + * + * @return a flat List of offsets in this result set + * @see #getOffsets() + */ + public List getOffsetsFlattened() { + return streamConsumerRecords().mapToLong(ConsumerRecord::offset).boxed().collect(Collectors.toList()); + } + + /** + * Map of partitions to offsets. + *

+ * If you don't need the partition information, try {@link #getOffsetsFlattened()}. + * + * @return a map of {@link TopicPartition} to offsets, of the records in this result set + * @see #getOffsetsFlattened() + */ + public Map> getOffsets() { + return getByTopicPartitionMap().entrySet().stream() + .collect(Collectors + .toMap(Map.Entry::getKey, e -> e.getValue().stream() + .map(RecordContext::offset).collect(Collectors.toList()) + ) + ); + } + +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java new file mode 100644 index 000000000..acf12eb0c --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PollContextInternal.java @@ -0,0 +1,43 @@ +package io.confluent.parallelconsumer; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.state.WorkContainer; +import lombok.Getter; +import lombok.experimental.Delegate; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Internal only view on the {@link PollContext}. + */ +public class PollContextInternal { + + @Delegate + @Getter + private final PollContext pollContext; + + public PollContextInternal(List> workContainers) { + this.pollContext = new PollContext<>(workContainers); + } + + /** + * @return a stream of {@link WorkContainer}s + */ + public Stream> streamWorkContainers() { + return pollContext.streamInternal().map(RecordContextInternal::getWorkContainer); + } + + /** + * @return a flat {@link List} of {@link WorkContainer}s, which wrap the {@link ConsumerRecord}s in this result set + */ + public List> getWorkContainers() { + return streamWorkContainers().collect(Collectors.toList()); + } + +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContext.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContext.java new file mode 100644 index 000000000..f004f702c --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContext.java @@ -0,0 +1,75 @@ +package io.confluent.parallelconsumer; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.state.ConsumerRecordId; +import io.confluent.parallelconsumer.state.WorkContainer; +import lombok.*; +import lombok.experimental.Delegate; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.time.Instant; +import java.util.Optional; + +import static lombok.AccessLevel.PROTECTED; + +/** + * Context information for the wrapped {@link ConsumerRecord}. + *

+ * Includes all accessors (~getters) in {@link ConsumerRecord} via delegation ({@link Delegate}). + * + * @see #getNumberOfFailedAttempts() + */ +@Builder(toBuilder = true) +@AllArgsConstructor +@ToString +@EqualsAndHashCode +public class RecordContext { + + @Getter(PROTECTED) + protected final WorkContainer workContainer; + + @Getter + @Delegate + private final ConsumerRecord consumerRecord; + + public RecordContext(WorkContainer wc) { + this.consumerRecord = wc.getCr(); + this.workContainer = wc; + } + + /** + * A useful ID class for consumer records. + * + * @return the ID for the contained record + */ + public ConsumerRecordId getRecordId() { + var topicPartition = new TopicPartition(topic(), partition()); + return new ConsumerRecordId(topicPartition, offset()); + } + + /** + * @return the number of times this {@link ConsumerRecord} has failed processing already + */ + public int getNumberOfFailedAttempts() { + return workContainer.getNumberOfFailedAttempts(); + } + + /** + * @return if the record has failed, return the time at which is last failed at + */ + public Optional getLastFailureAt() { + return workContainer.getLastFailedAt(); + } + + /** + * @return if the record had succeeded, returns the time at this the user function returned + */ + public Optional getSucceededAt() { + return workContainer.getSucceededAt(); + } +} + diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java new file mode 100644 index 000000000..44c1a04ed --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RecordContextInternal.java @@ -0,0 +1,25 @@ +package io.confluent.parallelconsumer; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.state.WorkContainer; +import lombok.Getter; + +/** + * Internal only view of the {@link RecordContext} class. + */ +public class RecordContextInternal { + + @Getter + private final RecordContext recordContext; + + public RecordContextInternal(WorkContainer wc) { + this.recordContext = new RecordContext<>(wc); + } + + public WorkContainer getWorkContainer() { + return getRecordContext().getWorkContainer(); + } +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index b92c79282..ff9747d6d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -4,9 +4,10 @@ * Copyright (C) 2020-2022 Confluent, Inc. 
*/ -import io.confluent.csid.utils.WallClock; +import io.confluent.csid.utils.TimeUtils; import io.confluent.parallelconsumer.ParallelConsumer; import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContextInternal; import io.confluent.parallelconsumer.state.WorkContainer; import io.confluent.parallelconsumer.state.WorkManager; import lombok.AccessLevel; @@ -22,12 +23,12 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; import org.slf4j.MDC; -import pl.tlinkowski.unij.api.UniLists; import javax.naming.InitialContext; import javax.naming.NamingException; import java.io.Closeable; import java.lang.reflect.Field; +import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.*; @@ -62,7 +63,7 @@ public abstract class AbstractParallelEoSStreamProcessor implements Parall * Injectable clock for testing */ @Setter(AccessLevel.PACKAGE) - private WallClock clock = new WallClock(); + private Clock clock = TimeUtils.getClock(); /** * Kafka's default auto commit frequency. https://docs.confluent.io/platform/current/clients/consumer.html#id1 @@ -191,7 +192,7 @@ public Exception getFailureCause() { * * @see ParallelConsumerOptions */ - public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) { + public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) { Objects.requireNonNull(newOptions, "Options must be supplied"); log.info("Confluent Parallel Consumer initialise... Options: {}", newOptions); @@ -208,7 +209,7 @@ public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) { workerThreadPool = setupWorkerPool(newOptions.getMaxConcurrency()); - this.wm = new WorkManager(newOptions, consumer, dynamicExtraLoadFactor, new WallClock()); + this.wm = new WorkManager(newOptions, consumer, dynamicExtraLoadFactor, TimeUtils.getClock()); ConsumerManager consumerMgr = new ConsumerManager<>(consumer); @@ -557,7 +558,7 @@ private boolean areMyThreadsDone() { * * @see #supervisorLoop(Function, Consumer) */ - protected void supervisorLoop(Function>, List> userFunction, + protected void supervisorLoop(Function, List> userFunctionWrapped, Consumer callback) { if (state != State.unused) { throw new IllegalStateException(msg("Invalid state - the consumer cannot be used more than once (current " + @@ -587,7 +588,7 @@ protected void supervisorLoop(Function>, List> this.blockableControlThread = controlThread; while (state != closed) { try { - controlLoop(userFunction, callback); + controlLoop(userFunctionWrapped, callback); } catch (Exception e) { log.error("Error from poll control thread, will attempt controlled shutdown, then rethrow. 
Error: " + e.getMessage(), e); doClose(DrainingCloseable.DEFAULT_TIMEOUT); // attempt to close @@ -612,8 +613,8 @@ private void addInstanceMDC() { /** * Main control loop */ - private void controlLoop(Function>, List> userFunction, - Consumer callback) throws TimeoutException, ExecutionException, InterruptedException { + private void controlLoop(Function, List> userFunction, + Consumer callback) throws TimeoutException, ExecutionException { // int newWork = handleWork(userFunction, callback); @@ -667,7 +668,7 @@ private void controlLoop(Function>, List> userF wm.getTotalWorkAwaitingIngestion(), wm.getNumberOfEntriesInPartitionQueues(), wm.getNumberRecordsOutForProcessing(), state); } - private int handleWork(final Function>, List> userFunction, final Consumer callback) { + private int handleWork(final Function, List> userFunction, final Consumer callback) { // check queue pressure first before addressing it checkPipelinePressure(); @@ -700,36 +701,33 @@ private int handleWork(final Function>, List> u * * @param workToProcess the polled records to process */ - protected void submitWorkToPool(Function>, List> usersFunction, + protected void submitWorkToPool(Function, List> usersFunction, Consumer callback, List> workToProcess) { if (!workToProcess.isEmpty()) { log.debug("New work incoming: {}, Pool stats: {}", workToProcess.size(), workerThreadPool); - if (options.isUsingBatching()) { - // perf: could inline makeBatches - var batches = makeBatches(workToProcess); + + // perf: could inline makeBatches + var batches = makeBatches(workToProcess); + + // debugging + if (log.isDebugEnabled()) { var sizes = batches.stream().map(List::size).sorted().collect(Collectors.toList()); - if (log.isDebugEnabled()) { - log.debug("Number batches: {}, smallest {}, sizes {}", batches.size(), sizes.stream().findFirst().get(), sizes); - List integerStream = sizes.stream().filter(x -> x < (int) options.getBatchSize().get()).collect(Collectors.toList()); - if (integerStream.size() > 1) { - log.warn("More than one batch isn't target size: {}. Input number of batches: {}", integerStream, batches.size()); - } - } - for (var batch : batches) { - int target = (int) options.getBatchSize().get(); - int size = batch.size(); - submitWorkToPoolInner(usersFunction, callback, batch); - } - } else { - for (var batch : workToProcess) { - submitWorkToPoolInner(usersFunction, callback, UniLists.of(batch)); + log.debug("Number batches: {}, smallest {}, sizes {}", batches.size(), sizes.stream().findFirst().get(), sizes); + List integerStream = sizes.stream().filter(x -> x < (int) options.getBatchSize()).collect(Collectors.toList()); + if (integerStream.size() > 1) { + log.warn("More than one batch isn't target size: {}. 
Input number of batches: {}", integerStream, batches.size()); } } + + // submit + for (var batch : batches) { + submitWorkToPoolInner(usersFunction, callback, batch); + } } } - private void submitWorkToPoolInner(final Function>, List> usersFunction, + private void submitWorkToPoolInner(final Function, List> usersFunction, final Consumer callback, final List> batch) { // for each record, construct dispatch to the executor and capture a Future @@ -745,7 +743,7 @@ private void submitWorkToPoolInner(final Function> } private List>> makeBatches(List> workToProcess) { - int maxBatchSize = (int) options.getBatchSize().get(); + int maxBatchSize = options.getBatchSize(); return partition(workToProcess, maxBatchSize); } @@ -787,7 +785,7 @@ protected int calculateQuantityToRequest() { // always round up to fill batches - get however extra are needed to fill a batch if (options.isUsingBatching()) { //noinspection OptionalGetWithoutIsPresent - int batchSize = (int) options.getBatchSize().get(); + int batchSize = options.getBatchSize(); int modulo = delta % batchSize; if (modulo > 0) { int extraToFillBatch = target - modulo; @@ -806,7 +804,7 @@ protected int getTargetOutForProcessing() { protected int getQueueTargetLoaded() { //noinspection unchecked - int batch = (int) options.getBatchSize().orElse(1); + int batch = options.getBatchSize(); return getPoolLoadTarget() * dynamicExtraLoadFactor.getCurrentFactor() * batch; } @@ -1036,7 +1034,7 @@ private Duration getTimeToNextCommitCheck() { } private Duration getTimeSinceLastCheck() { - Instant now = clock.getNow(); + Instant now = clock.instant(); return Duration.between(lastCommitCheckTime, now); } @@ -1056,7 +1054,7 @@ private void updateLastCommitCheckTime() { /** * Run the supplied function. */ - protected List, R>> runUserFunction(Function>, List> usersFunction, + protected List, R>> runUserFunction(Function, List> usersFunction, Consumer callback, List> workContainerBatch) { // call the user's function @@ -1068,11 +1066,6 @@ protected List, R>> runUserFunct } log.trace("Pool received: {}", workContainerBatch); - // - List> records = workContainerBatch.stream() - .map(WorkContainer::getCr) - .collect(Collectors.toList()); - // boolean workIsStale = wm.checkIfWorkIsStale(workContainerBatch); if (workIsStale) { @@ -1081,7 +1074,8 @@ protected List, R>> runUserFunct return null; } - resultsFromUserFunction = usersFunction.apply(records); + PollContextInternal context = new PollContextInternal<>(workContainerBatch); + resultsFromUserFunction = usersFunction.apply(context); for (final WorkContainer kvWorkContainer : workContainerBatch) { onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction); @@ -1105,7 +1099,7 @@ protected List, R>> runUserFunct // handle fail log.error("Exception caught in user function running stage, registering WC as failed, returning to mailbox", e); for (var wc : workContainerBatch) { - wc.onUserFunctionFailure(); + wc.onUserFunctionFailure(e); addToMailbox(wc); // always add on error } throw e; // trow again to make the future failed diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ConsumerRecordId.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ConsumerRecordId.java new file mode 100644 index 000000000..4d0f9da02 --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ConsumerRecordId.java @@ -0,0 +1,18 @@ +package io.confluent.parallelconsumer.state; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. 
+ */ + +import lombok.Value; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +/** + * Useful identifier for a {@link ConsumerRecord}. + */ +@Value +public class ConsumerRecordId { + TopicPartition tp; + long offset; +} diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionMonitor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionMonitor.java index 1fca25d0d..4e7a08d87 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionMonitor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionMonitor.java @@ -4,6 +4,7 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ +import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import io.confluent.parallelconsumer.internal.BrokerPollSystem; @@ -21,6 +22,7 @@ import org.apache.kafka.common.TopicPartition; import pl.tlinkowski.unij.api.UniSets; +import java.time.Clock; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,6 +55,8 @@ public class PartitionMonitor implements ConsumerRebalanceListener { private final ShardManager sm; + private final ParallelConsumerOptions options; + /** * Hold the tracking state for each of our managed partitions. */ @@ -75,6 +79,8 @@ public class PartitionMonitor implements ConsumerRebalanceListener { */ private final AtomicBoolean workStateIsDirtyNeedsCommitting = new AtomicBoolean(false); + final private Clock clock; + public PartitionState getPartitionState(TopicPartition tp) { // may cause the system to wait for a rebalance to finish // by locking on partitionState, may cause the system to wait for a rebalance to finish @@ -427,7 +433,8 @@ boolean maybeRegisterNewRecordAsWork(final ConsumerRecord rec) { return false; } else { int currentPartitionEpoch = getEpoch(rec); - var wc = new WorkContainer<>(currentPartitionEpoch, rec); + //noinspection unchecked - Lombok builder getter erases generics + var wc = new WorkContainer(currentPartitionEpoch, rec, options.getRetryDelayProvider(), clock); sm.addWorkContainer(wc); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java index 5c471181b..c1a6ae65a 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java @@ -4,7 +4,6 @@ * Copyright (C) 2020-2022 Confluent, Inc. 
*/ -import io.confluent.csid.utils.WallClock; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; import io.confluent.parallelconsumer.internal.RateLimiter; @@ -48,13 +47,11 @@ public class ProcessingShard { private final PartitionMonitor pm; - private final WallClock clock; - private final RateLimiter slowWarningRateLimit = new RateLimiter(5); public boolean workIsWaitingToBeProcessed() { return entries.values().parallelStream() - .anyMatch(kvWorkContainer -> kvWorkContainer.isAvailableToTakeAsWork(clock)); + .anyMatch(kvWorkContainer -> kvWorkContainer.isAvailableToTakeAsWork()); } public void addWorkContainer(WorkContainer wc) { @@ -82,7 +79,7 @@ public Optional> getWorkForOffset(long offset) { public long getCountOfWorkAwaitingSelection() { return entries.values().parallelStream() // todo missing pm.isBlocked(topicPartition) ? - .filter(kvWorkContainer -> kvWorkContainer.isAvailableToTakeAsWork(clock)) + .filter(kvWorkContainer -> kvWorkContainer.isAvailableToTakeAsWork()) .count(); } @@ -102,7 +99,7 @@ ArrayList> getWorkIfAvailable(int workToGetDelta) { if (pm.couldBeTakenAsWork(workContainer)) { - if (workContainer.isAvailableToTakeAsWork(clock)) { + if (workContainer.isAvailableToTakeAsWork()) { log.trace("Taking {} as work", workContainer); workContainer.onQueueingForExecution(); workTaken.add(workContainer); @@ -143,7 +140,7 @@ private void logSlowWork(ArrayList> slowWork) { private void addToSlowWorkMaybe(ArrayList> slowWork, WorkContainer workContainer) { var msgTemplate = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}."; Duration timeInFlight = workContainer.getTimeInFlight(); - var msg = msg(msgTemplate, workContainer, workContainer.hasDelayPassed(clock), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight); + var msg = msg(msgTemplate, workContainer, workContainer.hasDelayPassed(), workContainer.isNotInFlight(), !workContainer.isUserFunctionSucceeded(), timeInFlight); Duration slowThreshold = options.getThresholdForTimeSpendInQueueWarning(); if (isGreaterThan(timeInFlight, slowThreshold)) { slowWork.add(workContainer); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java index cddf08959..e8da53748 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java @@ -5,7 +5,6 @@ */ import io.confluent.csid.utils.LoopingResumingIterator; -import io.confluent.csid.utils.WallClock; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; @@ -16,6 +15,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import java.time.Clock; import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -44,7 +44,7 @@ public class ShardManager { private final WorkManager wm; @Getter(PRIVATE) - private final WallClock clock; + private final Clock clock; /** * Map of Object keys to Shard @@ -60,7 +60,7 @@ public class ShardManager { // todo 
performance: disable/remove if using partition order private final Map> processingShards = new ConcurrentHashMap<>(); - private final NavigableSet> retryQueue = new TreeSet<>(Comparator.comparing(wc -> wc.getDelayUntilRetryDue(getClock()))); + private final NavigableSet> retryQueue = new TreeSet<>(Comparator.comparing(wc -> wc.getDelayUntilRetryDue())); /** * Iteration resume point, to ensure fairness (prevent shard starvation) when we can't process messages from every @@ -93,13 +93,6 @@ Object computeShardKey(WorkContainer wc) { return computeShardKey(wc.getCr()); } - public Optional> getWorkContainerForRecord(ConsumerRecord rec) { - Object key = computeShardKey(rec); - var shard = this.processingShards.get(key); - long offset = rec.offset(); - return shard.getWorkForOffset(offset); - } - /** * @return Work ready in the processing shards, awaiting selection as work to do */ @@ -144,7 +137,7 @@ private void removeShardFor(final WorkContainer work) { public void addWorkContainer(final WorkContainer wc) { Object shardKey = computeShardKey(wc.getCr()); var shard = processingShards.computeIfAbsent(shardKey, - (ignore) -> new ProcessingShard<>(shardKey, options, wm.getPm(), clock)); + (ignore) -> new ProcessingShard<>(shardKey, options, wm.getPm())); shard.addWorkContainer(wc); } @@ -192,7 +185,7 @@ public Optional getLowestRetryTime() { // could potentially remove from queue when in flight but that's messy and performance gain would be trivial for (WorkContainer workContainer : this.retryQueue) { if (workContainer.isNotInFlight()) - return of(workContainer.getDelayUntilRetryDue(clock)); + return of(workContainer.getDelayUntilRetryDue()); } return empty(); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java index 984d69efb..4c7cc2383 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java @@ -4,8 +4,8 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.csid.utils.WallClock; import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.RecordContext; import lombok.AccessLevel; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -14,6 +14,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.time.temporal.Temporal; @@ -21,14 +22,16 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.Future; +import java.util.function.Function; import static io.confluent.csid.utils.KafkaUtils.toTopicPartition; +import static java.util.Optional.of; @Slf4j @EqualsAndHashCode -public class WorkContainer implements Comparable { +public class WorkContainer implements Comparable> { - private final String DEFAULT_TYPE = "DEFAULT"; + static final String DEFAULT_TYPE = "DEFAULT"; /** * Assignment generation this record comes from. 
Used for fencing messages after partition loss, for work lingering @@ -42,15 +45,25 @@ public class WorkContainer implements Comparable { */ @Getter @Setter + // todo change to enum, remove setter - #241 private String workType; @Getter private final ConsumerRecord cr; + private final Clock clock; + + @Getter + private int numberOfFailedAttempts = 0; + @Getter - private int numberOfFailedAttempts; + private Optional lastFailedAt = Optional.empty(); - private Optional failedAt = Optional.empty(); + @Getter + private Optional succeededAt = Optional.empty(); + + @Getter + private Optional lastFailureReason; private boolean inFlight = false; @@ -69,38 +82,38 @@ public class WorkContainer implements Comparable { private Optional timeTakenAsWorkMs = Optional.empty(); - public WorkContainer(int epoch, ConsumerRecord cr) { - this.epoch = epoch; - this.cr = cr; - workType = DEFAULT_TYPE; - } + // static instance so can't access generics - but don't need them as Options class ensures type is correct + private static Function retryDelayProvider; + + public WorkContainer(int epoch, ConsumerRecord cr, Function, Duration> retryDelayProvider, String workType, Clock clock) { + Objects.requireNonNull(workType); - public WorkContainer(int epoch, ConsumerRecord cr, String workType) { this.epoch = epoch; this.cr = cr; - Objects.requireNonNull(workType); this.workType = workType; + this.clock = clock; + + if (WorkContainer.retryDelayProvider == null) { // only set once + // static instance so can't access generics - but don't need them as Options class ensures type is correct + WorkContainer.retryDelayProvider = (Function) retryDelayProvider; + } } - public void fail(WallClock clock) { - // If not explicitly retriable, put it back in with a retry counter, so it can be later given up on - log.trace("Failing {}", this); - numberOfFailedAttempts++; - failedAt = Optional.of(clock.getNow()); - inFlight = false; + public WorkContainer(int epoch, ConsumerRecord cr, Function, Duration> retryDelayProvider, Clock clock) { + this(epoch, cr, retryDelayProvider, DEFAULT_TYPE, clock); } - public void succeed() { - log.trace("Succeeded {}", this); + public void endFlight() { + log.trace("Ending flight {}", this); inFlight = false; } - public boolean hasDelayPassed(WallClock clock) { + public boolean hasDelayPassed() { if (!hasPreviouslyFailed()) { // if never failed, there is no artificial delay, so "delay" has always passed return true; } - Duration delay = getDelayUntilRetryDue(clock); + Duration delay = getDelayUntilRetryDue(); boolean negative = delay.isNegative() || delay.isZero(); // for debug return negative; } @@ -108,17 +121,17 @@ public boolean hasDelayPassed(WallClock clock) { /** * @return time until it should be retried */ - public Duration getDelayUntilRetryDue(WallClock clock) { - Instant now = clock.getNow(); + public Duration getDelayUntilRetryDue() { + Instant now = clock.instant(); Temporal nextAttemptAt = tryAgainAt(); return Duration.between(now, nextAttemptAt); } private Temporal tryAgainAt() { - if (failedAt.isPresent()) { + if (lastFailedAt.isPresent()) { // previously failed, so add the delay to the last failed time Duration retryDelay = getRetryDelayConfig(); - return failedAt.get().plus(retryDelay); + return lastFailedAt.get().plus(retryDelay); } else { // never failed, so no try again delay return Instant.now(); @@ -129,7 +142,6 @@ private Temporal tryAgainAt() { * @return the delay between retries e.g. 
retry after 1 second */ public Duration getRetryDelayConfig() { - var retryDelayProvider = ParallelConsumerOptions.retryDelayProviderStatic; if (retryDelayProvider != null) { return retryDelayProvider.apply(this); } else { @@ -146,7 +158,7 @@ public int compareTo(WorkContainer o) { } public boolean isNotInFlight() { - return !inFlight; + return !isInFlight(); } public boolean isInFlight() { @@ -156,7 +168,7 @@ public boolean isInFlight() { public void onQueueingForExecution() { log.trace("Queueing for execution: {}", this); inFlight = true; - timeTakenAsWorkMs = Optional.of(System.currentTimeMillis()); + timeTakenAsWorkMs = of(System.currentTimeMillis()); } public TopicPartition getTopicPartition() { @@ -164,11 +176,22 @@ public TopicPartition getTopicPartition() { } public void onUserFunctionSuccess() { - this.userFunctionSucceeded = Optional.of(true); + this.succeededAt = of(clock.instant()); + this.userFunctionSucceeded = of(true); } - public void onUserFunctionFailure() { - this.userFunctionSucceeded = Optional.of(false); + public void onUserFunctionFailure(Throwable cause) { + log.trace("Failing {}", this); + + updateFailureHistory(cause); + + this.userFunctionSucceeded = of(false); + } + + private void updateFailureHistory(Throwable cause) { + numberOfFailedAttempts++; + lastFailedAt = of(Instant.now(clock)); + lastFailureReason = Optional.ofNullable(cause); } public boolean isUserFunctionComplete() { @@ -201,9 +224,9 @@ public boolean hasPreviouslyFailed() { return getNumberOfFailedAttempts() > 0; } - public boolean isAvailableToTakeAsWork(WallClock clock) { + public boolean isAvailableToTakeAsWork() { // todo missing boolean notAllowedMoreRecords = pm.isBlocked(topicPartition); - return isNotInFlight() && !isUserFunctionSucceeded() && hasDelayPassed(clock); + return isNotInFlight() && !isUserFunctionSucceeded() && hasDelayPassed(); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkMailBoxManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkMailBoxManager.java index bcbee45e0..8e9e04a9c 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkMailBoxManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkMailBoxManager.java @@ -62,7 +62,7 @@ Integer getAmountOfWorkQueuedWaitingIngestion() { *

* Thread safe for use by control and broker poller thread. * - * @see WorkManager#onSuccess + * @see WorkManager#onSuccessResult * @see WorkManager#raisePartitionHighWaterMark */ public void registerWork(final ConsumerRecords records) { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java index 5b4a23713..71c9f71ba 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java @@ -4,7 +4,7 @@ * Copyright (C) 2020-2022 Confluent, Inc. */ -import io.confluent.csid.utils.WallClock; +import io.confluent.csid.utils.TimeUtils; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import io.confluent.parallelconsumer.internal.BrokerPollSystem; @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; import pl.tlinkowski.unij.api.UniLists; +import java.time.Clock; import java.time.Duration; import java.util.*; import java.util.function.Consumer; @@ -74,26 +75,24 @@ public class WorkManager implements ConsumerRebalanceListener { @Getter(PUBLIC) private final List>> successfulWorkListeners = new ArrayList<>(); - private final WallClock clock; - public WorkManager(ParallelConsumerOptions options, org.apache.kafka.clients.consumer.Consumer consumer) { - this(options, consumer, new DynamicLoadFactor(), new WallClock()); + this(options, consumer, new DynamicLoadFactor(), TimeUtils.getClock()); } /** * Use a private {@link DynamicLoadFactor}, useful for testing. */ - public WorkManager(ParallelConsumerOptions options, org.apache.kafka.clients.consumer.Consumer consumer, WallClock clock) { + public WorkManager(ParallelConsumerOptions options, org.apache.kafka.clients.consumer.Consumer consumer, Clock clock) { this(options, consumer, new DynamicLoadFactor(), clock); } - public WorkManager(final ParallelConsumerOptions newOptions, final org.apache.kafka.clients.consumer.Consumer consumer, final DynamicLoadFactor dynamicExtraLoadFactor, WallClock clock) { + public WorkManager(final ParallelConsumerOptions newOptions, final org.apache.kafka.clients.consumer.Consumer consumer, + final DynamicLoadFactor dynamicExtraLoadFactor, Clock clock) { this.options = newOptions; this.dynamicLoadFactor = dynamicExtraLoadFactor; this.wmbm = new WorkMailBoxManager<>(); this.sm = new ShardManager<>(options, this, clock); - this.pm = new PartitionMonitor<>(consumer, sm); - this.clock = clock; + this.pm = new PartitionMonitor<>(consumer, sm, options, clock); } /** @@ -166,14 +165,13 @@ private int ingestPolledRecordsIntoQueues(long requestedMaxWorkToRetrieve) { /** * Get work with no limit on quantity, useful for testing. */ - public List> getWorkIfAvailable() { + public List> getWorkIfAvailable() { return getWorkIfAvailable(Integer.MAX_VALUE); } /** * Depth first work retrieval. */ - // todo refactor - move into it's own class perhaps public List> getWorkIfAvailable(final int requestedMaxWorkToRetrieve) { // optimise early if (requestedMaxWorkToRetrieve < 1) { @@ -213,11 +211,10 @@ int tryToEnsureQuantityOfWorkQueuedAvailable(final int requestedMaxWorkToRetriev return ingested; } - // todo move PM or SM? 
- public void onSuccess(WorkContainer wc) { + public void onSuccessResult(WorkContainer wc) { log.trace("Work success ({}), removing from processing shard queue", wc); - wc.succeed(); + wc.endFlight(); // update as we go pm.onSuccess(wc); @@ -238,9 +235,9 @@ public void onOffsetCommitSuccess(Map committ pm.onOffsetCommitSuccess(committed); } - public void onFailure(WorkContainer wc) { + public void onFailureResult(WorkContainer wc) { // error occurred, put it back in the queue if it can be retried - wc.fail(clock); + wc.endFlight(); sm.onFailure(wc); numberRecordsOutForProcessing--; } @@ -345,9 +342,9 @@ public void handleFutureResult(WorkContainer wc) { Optional userFunctionSucceeded = wc.getUserFunctionSucceeded(); if (userFunctionSucceeded.isPresent()) { if (TRUE.equals(userFunctionSucceeded.get())) { - onSuccess(wc); + onSuccessResult(wc); } else { - onFailure(wc); + onFailureResult(wc); } } else { throw new IllegalStateException("Work returned, but without a success flag - report a bug"); @@ -359,12 +356,6 @@ public boolean isNoRecordsOutForProcessing() { return getNumberRecordsOutForProcessing() == 0; } - // todo replace raw ConsumerRecord with read only context object wrapper #216 - public Optional> getWorkContainerFor(ConsumerRecord rec) { - ShardManager shard = getSm(); - return shard.getWorkContainerForRecord(rec); - } - public Optional getLowestRetryTime() { return sm.getLowestRetryTime(); } diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CloseAndOpenOffsetTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CloseAndOpenOffsetTest.java index 2f3eb9d95..c67f1ee3c 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CloseAndOpenOffsetTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CloseAndOpenOffsetTest.java @@ -134,7 +134,7 @@ void offsetsOpenClose(OffsetEncoding encoding) { log.info("Throwing fake error for message 2"); throw new FakeRuntimeError("Fake error - Message 2"); } - successfullInOne.add(x); + successfullInOne.add(x.getSingleConsumerRecord()); }); // wait for initial 0 commit @@ -193,7 +193,7 @@ void offsetsOpenClose(OffsetEncoding encoding) { var processedByThree = new ConcurrentLinkedQueue>(); asyncThree.poll(x -> { log.info("Read by consumer THREE: {}", x.value()); - processedByThree.add(x); + processedByThree.add(x.getSingleConsumerRecord()); }); // @@ -263,7 +263,7 @@ void correctOffsetVerySimple() { var readByOne = new ArrayList>(); asyncOne.poll(msg -> { log.debug("Reading {}", msg); - readByOne.add(msg); + readByOne.add(msg.getSingleConsumerRecord()); }); // the single message is processed @@ -290,7 +290,7 @@ void correctOffsetVerySimple() { var readByThree = new ArrayList>(); asyncThree.poll(x -> { log.info("Three read: {}", x.value()); - readByThree.add(x); + readByThree.add(x.getSingleConsumerRecord()); }); // for at least normalTimeout, nothing should be read back (will be long enough to be sure it never is) diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java index 75ca4fd9c..167828bc2 100644 --- 
a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/LargeVolumeInMemoryTests.java @@ -1,7 +1,7 @@ package io.confluent.parallelconsumer.integrationTests; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. */ import io.confluent.csid.utils.ProgressBarUtils; @@ -218,7 +218,7 @@ private void testTiming(int numberOfKeys, int quantityOfMessagesToProduce) { Queue> processingCheck = new ConcurrentLinkedQueue>(); parallelConsumer.pollAndProduceMany((rec) -> { - processingCheck.add(rec); + processingCheck.add(rec.getSingleConsumerRecord()); ThreadUtils.sleepQuietly(3); ProducerRecord stub = new ProducerRecord<>(OUTPUT_TOPIC, "sk:" + rec.key(), "SourceV: " + rec.value()); bar.stepTo(producerSpy.history().size()); diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceHighVolumeTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceHighVolumeTest.java index 5bc4d92d1..a0876227a 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceHighVolumeTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceHighVolumeTest.java @@ -118,7 +118,7 @@ private ProgressBar run(final int expectedMessageCount, final ParallelEoSStreamP pc.setMyId(Optional.of("id: " + barId)); barId++; pc.poll(record -> { - processRecord(bar, record, consumed); + processRecord(bar, record.getSingleConsumerRecord(), consumed); } // , consumeProduceResult -> { // callBack(consumeProduceResult); diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.java index 0e7b0e4e3..da3855a6c 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/MultiInstanceRebalanceTest.java @@ -27,7 +27,6 @@ import org.assertj.core.internal.StandardComparisonStrategy; import org.awaitility.Awaitility; import org.awaitility.core.TerminalFailureException; -import org.eclipse.jetty.util.ConcurrentHashSet; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -35,10 +34,7 @@ import org.slf4j.MDC; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -107,7 +103,7 @@ void largeNumberOfInstances() { } ProgressBar overallProgress; - Set overallConsumedKeys = new ConcurrentHashSet<>(); + Set overallConsumedKeys = new ConcurrentSkipListSet<>(); @SneakyThrows private void runTest(int maxPoll, CommitMode commitMode, ProcessingOrder order, int expectedMessageCount, @@ -121,7 +117,7 @@ private void runTest(int maxPoll, CommitMode commitMode, 
ProcessingOrder order, var sendingProgress = ProgressBarUtils.getNewMessagesBar("sending", log, expectedMessageCount); // pre-produce messages to input-topic - Set expectedKeys = new ConcurrentHashSet<>(); + Set expectedKeys = new ConcurrentSkipListSet<>(); log.info("Producing {} messages before starting test", expectedMessageCount); List> sends = new ArrayList<>(); int preProduceCount = (int) (expectedMessageCount * fractionOfMessagesToPreProduce); @@ -240,16 +236,16 @@ public void run() { pcExecutor.submit(chaosMonkey); - // wait for all pre-produced messages to be processed and produced + // wait for all pre-produced messages to be processed Assertions.useRepresentation(new TrimListRepresentation()); var failureMessage = msg("All keys sent to input-topic should be processed, within time (expected: {} commit: {} order: {} max poll: {})", expectedMessageCount, commitMode, order, maxPoll); ProgressTracker progressTracker = new ProgressTracker(count); try { waitAtMost(ofMinutes(5)) - // dynamic reason support still waiting https://github.com/awaitility/awaitility/pull/193#issuecomment-873116199 - // .failFast( () -> pc1.getFailureCause(), () -> pc1.isClosedOrFailed()) // requires https://github.com/awaitility/awaitility/issues/178#issuecomment-734769761 - .failFast("A PC has died - check logs", () -> !noneHaveFailed(allPCRunners)) // requires https://github.com/awaitility/awaitility/issues/178#issuecomment-734769761 + // dynamic reason support still waiting https://github.com/awaitility/awaitility/issues/240 + // .failFast( () -> pc1.getFailureCause(), () -> pc1.isClosedOrFailed()) // requires https://github.com/awaitility/awaitility/issues/240 + .failFast("A PC has died - check logs", () -> !noneHaveFailed(allPCRunners)) // dynamic reason requires https://github.com/awaitility/awaitility/issues/240 .alias(failureMessage) .pollInterval(1, SECONDS) .untilAsserted(() -> { @@ -268,6 +264,7 @@ public void run() { all.assertAll(); }); } catch (Throwable error) { + // this should be replaceable with dynamic reason generation: https://github.com/awaitility/awaitility/issues/240 List exceptions = checkForFailure(allPCRunners); if (error instanceof TerminalFailureException) { Optional any = exceptions.stream().findAny(); diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/datagen/WallClockStub.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/datagen/WallClockStub.java deleted file mode 100644 index 2a605a54c..000000000 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/datagen/WallClockStub.java +++ /dev/null @@ -1,24 +0,0 @@ -package io.confluent.parallelconsumer.integrationTests.datagen; - -/*- - * Copyright (C) 2020-2021 Confluent, Inc. 
- */ - -import io.confluent.csid.utils.AdvancingWallClockProvider; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; - -import java.time.Instant; - -@RequiredArgsConstructor -public class WallClockStub extends AdvancingWallClockProvider { - - @NonNull - private final Instant baseTime; - - @Override - public Instant getNow() { - return baseTime; - } - -} diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/datagen/WallClockStubTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/datagen/WallClockStubTest.java deleted file mode 100644 index bbdc1f7db..000000000 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/datagen/WallClockStubTest.java +++ /dev/null @@ -1,28 +0,0 @@ -package io.confluent.parallelconsumer.integrationTests.datagen; - -/*- - * Copyright (C) 2020-2021 Confluent, Inc. - */ - -import io.confluent.parallelconsumer.integrationTests.GenUtils; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Test; - -import java.time.Duration; -import java.time.Instant; - -import static java.time.Duration.ofMillis; -import static org.assertj.core.api.Assertions.assertThat; - -@Slf4j -public class WallClockStubTest { - - @Test - void test() { - WallClockStub clock = new WallClockStub(GenUtils.randomSeedInstant); - assertThat(clock.getNow()).isEqualTo(GenUtils.randomSeedInstant); - Duration time = ofMillis(1); - Instant nowAndAdvance = clock.advanceAndGet(time); - assertThat(nowAndAdvance).isEqualTo(GenUtils.randomSeedInstant.plus(time)); - } -} diff --git a/parallel-consumer-core/src/test/java/io/confluent/csid/utils/AdvancingWallClockProvider.java b/parallel-consumer-core/src/test/java/io/confluent/csid/utils/AdvancingWallClockProvider.java deleted file mode 100644 index d463b7834..000000000 --- a/parallel-consumer-core/src/test/java/io/confluent/csid/utils/AdvancingWallClockProvider.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.confluent.csid.utils; - -/*- - * Copyright (C) 2020 Confluent, Inc. 
- */ - -import java.time.Duration; -import java.time.Instant; - -public class AdvancingWallClockProvider extends WallClock { - - public Instant advanceAndGet(Duration time) { - return getNow().plus(time); - } - -} diff --git a/parallel-consumer-core/src/test/java/io/confluent/csid/utils/KafkaTestUtils.java b/parallel-consumer-core/src/test/java/io/confluent/csid/utils/KafkaTestUtils.java index a5e4ed8b4..4d3e0f423 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/csid/utils/KafkaTestUtils.java +++ b/parallel-consumer-core/src/test/java/io/confluent/csid/utils/KafkaTestUtils.java @@ -274,7 +274,7 @@ public static void completeWork(final WorkManager wmm, final Wor assertThat(future).isDone(); wc.setFuture(future); wc.onUserFunctionSuccess(); - wmm.onSuccess(wc); + wmm.onSuccessResult(wc); assertThat(wc.isUserFunctionComplete()).isTrue(); } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/BatchTestMethods.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/BatchTestMethods.java index 6c3e0f03c..8ba88c8c5 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/BatchTestMethods.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/BatchTestMethods.java @@ -8,20 +8,16 @@ import io.confluent.csid.utils.ProgressBarUtils; import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import io.confluent.parallelconsumer.internal.RateLimiter; -import io.confluent.parallelconsumer.state.WorkContainer; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import me.tongfei.progressbar.ProgressBar; -import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import static io.confluent.csid.utils.StringUtils.msg; import static io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase.defaultTimeout; @@ -60,10 +56,6 @@ protected void setupParallelConsumer(int targetBatchSize, int maxConcurrency, Pa protected abstract AbstractParallelEoSStreamProcessor getPC(); - public List toOffsets(final List> x) { - return x.stream().map(ConsumerRecord::offset).collect(Collectors.toList()); - } - public void averageBatchSizeTest(int numRecsExpected) { final int targetBatchSize = 20; int maxConcurrency = 8; @@ -110,8 +102,8 @@ public void averageBatchSizeTest(int numRecsExpected) { */ protected abstract void averageBatchSizeTestPoll(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger); - protected POLL_RETURN averageBatchSizeTestPollInner(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger, List> pollBatch) { - int size = pollBatch.size(); + protected POLL_RETURN averageBatchSizeTestPollInner(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger, PollContext pollBatch) { + int size = (int) pollBatch.size(); statusLogger.performIfNotLimited(() -> { try { @@ -135,7 +127,7 @@ protected POLL_RETURN averageBatchSizeTestPollInner(AtomicInteger numBatches, At } } - protected abstract POLL_RETURN averageBatchSizeTestPollStep(List> recordList); + protected abstract POLL_RETURN averageBatchSizeTestPollStep(PollContext recordList); private double calcAverage(AtomicInteger numRecords, AtomicInteger numBatches) { return 
numRecords.get() / (0.0 + numBatches.get()); @@ -152,7 +144,7 @@ public void simpleBatchTest(ParallelConsumerOptions.ProcessingOrder order) { setupParallelConsumer(batchSizeSetting, ParallelConsumerOptions.DEFAULT_MAX_CONCURRENCY, order); var recs = getKtu().sendRecords(numRecsExpected); - List>> batchesReceived = new CopyOnWriteArrayList<>(); + List> batchesReceived = new CopyOnWriteArrayList<>(); // simpleBatchTestPoll(batchesReceived); @@ -172,7 +164,7 @@ public void simpleBatchTest(ParallelConsumerOptions.ProcessingOrder order) { .as("batch size") .allSatisfy(receivedBatchEntry -> assertThat(receivedBatchEntry).hasSizeLessThanOrEqualTo(batchSizeSetting)) .as("all messages processed") - .flatExtracting(x -> x).hasSameElementsAs(recs); + .flatExtracting(PollContext::getConsumerRecordsFlattened).hasSameElementsAs(recs); assertThat(getPC().isClosedOrFailed()).isFalse(); @@ -180,7 +172,7 @@ public void simpleBatchTest(ParallelConsumerOptions.ProcessingOrder order) { getPC().closeDrainFirst(); } - public abstract void simpleBatchTestPoll(List>> batchesReceived); + public abstract void simpleBatchTestPoll(List> batchesReceived); @SneakyThrows public void batchFailureTest(ParallelConsumerOptions.ProcessingOrder order) { @@ -190,7 +182,7 @@ public void batchFailureTest(ParallelConsumerOptions.ProcessingOrder order) { setupParallelConsumer(batchSize, ParallelConsumerOptions.DEFAULT_MAX_CONCURRENCY, order); var recs = getKtu().sendRecords(expectedNumOfMessages); - List>> receivedBatches = Collections.synchronizedList(new ArrayList<>()); + List> receivedBatches = Collections.synchronizedList(new ArrayList<>()); // batchFailPoll(receivedBatches); @@ -209,27 +201,25 @@ public void batchFailureTest(ParallelConsumerOptions.ProcessingOrder order) { .allSatisfy(receivedBatch -> assertThat(receivedBatch).hasSizeLessThanOrEqualTo(batchSize)) .as("all messages processed") - .flatExtracting(x -> x).hasSameElementsAs(recs); + .flatExtracting(PollContext::getConsumerRecordsFlattened).hasSameElementsAs(recs); // assertThat(getPC().isClosedOrFailed()).isFalse(); } /** - * Must call {@link #batchFailPollInner(List)} + * Must call {@link #batchFailPollInner} */ - protected abstract void batchFailPoll(List>> receivedBatches); + protected abstract void batchFailPoll(List> receivedBatches); - protected POLL_RETURN batchFailPollInner(List> pollBatch) { - List offsets = pollBatch.stream().map(ConsumerRecord::offset).collect(Collectors.toList()); + protected POLL_RETURN batchFailPollInner(PollContext pollBatch) { + List offsets = pollBatch.getOffsetsFlattened(); log.debug("Got batch {}", offsets); boolean contains = offsets.contains(FAILURE_TARGET); if (contains) { var target = pollBatch.stream().filter(x -> x.offset() == FAILURE_TARGET).findFirst().get(); - Optional workContainerFor = getPC().getWm().getWorkContainerFor(target); - WorkContainer targetWc = workContainerFor.get(); - int numberOfFailedAttempts = targetWc.getNumberOfFailedAttempts(); + int numberOfFailedAttempts = target.getNumberOfFailedAttempts(); int targetAttempts = 3; if (numberOfFailedAttempts < targetAttempts) { log.debug("Failing batch containing target offset {}", FAILURE_TARGET); @@ -242,4 +232,4 @@ protected POLL_RETURN batchFailPollInner(List> po return null; } -} \ No newline at end of file +} diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/CoreBatchTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/CoreBatchTest.java index 1b6c0361b..d181334cd 100644 --- 
a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/CoreBatchTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/CoreBatchTest.java @@ -9,7 +9,6 @@ import io.confluent.parallelconsumer.internal.RateLimiter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -37,7 +36,7 @@ protected KafkaTestUtils getKtu() { @SneakyThrows @Override - protected Void averageBatchSizeTestPollStep(List> recordList) { + protected Void averageBatchSizeTestPollStep(PollContext recordList) { try { Thread.sleep(30); } catch (InterruptedException e) { @@ -49,7 +48,7 @@ protected Void averageBatchSizeTestPollStep(List> @Override protected void averageBatchSizeTestPoll(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger) { - parallelConsumer.pollBatch(pollBatch -> { + parallelConsumer.poll(pollBatch -> { averageBatchSizeTestPollInner(numBatches, numRecords, statusLogger, pollBatch); }); } @@ -60,16 +59,16 @@ protected AbstractParallelEoSStreamProcessor getPC() { } @Override - public void simpleBatchTestPoll(List>> batchesReceived) { - parallelConsumer.pollBatch(pollBatch -> { - log.debug("Batch of messages: {}", toOffsets(pollBatch)); - batchesReceived.add(pollBatch); + public void simpleBatchTestPoll(List> batchesReceived) { + parallelConsumer.poll(context -> { + log.debug("Batch of messages: {}", context.getOffsetsFlattened()); + batchesReceived.add(context); }); } @Override - protected void batchFailPoll(List>> receivedBatches) { - parallelConsumer.pollBatch(pollBatch -> { + protected void batchFailPoll(List> receivedBatches) { + parallelConsumer.poll(pollBatch -> { batchFailPollInner(pollBatch); receivedBatches.add(pollBatch); }); diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessorTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessorTest.java index 6c7423f40..6360c2f36 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessorTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/JStreamParallelEoSStreamProcessorTest.java @@ -47,7 +47,7 @@ public void testStream() { Stream> streamedResults = streaming.pollProduceAndStream((record) -> { ProducerRecord mock = mock(ProducerRecord.class); log.info("Consumed and produced record ({}), and returning a derivative result to produce to output topic: {}", record, mock); - myRecordProcessingAction.apply(record); + myRecordProcessingAction.apply(record.getSingleConsumerRecord()); latch.countDown(); return Lists.list(mock); }); @@ -71,7 +71,7 @@ public void testStream() { void testConsumeAndProduce() { var latch = new CountDownLatch(1); var stream = streaming.pollProduceAndStream((record) -> { - String apply = myRecordProcessingAction.apply(record); + String apply = myRecordProcessingAction.apply(record.getSingleConsumerRecord()); ProducerRecord result = new ProducerRecord<>(OUTPUT_TOPIC, "akey", apply); log.info("Consumed a record ({}), and returning a derivative result record to be produced: {}", record, result); List> result1 = Lists.list(result); @@ -89,7 +89,7 @@ void testConsumeAndProduce() { var myResultStream = stream.peek(x -> { if (x != null) { - ConsumerRecord left = x.getIn(); + ConsumerRecord left = 
x.getIn().getSingleConsumerRecord(); log.info("{}:{}:{}:{}", left.key(), left.value(), x.getOut(), x.getMeta()); } else { log.info("null"); @@ -105,8 +105,8 @@ void testConsumeAndProduce() { void testFlatMapProduce() { var latch = new CountDownLatch(1); var myResultStream = streaming.pollProduceAndStream((record) -> { - String apply1 = myRecordProcessingAction.apply(record); - String apply2 = myRecordProcessingAction.apply(record); + String apply1 = myRecordProcessingAction.apply(record.getSingleConsumerRecord()); + String apply2 = myRecordProcessingAction.apply(record.getSingleConsumerRecord()); var list = Lists.list( new ProducerRecord<>(OUTPUT_TOPIC, "key", apply1), diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java index 078bbfcb4..274520939 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java @@ -107,10 +107,10 @@ void offsetsAreNeverCommittedForMessagesStillInFlightSimplest(CommitMode commitM var startBarrierLatch = new CountDownLatch(1); // finish processing only msg 1 - parallelConsumer.poll((ignore) -> { - log.error("msg: {}", ignore); + parallelConsumer.poll((context) -> { + log.error("msg: {}", context); startBarrierLatch.countDown(); - int offset = (int) ignore.offset(); + int offset = (int) context.offset(); LatchTestUtils.awaitLatch(locks, offset); processedStates.put(offset, true); }); @@ -197,8 +197,8 @@ void offsetsAreNeverCommittedForMessagesStillInFlightLong(CommitMode commitMode) CountDownLatch startLatch = new CountDownLatch(1); - parallelConsumer.poll((ignore) -> { - int offset = (int) ignore.offset(); + parallelConsumer.poll((context) -> { + int offset = (int) context.offset(); CountDownLatch latchForMsg = locks.get(offset); try { startLatch.countDown(); @@ -390,7 +390,7 @@ public void testVoidPollMethod(CommitMode commitMode) { int expected = 1; var msgCompleteBarrier = new CountDownLatch(expected); parallelConsumer.poll((record) -> { - myRecordProcessingAction.apply(record); + myRecordProcessingAction.apply(record.getSingleConsumerRecord()); msgCompleteBarrier.countDown(); }); diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java index 50c961afe..0f38cdb9d 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/WorkManagerTest.java @@ -5,10 +5,12 @@ */ import com.google.common.truth.Truth; -import io.confluent.csid.utils.AdvancingWallClockProvider; import io.confluent.csid.utils.KafkaTestUtils; import io.confluent.csid.utils.LongPollingMockConsumer; +import io.confluent.csid.utils.TimeUtils; +import io.confluent.parallelconsumer.FakeRuntimeError; import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.truth.CommitHistorySubject; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -24,10 +26,10 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import 
org.junit.jupiter.params.provider.ValueSource; +import org.threeten.extra.MutableClock; import pl.tlinkowski.unij.api.UniLists; import java.time.Duration; -import java.time.Instant; import java.util.*; import static io.confluent.csid.utils.Range.range; @@ -50,14 +52,7 @@ class WorkManagerTest { int offset; - Instant time = Instant.now(); - - AdvancingWallClockProvider testClock = new AdvancingWallClockProvider() { - @Override - public Instant getNow() { - return time; - } - }; + MutableClock time = MutableClock.epochUTC(); @BeforeEach public void setup() { @@ -69,7 +64,7 @@ public void setup() { private void setupWorkManager(ParallelConsumerOptions build) { offset = 0; - wm = new WorkManager<>(build, new MockConsumer<>(OffsetResetStrategy.EARLIEST), testClock); + wm = new WorkManager<>(build, new MockConsumer<>(OffsetResetStrategy.EARLIEST), time); wm.getSuccessfulWorkListeners().add((work) -> { log.debug("Heard some successful work: {}", work); successfulWork.add(work); @@ -113,7 +108,7 @@ void testRemovedUnordered() { assertThat(gottenWork).hasSize(1); assertOffsets(gottenWork, of(0)); - wm.onSuccess(gottenWork.get(0)); + wm.onSuccessResult(gottenWork.get(0)); gottenWork = wm.getWorkIfAvailable(max); assertThat(gottenWork).hasSize(1); @@ -122,53 +117,84 @@ void testRemovedUnordered() { @Test void testUnorderedAndDelayed() { - setupWorkManager(ParallelConsumerOptions.builder().ordering(UNORDERED).build()); + setupWorkManager(ParallelConsumerOptions.builder() + .ordering(UNORDERED) + .build()); registerSomeWork(); int max = 2; - var works = wm.getWorkIfAvailable(max); - assertThat(works).hasSize(2); - assertOffsets(works, of(0, 1)); + { + var workRetrieved = wm.getWorkIfAvailable(max); + assertThat(workRetrieved).hasSize(2); + assertOffsets(workRetrieved, of(0, 1)); - wm.onSuccess(works.get(0)); - wm.onFailure(works.get(1)); + // pass first, fail second + WorkContainer succeed = workRetrieved.get(0); + succeed(succeed); + WorkContainer fail = workRetrieved.get(1); + fail(fail); + } - works = wm.getWorkIfAvailable(max); - assertOffsets(works, of(2)); + { + var workRetrieved = wm.getWorkIfAvailable(max); + assertOffsets(workRetrieved, of(2), + "no order restriction, 1's delay won't have passed - should get remaining in queue not yet failed"); - wm.onSuccess(works.get(0)); + WorkContainer succeed = workRetrieved.get(0); + succeed(succeed); + } - works = wm.getWorkIfAvailable(max); - assertOffsets(works, of()); + { + var workRetrieved = wm.getWorkIfAvailable(max); + assertOffsets(workRetrieved, of(), "delay won't have passed so should not retrieve anything"); - advanceClockBySlightlyLessThanDelay(); + advanceClockBySlightlyLessThanDelay(); + } - works = wm.getWorkIfAvailable(max); - assertOffsets(works, of()); + { + var workRetrieved = wm.getWorkIfAvailable(max); + assertOffsets(workRetrieved, of()); - advanceClockByDelay(); + advanceClockByDelay(); + } - works = wm.getWorkIfAvailable(max); - assertOffsets(works, of(1)); - wm.onSuccess(works.get(0)); + { + var workRetrieved = wm.getWorkIfAvailable(max); + assertOffsets(workRetrieved, of(1), + "should retrieve 1 given clock has been advanced and retry delay should be over"); + WorkContainer succeed = workRetrieved.get(0); + succeed(succeed); + } assertThat(successfulWork) .extracting(x -> (int) x.getCr().offset()) .isEqualTo(of(0, 2, 1)); } + private void succeed(WorkContainer succeed) { + succeed.onUserFunctionSuccess(); + wm.onSuccessResult(succeed); + } + /** * Checks the offsets of the work, matches the offsets in the provided list + * + * 
@deprecated use {@link CommitHistorySubject} or similar instead */ private AbstractListAssert, Integer, ObjectAssert> - assertOffsets(List> works, List expected) { + assertOffsets(List> works, List expected, String msg) { return assertThat(works) - .as("offsets of work given") + .as(msg) .extracting(x -> (int) x.getCr().offset()) .isEqualTo(expected); } + private AbstractListAssert, Integer, ObjectAssert> + assertOffsets(List> works, List expected) { + return assertOffsets(works, expected, "offsets of work given"); + } + @Test public void testOrderedInFlightShouldBlockQueue() { ParallelConsumerOptions build = ParallelConsumerOptions.builder().ordering(PARTITION).build(); @@ -187,7 +213,7 @@ public void testOrderedInFlightShouldBlockQueue() { works = wm.getWorkIfAvailable(max); assertOffsets(works, of()); // should be blocked by in flight - wm.onSuccess(w); + succeed(w); works = wm.getWorkIfAvailable(max); assertOffsets(works, of(1)); @@ -214,7 +240,7 @@ void testOrderedAndDelayed() { // fail the work var wc = works.get(0); - wm.onFailure(wc); + fail(wc); // nothing available to get works = wm.getWorkIfAvailable(maxWorkToGet); @@ -228,7 +254,7 @@ void testOrderedAndDelayed() { assertOffsets(works, of(0)); wc = works.get(0); - wm.onFailure(wc); + fail(wc); advanceClock(wc.getRetryDelayConfig().minus(ofSeconds(1))); @@ -240,17 +266,17 @@ void testOrderedAndDelayed() { works = wm.getWorkIfAvailable(maxWorkToGet); assertOffsets(works, of(0)); - wm.onSuccess(works.get(0)); + succeed(works.get(0)); assertOffsets(successfulWork, of(0)); works = wm.getWorkIfAvailable(maxWorkToGet); assertOffsets(works, of(1)); - wm.onSuccess(works.get(0)); + succeed(works.get(0)); works = wm.getWorkIfAvailable(maxWorkToGet); assertOffsets(works, of(2)); - wm.onSuccess(works.get(0)); + succeed(works.get(0)); // check all published in the end assertOffsets(successfulWork, of(0, 1, 2)); @@ -258,29 +284,29 @@ void testOrderedAndDelayed() { @Test void containerDelay() { - var wc = new WorkContainer(0, null); - assertThat(wc.hasDelayPassed(testClock)).isTrue(); // when new, there's no delay - wc.fail(testClock); - assertThat(wc.hasDelayPassed(testClock)).isFalse(); + var wc = new WorkContainer(0, null, null, WorkContainer.DEFAULT_TYPE, this.time); + assertThat(wc.hasDelayPassed()).isTrue(); // when new, there's no delay + wc.onUserFunctionFailure(new FakeRuntimeError("")); + assertThat(wc.hasDelayPassed()).isFalse(); advanceClockBySlightlyLessThanDelay(); - assertThat(wc.hasDelayPassed(testClock)).isFalse(); + assertThat(wc.hasDelayPassed()).isFalse(); advanceClockByDelay(); - boolean actual = wc.hasDelayPassed(testClock); + boolean actual = wc.hasDelayPassed(); assertThat(actual).isTrue(); } private void advanceClockBySlightlyLessThanDelay() { Duration retryDelay = WorkContainer.defaultRetryDelay; Duration duration = retryDelay.dividedBy(2); - time = time.plus(duration); + time.add(duration); } private void advanceClockByDelay() { - time = time.plus(WorkContainer.defaultRetryDelay); + time.add(WorkContainer.defaultRetryDelay); } private void advanceClock(Duration by) { - time = time.plus(by); + time.add(by); } @Test @@ -312,8 +338,8 @@ void insertWrongOrderPreservesOffsetOrdering() { assertOffsets(works, of(0, 1, 2, 6)); // fail some - wm.onFailure(works.get(1)); - wm.onFailure(works.get(3)); + fail(works.get(1)); + fail(works.get(3)); // works = wm.getWorkIfAvailable(max); @@ -327,6 +353,11 @@ void insertWrongOrderPreservesOffsetOrdering() { assertOffsets(works, of(1, 6)); } + private void fail(WorkContainer wc) { + 
wc.onUserFunctionFailure(null); + wm.onFailureResult(wc); + } + @Test @Disabled public void maxPerPartition() { @@ -428,7 +459,7 @@ void orderedByPartitionsParallel() { private void successAll(List> works) { for (WorkContainer work : works) { - wm.onSuccess(work); + wm.onSuccessResult(work); } } @@ -524,7 +555,7 @@ void treeMapOrderingCorrect() { var treeMap = new TreeMap>(); for (ConsumerRecord record : records) { - treeMap.put(record.offset(), new WorkContainer<>(0, record)); + treeMap.put(record.offset(), new WorkContainer<>(0, record, null, TimeUtils.getClock())); } // read back, assert correct order @@ -551,8 +582,7 @@ public void workQueuesEmptyWhenAllWorkComplete() { // for (var w : work) { - w.onUserFunctionSuccess(); - wm.onSuccess(w); + succeed(w); } // diff --git a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java index fb74e5d36..4506566a0 100644 --- a/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java +++ b/parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java @@ -3,8 +3,10 @@ /*- * Copyright (C) 2020-2022 Confluent, Inc. */ + import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.RecordContext; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomUtils; @@ -96,9 +98,10 @@ void runPollAndProduce() { postSetup(); // tag::exampleProduce[] - parallelConsumer.pollAndProduce(record -> { - var result = processBrokerRecord(record); - return new ProducerRecord<>(outputTopic, record.key(), result.payload); + parallelConsumer.pollAndProduce(context -> { + var consumerRecord = context.getSingleRecord().getConsumerRecord(); + var result = processBrokerRecord(consumerRecord); + return new ProducerRecord<>(outputTopic, consumerRecord.key(), result.payload); }, consumeProduceResult -> { log.debug("Message {} saved to broker at offset {}", consumeProduceResult.getOut(), @@ -108,8 +111,8 @@ void runPollAndProduce() { // end::exampleProduce[] } - private Result processBrokerRecord(ConsumerRecord record) { - return new Result("Some payload from " + record.value()); + private Result processBrokerRecord(ConsumerRecord consumerRecord) { + return new Result("Some payload from " + consumerRecord.value()); } @Value @@ -123,8 +126,8 @@ void customRetryDelay() { final int baseDelaySecond = 1; ParallelConsumerOptions.builder() - .retryDelayProvider(workContainer -> { - int numberOfFailedAttempts = workContainer.getNumberOfFailedAttempts(); + .retryDelayProvider(recordContext -> { + int numberOfFailedAttempts = recordContext.getNumberOfFailedAttempts(); long delayMillis = (long) (baseDelaySecond * Math.pow(multiplier, numberOfFailedAttempts) * 1000); return Duration.ofMillis(delayMillis); }); @@ -138,7 +141,8 @@ void maxRetries() { final int maxRetries = 10; final Map, Long> retriesCount = new ConcurrentHashMap<>(); - pc.poll(consumerRecord -> { + pc.poll(context -> { + var consumerRecord = context.getSingleRecord().getConsumerRecord(); Long retryCount = retriesCount.computeIfAbsent(consumerRecord, ignore -> 0L); if (retryCount < maxRetries) { processRecord(consumerRecord); @@ -162,7 +166,8 @@ void 
circuitBreaker() { // tag::circuitBreaker[] final Map upMap = new ConcurrentHashMap<>(); - pc.poll(consumerRecord -> { + pc.poll(context -> { + var consumerRecord = context.getSingleRecord().getConsumerRecord(); String serverId = extractServerId(consumerRecord); boolean up = upMap.computeIfAbsent(serverId, ignore -> true); @@ -204,10 +209,10 @@ void batching() { .maxConcurrency(100) .batchSize(5) // <1> .build()); - parallelConsumer.pollBatch(batchOfRecords -> { + parallelConsumer.poll(context -> { // convert the batch into the payload for our processing - List payload = batchOfRecords.stream() - .map(this::pareparePayload) + List payload = context.stream() + .map(this::preparePayload) .collect(Collectors.toList()); // process the entire batch payload at once processBatchPayload(payload); @@ -215,12 +220,14 @@ void batching() { // end::batching[] } - private void processBatchPayload(List payload) { - + private void processBatchPayload(List batchPayload) { + // example } - private String pareparePayload(ConsumerRecord x) { - return null; + private String preparePayload(RecordContext rc) { + ConsumerRecord consumerRecords = rc.getConsumerRecord(); + int failureCount = rc.getNumberOfFailedAttempts(); + return msg("{}, {}", consumerRecords, failureCount); } } diff --git a/parallel-consumer-examples/parallel-consumer-example-reactor/src/main/java/io/confluent/parallelconsumer/examples/reactor/ReactorApp.java b/parallel-consumer-examples/parallel-consumer-example-reactor/src/main/java/io/confluent/parallelconsumer/examples/reactor/ReactorApp.java index 039558920..cf04949a5 100644 --- a/parallel-consumer-examples/parallel-consumer-example-reactor/src/main/java/io/confluent/parallelconsumer/examples/reactor/ReactorApp.java +++ b/parallel-consumer-examples/parallel-consumer-example-reactor/src/main/java/io/confluent/parallelconsumer/examples/reactor/ReactorApp.java @@ -1,7 +1,7 @@ package io.confluent.parallelconsumer.examples.reactor; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. */ import io.confluent.parallelconsumer.ParallelConsumerOptions; @@ -53,9 +53,10 @@ void run() { int port = getPort(); // tag::example[] - parallelConsumer.react(record -> { - log.info("Concurrently constructing and returning RequestInfo from record: {}", record); - Map params = UniMaps.of("recordKey", record.key(), "payload", record.value()); + parallelConsumer.react(context -> { + var consumerRecord = context.getSingleRecord().getConsumerRecord(); + log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord); + Map params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value()); return Mono.just("something todo"); // <1> }); // end::example[] diff --git a/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java b/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java index b38dbc4c6..38a996d80 100644 --- a/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java +++ b/parallel-consumer-examples/parallel-consumer-example-vertx/src/main/java/io/confluent/parallelconsumer/examples/vertx/VertxApp.java @@ -1,7 +1,7 @@ package io.confluent.parallelconsumer.examples.vertx; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. 
*/ import io.confluent.parallelconsumer.ParallelConsumerOptions; @@ -53,9 +53,10 @@ void run() { int port = getPort(); // tag::example[] - var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> { - log.info("Concurrently constructing and returning RequestInfo from record: {}", record); - Map params = UniMaps.of("recordKey", record.key(), "payload", record.value()); + var resultStream = parallelConsumer.vertxHttpReqInfoStream(context -> { + var consumerRecord = context.getSingleConsumerRecord(); + log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord); + Map params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value()); return new RequestInfo("localhost", port, "/api", params); // <1> }); // end::example[] diff --git a/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java b/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java index 720861728..b3b34e1ed 100644 --- a/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java +++ b/parallel-consumer-reactor/src/main/java/io/confluent/parallelconsumer/reactor/ReactorProcessor.java @@ -5,7 +5,8 @@ */ import io.confluent.parallelconsumer.ParallelConsumerOptions; -import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.PollContext; +import io.confluent.parallelconsumer.PollContextInternal; import io.confluent.parallelconsumer.internal.ExternalEngine; import io.confluent.parallelconsumer.state.WorkContainer; import lombok.SneakyThrows; @@ -20,13 +21,11 @@ import java.time.Duration; import java.util.List; -import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import static io.confluent.csid.utils.StringUtils.msg; import static io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun; /** @@ -43,7 +42,7 @@ public class ReactorProcessor extends ExternalEngine { private final Supplier schedulerSupplier; private final Supplier defaultSchedulerSupplier = Schedulers::boundedElastic; - public ReactorProcessor(ParallelConsumerOptions options, Supplier newSchedulerSupplier) { + public ReactorProcessor(ParallelConsumerOptions options, Supplier newSchedulerSupplier) { super(options); this.schedulerSupplier = (newSchedulerSupplier == null) ? defaultSchedulerSupplier : newSchedulerSupplier; } @@ -67,50 +66,35 @@ public void close(Duration timeout, DrainingMode drainMode) { } /** - * Like {@link ParallelStreamProcessor#pollBatch} but for Reactor. - *

- * Register a function to be applied to a batch of messages. + * Register a function to be applied to polled messages. *
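+ * For example, a minimal sketch (names are illustrative only; it mirrors the Reactor example application updated elsewhere in this change set):
+ * <pre>{@code
+ * reactorProcessor.react(context -> {
+ *     // PollContext gives access to the polled record(s)
+ *     var consumerRecord = context.getSingleConsumerRecord();
+ *     log.info("Processing: {}", consumerRecord);
+ *     // do the real work inside the returned Publisher, not on this thread
+ *     return Mono.just("result for " + consumerRecord.value());
+ * });
+ * }</pre>
+ *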

* Make sure that you do any work immediately in a Publisher / Flux - do not block this thread. *

- * The system will treat the messages as a set, so if an error is thrown by the user code, then all messages will be - * marked as failed and be retried (Note that when they are retried, there is no guarantee they will all be in the - * same batch again). So if you're going to process messages individually, then don't use this function. - *

- * Otherwise, if you're going to process messages in sub sets from this batch, it's better to instead adjust the - * {@link ParallelConsumerOptions#getBatchSize()} instead to the actual desired size, and process them as a whole. * * @param reactorFunction user function that takes a single record, and returns some type of Publisher to process * their work. - * @see ParallelStreamProcessor#pollBatch - * @see ParallelConsumerOptions#getBatchSize() + * @see #react(Function) + * @see ParallelConsumerOptions + * @see ParallelConsumerOptions#batchSize + * @see io.confluent.parallelconsumer.ParallelStreamProcessor#poll */ - public void reactBatch(Function>, Publisher> reactorFunction) { - Function>, List> wrappedUserFunc = (recList) -> { + public void react(Function, Publisher> reactorFunction) { + + Function, List> wrappedUserFunc = pollContext -> { if (log.isTraceEnabled()) { log.trace("Record list ({}), executing void function...", - recList.stream() + pollContext.streamConsumerRecords() .map(ConsumerRecord::offset) .collect(Collectors.toList()) ); } - for (var rec : recList) { - // attach internal handler - - WorkContainer wc = wm.getWorkContainerFor(rec).get(); - - if (wc == null) { - // throw it - will retry - // should be fixed by moving for TreeMap to ConcurrentSkipListMap - throw new IllegalStateException(msg("WC for record is null! {}", rec)); - } else { - wc.setWorkType(REACTOR_TYPE); - } - } + // attach internal handler + pollContext.streamWorkContainers() + .forEach(x -> x.setWorkType(REACTOR_TYPE)); - Publisher publisher = carefullyRun(reactorFunction, recList); + Publisher publisher = carefullyRun(reactorFunction, pollContext.getPollContext()); Disposable flux = Flux.from(publisher) // using #subscribeOn so this should be redundant, but testing has shown otherwise @@ -123,30 +107,17 @@ public void reactBatch(Function>, Publisher> reacto }) .doOnComplete(() -> { log.debug("Reactor success (doOnComplete)"); - for (var rec : recList) { - // todo Optional here is bad smell - shouldn't happen. See ReactorPCTest#concurrencyTest, context object and shard object. Fails when our for processing exceeds MAX limit - Optional> wcOpt = wm.getWorkContainerFor(rec); - if (wcOpt.isPresent()) { - WorkContainer wc = wcOpt.get(); - wc.onUserFunctionSuccess(); - addToMailbox(wc); - } else { - log.warn("WorkContainer for rec removed already: {}", rec); - } - } + pollContext.streamWorkContainers().forEach(wc -> { + wc.onUserFunctionSuccess(); + addToMailbox(wc); + }); }) .doOnError(throwable -> { log.error("Reactor fail signal", throwable); - for (var rec : recList) { - Optional> wcOpt = wm.getWorkContainerFor(rec); - if (wcOpt.isPresent()) { - var wc = wcOpt.get(); - wc.onUserFunctionFailure(); - addToMailbox(wc); - } else { - log.warn("WorkContainer for rec removed already: {}", rec); - } - } + pollContext.streamWorkContainers().forEach(wc -> { + wc.onUserFunctionFailure(throwable); + addToMailbox(wc); + }); }) // cause users Publisher to run a thread pool, if it hasn't already - this is a crucial magical part .subscribeOn(getScheduler()) @@ -161,34 +132,6 @@ public void reactBatch(Function>, Publisher> reacto supervisorLoop(wrappedUserFunc, voidCallBack); } - /** - * Make sure that you do any work immediately in a Publisher / Flux - do not block this thread. - *

- * Like {@link #reactBatch} but no batching - single message at a time. - * - * @param reactorFunction user function that takes a single record, and returns some type of Publisher to process - * their work. - * @see #reactBatch(Function) - */ - public void react(Function, Publisher> reactorFunction) { - // wrap single record function in batch function - Function>, Publisher> batchReactorFunctionWrapper = (recordList) -> { - if (recordList.size() != 1) { - throw new IllegalArgumentException("Bug: Function only takes a single element"); - } - var consumerRecord = recordList.get(0); // will always only have one - log.trace("Consumed a record ({}), executing void function...", consumerRecord.offset()); - - Publisher publisher = carefullyRun(reactorFunction, consumerRecord); - - log.trace("asyncPoll - user function finished ok."); - return publisher; - }; - - // - reactBatch(batchReactorFunctionWrapper); - } - private Scheduler getScheduler() { return this.schedulerSupplier.get(); } diff --git a/parallel-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/ReactorBatchTest.java b/parallel-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/ReactorBatchTest.java index 40a8d1ec0..154ad7c60 100644 --- a/parallel-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/ReactorBatchTest.java +++ b/parallel-consumer-reactor/src/test/java/io/confluent/parallelconsumer/reactor/ReactorBatchTest.java @@ -8,11 +8,11 @@ import io.confluent.parallelconsumer.BatchTestBase; import io.confluent.parallelconsumer.BatchTestMethods; import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContext; import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import io.confluent.parallelconsumer.internal.RateLimiter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -41,16 +41,16 @@ protected KafkaTestUtils getKtu() { @SneakyThrows @Override - protected Mono averageBatchSizeTestPollStep(List> recordList) { - return Mono.just(msg("Saw batch or records: {}", toOffsets(recordList))) + protected Mono averageBatchSizeTestPollStep(PollContext recordList) { + return Mono.just(msg("Saw batch or records: {}", recordList.getOffsetsFlattened())) .delayElement(Duration.ofMillis(30)); } @Override protected void averageBatchSizeTestPoll(AtomicInteger numBatches, AtomicInteger numRecords, RateLimiter statusLogger) { - reactorPC.reactBatch(recordList -> { - return averageBatchSizeTestPollInner(numBatches, numRecords, statusLogger, recordList); - }); + reactorPC.react(recordList -> + averageBatchSizeTestPollInner(numBatches, numRecords, statusLogger, recordList) + ); } @Override @@ -59,20 +59,21 @@ protected AbstractParallelEoSStreamProcessor getPC() { } @Override - public void simpleBatchTestPoll(List>> batchesReceived) { - reactorPC.reactBatch(recordList -> { - log.debug("Batch of messages: {}", toOffsets(recordList)); + public void simpleBatchTestPoll(List> batchesReceived) { + reactorPC.react(recordList -> { + String msg = msg("Saw batch or records: {}", recordList.getOffsetsFlattened()); + log.debug(msg); batchesReceived.add(recordList); - return Mono.just(msg("Saw batch or records: {}", toOffsets(recordList))); + return Mono.just(msg); }); } @Override - protected void batchFailPoll(List>> batchesReceived) { - 
reactorPC.reactBatch(recordList -> { + protected void batchFailPoll(List> batchesReceived) { + reactorPC.react(recordList -> { batchFailPollInner(recordList); batchesReceived.add(recordList); - return Mono.just(msg("Saw batch or records: {}", toOffsets(recordList))); + return Mono.just(msg("Saw batch or records: {}", recordList.getOffsetsFlattened())); }); } }; diff --git a/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/JStreamVertxParallelEoSStreamProcessor.java b/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/JStreamVertxParallelEoSStreamProcessor.java index d88623fd7..1d3e3ce7b 100644 --- a/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/JStreamVertxParallelEoSStreamProcessor.java +++ b/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/JStreamVertxParallelEoSStreamProcessor.java @@ -1,11 +1,12 @@ package io.confluent.parallelconsumer.vertx; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. */ import io.confluent.csid.utils.Java8StreamUtils; import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContext; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; @@ -15,7 +16,6 @@ import lombok.Builder; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedDeque; @@ -63,11 +63,11 @@ public JStreamVertxParallelEoSStreamProcessor(ParallelConsumerOptions opti } @Override - public Stream> vertxHttpReqInfoStream(Function, RequestInfo> requestInfoFunction) { + public Stream> vertxHttpReqInfoStream(Function, RequestInfo> requestInfoFunction) { VertxCPResult.VertxCPResultBuilder result = VertxCPResult.builder(); - Function, RequestInfo> requestInfoFunctionWrapped = x -> { + Function, RequestInfo> requestInfoFunctionWrapped = x -> { result.in(x); RequestInfo apply = carefullyRun(requestInfoFunction, x); result.requestInfo(Optional.of(apply)); @@ -88,11 +88,11 @@ public Stream> vertxHttpReqInfoStream(Function> vertxHttpRequestStream(BiFunction, HttpRequest> webClientRequestFunction) { + public Stream> vertxHttpRequestStream(BiFunction, HttpRequest> webClientRequestFunction) { VertxCPResult.VertxCPResultBuilder result = VertxCPResult.builder(); - BiFunction, HttpRequest> requestInfoFunctionWrapped = (wc, x) -> { + BiFunction, HttpRequest> requestInfoFunctionWrapped = (wc, x) -> { result.in(x); HttpRequest apply = carefullyRun(webClientRequestFunction, wc, x); @@ -114,11 +114,11 @@ public Stream> vertxHttpRequestStream(BiFunction> vertxHttpWebClientStream( - BiFunction, Future>> webClientRequestFunction) { + BiFunction, Future>> webClientRequestFunction) { VertxCPResult.VertxCPResultBuilder result = VertxCPResult.builder(); - BiFunction, Future>> wrappedFunc = (x, y) -> { + BiFunction, Future>> wrappedFunc = (x, y) -> { // capture result.in(y); Future> apply = carefullyRun(webClientRequestFunction, x, y); @@ -147,7 +147,7 @@ public Stream> vertxHttpWebClientStream( @Getter @Builder public static class VertxCPResult { - private final ConsumerRecord in; + private final PollContext in; private final Future> asr; // todo change to class generic type variables? 2 fields become 1. Not worth the hassle atm. 
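// Illustrative sketch only (assumed names, not lines from this patch): with the VertxCPResult#in
// field above now holding a PollContext instead of a single ConsumerRecord, a caller consuming one
// of the result streams can reach the originating records' offsets through the Lombok-generated
// getter, along the lines of:
//
//     resultStream.forEach(vertxResult ->
//             log.debug("Sent request(s) for offsets: {}", vertxResult.getIn().getOffsetsFlattened()));
//
// Here 'resultStream' stands in for the Stream of VertxCPResult returned by the stream methods above.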
@@ -158,4 +158,4 @@ public static class VertxCPResult { private final Optional> httpReq = Optional.empty(); } -} \ No newline at end of file +} diff --git a/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/JStreamVertxParallelStreamProcessor.java b/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/JStreamVertxParallelStreamProcessor.java index 316aef5c1..45e14aec3 100644 --- a/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/JStreamVertxParallelStreamProcessor.java +++ b/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/JStreamVertxParallelStreamProcessor.java @@ -1,17 +1,17 @@ package io.confluent.parallelconsumer.vertx; /*- - * Copyright (C) 2020-2021 Confluent, Inc. + * Copyright (C) 2020-2022 Confluent, Inc. */ import io.confluent.parallelconsumer.ParallelConsumer; import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContext; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.client.HttpRequest; import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; -import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.function.BiFunction; import java.util.function.Function; @@ -31,7 +31,7 @@ static JStreamVertxParallelStreamProcessor createEosStreamProce * * @see VertxParallelEoSStreamProcessor#vertxHttpReqInfo */ - Stream> vertxHttpReqInfoStream(Function, + Stream> vertxHttpReqInfoStream(Function, VertxParallelEoSStreamProcessor.RequestInfo> requestInfoFunction); /** @@ -40,7 +40,7 @@ Stream> vertxHttpReqI * @see VertxParallelEoSStreamProcessor#vertxHttpRequest */ Stream> vertxHttpRequestStream(BiFunction, HttpRequest> webClientRequestFunction); + PollContext, HttpRequest> webClientRequestFunction); /** * Streaming version @@ -48,5 +48,5 @@ Stream> vertxHttpRequ * @see VertxParallelEoSStreamProcessor#vertxHttpWebClient */ Stream> vertxHttpWebClientStream( - BiFunction, Future>> webClientRequestFunction); + BiFunction, Future>> webClientRequestFunction); } diff --git a/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/VertxParallelEoSStreamProcessor.java b/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/VertxParallelEoSStreamProcessor.java index e67cca74c..130a6ab0a 100644 --- a/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/VertxParallelEoSStreamProcessor.java +++ b/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/VertxParallelEoSStreamProcessor.java @@ -3,7 +3,10 @@ /*- * Copyright (C) 2020-2022 Confluent, Inc. 
*/ + import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContext; +import io.confluent.parallelconsumer.PollContextInternal; import io.confluent.parallelconsumer.internal.ExternalEngine; import io.confluent.parallelconsumer.state.WorkContainer; import io.vertx.core.AsyncResult; @@ -20,7 +23,6 @@ import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.utils.Time; import pl.tlinkowski.unij.api.UniLists; import pl.tlinkowski.unij.api.UniMaps; @@ -123,10 +125,10 @@ protected ThreadPoolExecutor setupWorkerPool(int poolSize) { } @Override - public void vertxHttpReqInfo(Function, RequestInfo> requestInfoFunction, + public void vertxHttpReqInfo(Function, RequestInfo> requestInfoFunction, Consumer>> onSend, Consumer>> onWebRequestComplete) { - vertxHttpRequest((WebClient webClient, ConsumerRecord rec) -> { + vertxHttpRequest((WebClient webClient, PollContext rec) -> { RequestInfo reqInf = carefullyRun(requestInfoFunction, rec); HttpRequest req = webClient.get(reqInf.getPort(), reqInf.getHost(), reqInf.getContextPath()); @@ -139,7 +141,7 @@ public void vertxHttpReqInfo(Function, RequestInfo> request } @Override - public void vertxHttpRequest(BiFunction, HttpRequest> webClientRequestFunction, + public void vertxHttpRequest(BiFunction, HttpRequest> webClientRequestFunction, Consumer>> onSend, Consumer>> onWebRequestComplete) { @@ -158,23 +160,19 @@ public void vertxHttpRequest(BiFunction, HttpReq } @Override - public void vertxHttpWebClient(BiFunction, Future>> webClientRequestFunction, + public void vertxHttpWebClient(BiFunction, Future>> webClientRequestFunction, Consumer>> onWebRequestSentCallback) { // wrap single record function in batch function - Function>, List>>> userFuncWrapper = (recordList) -> { - if (recordList.size() != 1) { - throw new IllegalArgumentException("Bug: Function only takes a single element"); - } - var consumerRecord = recordList.get(0); // will always only have one - log.trace("Consumed a record ({}), executing void function...", consumerRecord.offset()); + Function, List>>> userFuncWrapper = (context) -> { + log.trace("Consumed a record ({}), executing void function...", context); - Future> futureWebResponse = carefullyRun(webClientRequestFunction, webClient, consumerRecord); + Future> futureWebResponse = carefullyRun(webClientRequestFunction, webClient, context.getPollContext()); // execute user's onSend callback onWebRequestSentCallback.accept(futureWebResponse); - addVertxHooks(consumerRecord, futureWebResponse); + addVertxHooks(context, futureWebResponse); return UniLists.of(futureWebResponse); }; @@ -185,15 +183,9 @@ public void vertxHttpWebClient(BiFunction, Futur super.supervisorLoop(userFuncWrapper, noOp); } - private void addVertxHooks(ConsumerRecord consumerRecord, Future send) { - addVertxHooks(UniLists.of(consumerRecord), send); - } - - private void addVertxHooks(final List> recordList, final Future send) { - for (var consumerRecord : recordList) { + private void addVertxHooks(final PollContextInternal context, final Future send) { + context.streamWorkContainers().forEach(wc -> { // attach internal handler - // todo - refactor: Poll Context object for API #223 - WorkContainer wc = wm.getSm().getWorkContainerForRecord(consumerRecord).get(); wc.setWorkType(VERTX_TYPE); send.onSuccess(h -> { @@ -203,7 +195,7 @@ private void addVertxHooks(final List> recordList, final Fu }); send.onFailure(h -> { 
log.error("Vert.x Vertical fail: {}", h.getMessage()); - wc.onUserFunctionFailure(); + wc.onUserFunctionFailure(h); addToMailbox(wc); }); @@ -212,23 +204,19 @@ private void addVertxHooks(final List> recordList, final Fu log.trace("Running plugin hook"); this.onVertxCompleteHook.ifPresent(Runnable::run); }); - } + }); } @Override - public void vertxFuture(final Function, Future> result) { + public void vertxFuture(final Function, Future> result) { // wrap single record function in batch function - Function>, List>> userFuncWrapper = recordList -> { - if (recordList.size() != 1) { - throw new IllegalArgumentException("Bug: Function only takes a single element"); - } - var consumerRecord = recordList.get(0); // will always only have one - log.trace("Consumed a record ({}), executing void function...", consumerRecord.offset()); + Function, List>> userFuncWrapper = context -> { + log.trace("Consumed a record ({}), executing void function...", context); - Future send = carefullyRun(result, consumerRecord); + Future send = carefullyRun(result, context.getPollContext()); - addVertxHooks(consumerRecord, send); + addVertxHooks(context, send); return UniLists.of(send); }; @@ -240,13 +228,13 @@ public void vertxFuture(final Function, Future> result) } @Override - public void batchVertxFuture(final Function>, Future> result) { + public void batchVertxFuture(final Function, Future> result) { - Function>, List>> userFuncWrapper = record -> { + Function, List>> userFuncWrapper = context -> { - Future send = carefullyRun(result, record); + Future send = carefullyRun(result, context.getPollContext()); - addVertxHooks(record, send); + addVertxHooks(context, send); return UniLists.of(send); }; diff --git a/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/VertxParallelStreamProcessor.java b/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/VertxParallelStreamProcessor.java index 6d628b78d..ea02a2a09 100644 --- a/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/VertxParallelStreamProcessor.java +++ b/parallel-consumer-vertx/src/main/java/io/confluent/parallelconsumer/vertx/VertxParallelStreamProcessor.java @@ -7,6 +7,7 @@ import io.confluent.parallelconsumer.ParallelConsumer; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.PollContext; import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import io.vertx.core.AsyncResult; import io.vertx.core.Future; @@ -16,7 +17,6 @@ import io.vertx.ext.web.client.WebClient; import org.apache.kafka.clients.consumer.ConsumerRecord; -import java.util.List; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -47,7 +47,7 @@ static VertxParallelStreamProcessor createEosStreamProcessor(Pa * @param onSend function executed after the request has been sent * @param onWebRequestComplete function executed when response received for request */ - void vertxHttpReqInfo(Function, VertxParallelEoSStreamProcessor.RequestInfo> requestInfoFunction, + void vertxHttpReqInfo(Function, VertxParallelEoSStreamProcessor.RequestInfo> requestInfoFunction, Consumer>> onSend, Consumer>> onWebRequestComplete); @@ -59,7 +59,7 @@ void vertxHttpReqInfo(Function, VertxParallelEoSStreamProce * @param onSend * @param onWebRequestComplete */ - void vertxHttpRequest(BiFunction, HttpRequest> webClientRequestFunction, + void 
vertxHttpRequest(BiFunction, HttpRequest> webClientRequestFunction, Consumer>> onSend, Consumer>> onWebRequestComplete); @@ -74,14 +74,14 @@ void vertxHttpRequest(BiFunction, HttpRequest, Future>> webClientRequestFunction, + void vertxHttpWebClient(BiFunction, Future>> webClientRequestFunction, Consumer>> onSend); /** * Consumer from the Broker concurrently - use the various Vert.x systems to return us a vert.x Future based on this * record. */ - void vertxFuture(final Function, Future> result); + void vertxFuture(final Function, Future> result); /** * Like {@link ParallelStreamProcessor#pollBatch} but for Vert.x. @@ -98,5 +98,5 @@ void vertxHttpWebClient(BiFunction, Future>, Future> result); + void batchVertxFuture(Function, Future> result); } diff --git a/parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxBatchTest.java b/parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxBatchTest.java index 73f50b6e8..5625d419a 100644 --- a/parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxBatchTest.java +++ b/parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxBatchTest.java @@ -8,6 +8,7 @@ import io.confluent.parallelconsumer.BatchTestBase; import io.confluent.parallelconsumer.BatchTestMethods; import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.PollContext; import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; import io.confluent.parallelconsumer.internal.RateLimiter; import io.vertx.core.Future; @@ -17,7 +18,6 @@ import io.vertx.junit5.VertxTestContext; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -49,13 +49,13 @@ protected KafkaTestUtils getKtu() { @SneakyThrows @Override - protected Future averageBatchSizeTestPollStep(List> recordList) { + protected Future averageBatchSizeTestPollStep(PollContext recordList) { int delayInMs = 30; Promise promise = Promise.promise(); vertx.setTimer(delayInMs, event -> { - String msg = msg("Saw batch or records: {}", toOffsets(recordList)); + String msg = msg("Saw batch or records: {}", recordList.getOffsetsFlattened()); log.debug(msg); promise.complete(msg); }); @@ -76,23 +76,23 @@ protected AbstractParallelEoSStreamProcessor getPC() { } @Override - public void simpleBatchTestPoll(List>> batchesReceived) { + public void simpleBatchTestPoll(List> batchesReceived) { vertxAsync.batchVertxFuture(recordList -> { return vertx.executeBlocking(event -> { - log.debug("Saw batch or records: {}", toOffsets(recordList)); + log.debug("Saw batch or records: {}", recordList.getOffsetsFlattened()); batchesReceived.add(recordList); - event.complete(msg("Saw batch or records: {}", toOffsets(recordList))); + event.complete(msg("Saw batch or records: {}", recordList.getOffsetsFlattened())); }); }); } @Override - protected void batchFailPoll(List>> receivedBatches) { + protected void batchFailPoll(List> receivedBatches) { vertxAsync.batchVertxFuture(pollBatch -> { receivedBatches.add(pollBatch); batchFailPollInner(pollBatch); - return Future.succeededFuture(msg("Saw batch or records: {}", toOffsets(pollBatch))); + return Future.succeededFuture(msg("Saw batch or records: {}", pollBatch.getOffsetsFlattened())); }); } }; diff --git 
a/parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxTest.java b/parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxTest.java
index 4de6010b0..a862d89ec 100644
--- a/parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxTest.java
+++ b/parallel-consumer-vertx/src/test/java/io/confluent/parallelconsumer/vertx/VertxTest.java
@@ -3,8 +3,10 @@
/*-
 * Copyright (C) 2020-2022 Confluent, Inc.
 */
+
import com.github.tomakehurst.wiremock.WireMockServer;
import io.confluent.csid.utils.WireMockUtils;
+import io.confluent.parallelconsumer.PollContext;
import io.confluent.parallelconsumer.vertx.VertxParallelEoSStreamProcessor.RequestInfo;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
@@ -18,7 +20,6 @@
import io.vertx.junit5.VertxTestContext;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -87,7 +88,7 @@ void failingHttpCall() {
vertxAsync.addVertxOnCompleteHook(latch::countDown);
var tupleStream =
- vertxAsync.vertxHttpReqInfoStream((ConsumerRecord rec) -> getBadRequest());
+ vertxAsync.vertxHttpReqInfoStream((PollContext rec) -> getBadRequest());
//
awaitLatch(latch);
diff --git a/src/docs/README.adoc b/src/docs/README.adoc
index ed3968666..498166d90 100644
--- a/src/docs/README.adoc
+++ b/src/docs/README.adoc
@@ -233,12 +233,12 @@ without operational burden or harming the cluster's performance
* Solution for the https://en.wikipedia.org/wiki/Head-of-line_blocking["head of line"] blocking problem where continued failure of a single message, prevents progress for messages behind it in the queue
* Per `key` concurrent processing, per partition and unordered message processing
* Offsets committed correctly, in order, of only processed messages, regardless of concurrency level or retries
-* Batch version fo the API to process batches of messages in parallel instead of single messages.
+* Batch support in all versions of the API to process batches of messages in parallel instead of single messages.
+** Particularly useful when your processing function can work with more than a single record at a time - e.g. sending records to an API which has a batch version like Elasticsearch
* Vert.x and Reactor.io non-blocking library integration
** Non-blocking I/O work management
** Vert.x's WebClient and general Vert.x Future support
** Reactor.io Publisher (Mono/Flux) and Java's CompletableFuture (through `Mono#fromFuture`)
-* Reactor non-blocking library integration
* Fair partition traversal
* Zero~ dependencies (`Slf4j` and `Lombok`) for the core module
* Java 8 compatibility
@@ -466,10 +466,12 @@ In future versions, we plan to look at supporting other streaming systems like h
[[batching]]
=== Batching

-The library also support a batch version of the API.
-Using this, you can process batches of messages at once.
+The library also supports sending a batch of records as input to the user's processing function in parallel.
+Using this, you can process several records in your function at once.

-To use it, use one of the `batch` versions instead.
+To use it, set a `batch size` in the options class.
+
+There are then various access methods for the batch of records - see the `PollContext` object for more information.
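For illustration, a minimal sketch of reading such a batch - assuming the `streamConsumerRecords` accessor used elsewhere in this changeset is exposed on the user-facing `PollContext`, and with `process` standing in for the user's own handler:

[source,java,indent=0]
----
parallelConsumer.poll(context -> {
    // walk every record in the batch via the PollContext
    context.streamConsumerRecords()
            .forEach(consumerRecord -> process(consumerRecord)); // process() is a hypothetical user handler
    // the offsets contained in the batch, flattened - e.g. for logging
    log.debug("Processed offsets: {}", context.getOffsetsFlattened());
});
----

The batch size itself is set through the options class, as described above.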
IMPORTANT: If an exception is thrown while processing the batch, all messages in the batch will be returned to the queue, to be retried with the standard retry system.
There is no guarantee that the messages will be retried again in the same batch.
@@ -540,6 +542,14 @@ From the `Clients` view, get the connection information customized to your clust
.. Use these settings presented to https://docs.confluent.io/clients-kafka-java/current/overview.html[configure your clients].
. Use these clients for steps outlined in the <> section.
+[[upgrading]]
+== Upgrading
+
+=== From 0.4 to 0.5
+
+This version has a breaking change in the API - instead of passing in `ConsumerRecord` instances, it passes in a `PollContext` object which has extra information and utility methods.
+See the `PollContext` class for more information.
+
[[ordering-guarantees]]
== Ordering Guarantees