diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index b4ccacca86..3ffc0446e5 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -5098,6 +5098,11 @@ If you are using Spring Boot, you simply need to add the error handler as a `@Be This new error handler replaces the `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`, which have been the default error handlers for several releases now. One difference is that the fallback behavior for batch listeners (when an exception other than a `BatchListenerFailedException` is thrown) is the equivalent of the <>. +IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets, as discussed below, but without actually seeking. +Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused `poll()`, to keep the consumer alive). +The error handler returns a result to the container that indicates whether the current failing record can be resubmitted, or whether it was recovered and therefore will not be sent to the listener again. +To enable this mode, set the property `seekAfterError` to `false`. + The error handler can recover (skip) a record that keeps failing. By default, after ten failures, the failed record is logged (at the `ERROR` level). You can configure the handler with a custom recoverer (`BiConsumer`) and a `BackOff` that controls the delivery attempts and delays between each. @@ -5152,6 +5157,11 @@ The sequence of events is: The recovered record's offset is committed * If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted. +IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets, as discussed above, but without actually seeking. +Instead, the error handler creates a new `ConsumerRecords` containing just the unprocessed records, which is then submitted to the listener (after performing a single paused `poll()`, to keep the consumer alive). +To enable this mode, set the property `seekAfterError` to `false`. + + The default recoverer logs the failed record after retries are exhausted. You can use a custom recoverer, or one provided by the framework such as the <>. @@ -5250,6 +5260,8 @@ If the function returns `null`, the handler's default `BackOff` will be used. Set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures. By default, the exception type is not considered. +Starting with version 2.9, this is now `true` by default. + Also see <>. [[batch-listener-conv-errors]] diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonContainerStoppingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonContainerStoppingErrorHandler.java index 92896e17a4..4b6eecd4b5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonContainerStoppingErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonContainerStoppingErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors.
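For reference, a minimal configuration sketch for the new mode documented above; the `FixedBackOff` values are illustrative, and the factory bean shape mirrors the test configuration added later in this change:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class NoSeekConfigSketch {

	@Bean
	public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
			ConsumerFactory<String, String> consumerFactory) {

		ConcurrentKafkaListenerContainerFactory<String, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory);
		// Two retries, one second apart (three delivery attempts in all); illustrative values.
		DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L));
		// Retain unprocessed records in the container and resubmit them to the listener,
		// instead of seeking; the consumer is paused for a single poll() to stay alive.
		errorHandler.setSeekAfterError(false);
		factory.setCommonErrorHandler(errorHandler);
		return factory;
	}

}
```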
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,10 +71,17 @@ public void setStopContainerAbnormally(boolean stopContainerAbnormally) { } @Override + @Deprecated public boolean remainingRecords() { return true; } + @Override + public boolean seeksAfterHandling() { + // We don't actually do any seeks here, but stopping the container has the same effect. + return true; + } + @Override public void handleOtherException(Exception thrownException, Consumer consumer, MessageListenerContainer container, boolean batchListener) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java index e0b1b224c2..b915a9253c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,11 +65,17 @@ public void setErrorHandlers(Map, CommonErrorHandler> } + @SuppressWarnings("deprecation") @Override public boolean remainingRecords() { return this.defaultErrorHandler.remainingRecords(); } + @Override + public boolean seeksAfterHandling() { + return this.defaultErrorHandler.seeksAfterHandling(); + } + @Override public void clearThreadState() { this.defaultErrorHandler.clearThreadState(); @@ -96,14 +102,18 @@ public void addDelegate(Class throwable, CommonErrorHandler checkDelegates(); } + @SuppressWarnings("deprecation") private void checkDelegates() { boolean remainingRecords = this.defaultErrorHandler.remainingRecords(); boolean ackAfterHandle = this.defaultErrorHandler.isAckAfterHandle(); + boolean seeksAfterHandling = this.defaultErrorHandler.seeksAfterHandling(); this.delegates.values().forEach(handler -> { Assert.isTrue(remainingRecords == handler.remainingRecords(), "All delegates must return the same value when calling 'remainingRecords()'"); Assert.isTrue(ackAfterHandle == handler.isAckAfterHandle(), "All delegates must return the same value when calling 'isAckAfterHandle()'"); + Assert.isTrue(seeksAfterHandling == handler.seeksAfterHandling(), + "All delegates must return the same value when calling 'seeksAfterHandling()'"); }); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java index 0df97647a8..6c927646e8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,13 +41,25 @@ public interface CommonErrorHandler extends DeliveryAttemptAware { * When true (default), all remaining records including the failed record are passed * to the error handler. * @return false to receive only the failed record. 
+ * @deprecated in favor of {@link #seeksAfterHandling()}. * @see #handleRecord(Exception, ConsumerRecord, Consumer, MessageListenerContainer) * @see #handleRemaining(Exception, List, Consumer, MessageListenerContainer) */ + @Deprecated default boolean remainingRecords() { return false; } + /** + * Return true if this error handler performs seeks on the failed record and remaining + * records (or just the remaining records after a failed record is recovered). + * @return true if this handler seeks; the next poll will then re-fetch the unprocessed records. + */ + @SuppressWarnings("deprecation") + default boolean seeksAfterHandling() { + return remainingRecords(); + } + /** * Return true if this error handler supports delivery attempts headers. * @return true if capable. @@ -79,14 +91,42 @@ default void handleOtherException(Exception thrownException, Consumer cons * @param record the record. * @param consumer the consumer. * @param container the container. + * @deprecated in favor of + * {@link #handleOne(Exception, ConsumerRecord, Consumer, MessageListenerContainer)}. * @see #remainingRecords() */ + @Deprecated default void handleRecord(Exception thrownException, ConsumerRecord record, Consumer consumer, MessageListenerContainer container) { LogFactory.getLog(getClass()).error("'handleRecord' is not implemented by this handler", thrownException); } + /** + * Handle the exception for a record listener when {@link #seeksAfterHandling()} returns + * false. Use this to handle just the single failed record. + * @param thrownException the exception. + * @param record the record. + * @param consumer the consumer. + * @param container the container. + * @return true if the error was "handled"; false otherwise, in which case the container + * will re-submit the record to the listener. + * @since 2.9 + * @see #seeksAfterHandling() + */ + @SuppressWarnings("deprecation") + default boolean handleOne(Exception thrownException, ConsumerRecord record, Consumer consumer, + MessageListenerContainer container) { + + try { + handleRecord(thrownException, record, consumer, container); + return true; + } + catch (Exception ex) { + // the delegate did not handle the record; report it as not handled so the + // container retains and re-submits it + return false; + } + } + /** * Handle the exception for a record listener when {@link #remainingRecords()} returns * true. The failed record and all the remaining records from the poll are passed in. @@ -120,6 +160,28 @@ default void handleBatch(Exception thrownException, ConsumerRecords data, LogFactory.getLog(getClass()).error("'handleBatch' is not implemented by this handler", thrownException); } + /** + * Handle the exception for a batch listener. The complete {@link ConsumerRecords} + * from the poll is supplied. Return the members of the batch that should be re-sent to + * the listener. The returned records MUST be in the same order as the original records. + * @param thrownException the exception. + * @param data the consumer records. + * @param consumer the consumer. + * @param container the container. + * @param invokeListener a callback to re-invoke the listener. + * @param <K> the key type. + * @param <V> the value type. + * @return the consumer records, or a subset.
+ * @since 2.9 + */ + default ConsumerRecords handleBatchAndReturnRemaining(Exception thrownException, + ConsumerRecords data, Consumer consumer, MessageListenerContainer container, + Runnable invokeListener) { + + handleBatch(thrownException, data, consumer, container, invokeListener); + return ConsumerRecords.empty(); + } + @Override default int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) { return 0; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java index ba84ea1409..ebdc99a3b0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonLoggingErrorHandler.java @@ -48,6 +48,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) { } @Override + @Deprecated public void handleRecord(Exception thrownException, ConsumerRecord record, Consumer consumer, MessageListenerContainer container) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonMixedErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonMixedErrorHandler.java index 24128917e5..5c81fb00e0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonMixedErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/CommonMixedErrorHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,11 +51,17 @@ public CommonMixedErrorHandler(CommonErrorHandler recordErrorHandler, CommonErro this.batchErrorHandler = batchErrorHandler; } + @SuppressWarnings("deprecation") @Override public boolean remainingRecords() { return this.recordErrorHandler.remainingRecords(); } + @Override + public boolean seeksAfterHandling() { + return this.recordErrorHandler.seeksAfterHandling(); + } + @Override public boolean deliveryAttemptHeader() { return this.recordErrorHandler.deliveryAttemptHeader(); @@ -73,10 +79,10 @@ public void handleOtherException(Exception thrownException, Consumer consu } @Override - public void handleRecord(Exception thrownException, ConsumerRecord record, Consumer consumer, + public boolean handleOne(Exception thrownException, ConsumerRecord record, Consumer consumer, MessageListenerContainer container) { - this.recordErrorHandler.handleRecord(thrownException, record, consumer, container); + return this.recordErrorHandler.handleOne(thrownException, record, consumer, container); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index ab2cb0a561..32959fbf6d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -133,8 +133,8 @@ private void checkConfig() { public void process(List> records, Consumer consumer, @Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) { - if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable, - 
getRecoveryStrategy((List) records, exception), container, this.logger) + if (SeekUtils.doSeeks((List) records, consumer, exception, recoverable, + getFailureTracker()::recovered, container, this.logger) && isCommitRecovered() && this.kafkaTemplate.isTransactional()) { ConsumerRecord skipped = records.get(0); this.kafkaTemplate.sendOffsetsToTransaction( diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java index 02e4dd7285..cffc8e78fa 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java @@ -117,8 +117,14 @@ public void setAckAfterHandle(boolean ackAfterHandle) { } @Override + @Deprecated public boolean remainingRecords() { - return true; + return isSeekAfterError(); + } + + @Override + public boolean seeksAfterHandling() { + return isSeekAfterError(); } @Override @@ -126,12 +132,24 @@ public boolean deliveryAttemptHeader() { return true; } + @Override + public boolean handleOne(Exception thrownException, ConsumerRecord record, Consumer consumer, + MessageListenerContainer container) { + + try { + return getFailureTracker().recovered(record, thrownException, container, consumer); + } + catch (Exception ex) { + return false; + } + } + @Override public void handleRemaining(Exception thrownException, List> records, Consumer consumer, MessageListenerContainer container) { SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR - getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel()); + getFailureTracker()::recovered, this.logger, getLogLevel()); } @Override @@ -141,6 +159,14 @@ public void handleBatch(Exception thrownException, ConsumerRecords data, C doHandle(thrownException, data, consumer, container, invokeListener); } + @Override + public ConsumerRecords handleBatchAndReturnRemaining(Exception thrownException, + ConsumerRecords data, Consumer consumer, MessageListenerContainer container, + Runnable invokeListener) { + + return handle(thrownException, data, consumer, container, invokeListener); + } + @Override public void handleOtherException(Exception thrownException, Consumer consumer, MessageListenerContainer container, boolean batchListener) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java index c219e1036e..566d944f57 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java @@ -72,6 +72,12 @@ public FailedBatchProcessor(@Nullable BiConsumer, Exception protected void doHandle(Exception thrownException, ConsumerRecords data, Consumer consumer, MessageListenerContainer container, Runnable invokeListener) { + handle(thrownException, data, consumer, container, invokeListener); + } + + protected ConsumerRecords handle(Exception thrownException, ConsumerRecords data, + Consumer consumer, MessageListenerContainer container, Runnable invokeListener) { + BatchListenerFailedException batchListenerFailedException = getBatchListenerFailedException(thrownException); if (batchListenerFailedException == null) { this.logger.debug(thrownException, "Expected a 
BatchListenerFailedException; re-seeking batch"); @@ -87,9 +93,10 @@ protected void doHandle(Exception thrownException, ConsumerRecords data, C this.fallbackBatchHandler.handleBatch(thrownException, data, consumer, container, invokeListener); } else { - seekOrRecover(thrownException, data, consumer, container, index); + return seekOrRecover(thrownException, data, consumer, container, index); } } + return ConsumerRecords.empty(); } private int findIndex(ConsumerRecords data, ConsumerRecord record) { @@ -109,10 +116,12 @@ private int findIndex(ConsumerRecords data, ConsumerRecord record) { return i; } - private void seekOrRecover(Exception thrownException, @Nullable ConsumerRecords data, Consumer consumer, MessageListenerContainer container, int indexArg) { + @SuppressWarnings("unchecked") + private ConsumerRecords seekOrRecover(Exception thrownException, @Nullable ConsumerRecords data, + Consumer consumer, MessageListenerContainer container, int indexArg) { if (data == null) { - return; + return ConsumerRecords.empty(); } Iterator iterator = data.iterator(); List> toCommit = new ArrayList<>(); @@ -133,15 +142,37 @@ private void seekOrRecover(Exception thrownException, @Nullable ConsumerRecords< if (offsets.size() > 0) { commit(consumer, container, offsets); } - if (remaining.size() > 0) { - SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false, - getRecoveryStrategy(remaining, thrownException), this.logger, getLogLevel()); - ConsumerRecord recovered = remaining.get(0); - commit(consumer, container, - Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), - new OffsetAndMetadata(recovered.offset() + 1))); - if (remaining.size() > 1) { - throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException); + if (isSeekAfterError()) { + if (remaining.size() > 0) { + SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false, + getFailureTracker()::recovered, this.logger, getLogLevel()); + ConsumerRecord recovered = remaining.get(0); + commit(consumer, container, + Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), + new OffsetAndMetadata(recovered.offset() + 1))); + if (remaining.size() > 1) { + throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException); + } + } + return ConsumerRecords.empty(); + } + else { + if (indexArg == 0) { + return (ConsumerRecords) data; // the first record failed; re-run the whole batch + } + else { + try { + if (getFailureTracker().recovered(remaining.get(0), thrownException, container, + consumer)) { + remaining.remove(0); + } + } + catch (Exception e) { + // recovery failed; keep the failed record in the remainder for redelivery + } + Map>> remains = new HashMap<>(); + remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()), + tp -> new ArrayList>()).add((ConsumerRecord) rec)); + return new ConsumerRecords<>(remains); } } }
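The no-seek branch above commits the offsets before the failure index and returns the remainder for resubmission. A batch listener cooperates by identifying the failing element with `BatchListenerFailedException`; a minimal sketch (the listener id, topic, and `process()` call are hypothetical):

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchListenerFailedException;

public class BatchNoSeekListenerSketch {

	@KafkaListener(id = "batchNoSeeks", topics = "someTopic")
	public void listen(List<ConsumerRecord<String, String>> batch) {
		for (int i = 0; i < batch.size(); i++) {
			try {
				process(batch.get(i)); // hypothetical business logic
			}
			catch (Exception ex) {
				// With seekAfterError=false, offsets before index i are committed and the
				// failed record plus the rest of the batch are retained and resubmitted
				// to the listener after a single paused poll().
				throw new BatchListenerFailedException("failed to process", ex, i);
			}
		}
	}

	private void process(ConsumerRecord<String, String> record) {
		// hypothetical
	}

}
```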
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java index f9afdfaa39..7eea0e4313 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java @@ -19,17 +19,17 @@ import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.BiPredicate; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.core.log.LogAccessor; -import org.springframework.kafka.support.KafkaUtils; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; import org.springframework.util.backoff.BackOff; +import org.springframework.util.backoff.FixedBackOff; /** * Common super class for classes that deal with failing to consume a consumer record. * @@ -40,9 +40,15 @@ */ public abstract class FailedRecordProcessor extends ExceptionClassifier implements DeliveryAttemptAware { - private static final BiPredicate, Exception> ALWAYS_SKIP_PREDICATE = (r, e) -> true; + private static final BackOff NO_RETRIES_OR_DELAY_BACKOFF = new FixedBackOff(0L, 0L); - private static final BiPredicate, Exception> NEVER_SKIP_PREDICATE = (r, e) -> false; + private final BiFunction, Exception, BackOff> noRetriesForClassified = + (rec, ex) -> { + if (!getClassifier().classify(ex)) { + return NO_RETRIES_OR_DELAY_BACKOFF; + } + return this.userBackOffFunction.apply(rec, ex); + }; protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR @@ -50,8 +56,13 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen private boolean commitRecovered; + private BiFunction, Exception, BackOff> userBackOffFunction = (rec, ex) -> null; + + private boolean seekAfterError = true; + protected FailedRecordProcessor(@Nullable BiConsumer, Exception> recoverer, BackOff backOff) { this.failureTracker = new FailedRecordTracker(recoverer, backOff, this.logger); + this.failureTracker.setBackOffFunction(this.noRetriesForClassified); } /** @@ -78,7 +89,8 @@ public void setCommitRecovered(boolean commitRecovered) { * @since 2.6 */ public void setBackOffFunction(BiFunction, Exception, BackOff> backOffFunction) { - this.failureTracker.setBackOffFunction(backOffFunction); + Assert.notNull(backOffFunction, "'backOffFunction' cannot be null"); + this.userBackOffFunction = backOffFunction; } /** @@ -96,7 +108,8 @@ public void setResetStateOnRecoveryFailure(boolean resetStateOnRecoveryFailure) * to the previous failure for the same record. The * {@link #setBackOffFunction(BiFunction) backOffFunction}, if provided, will be * called to get the {@link BackOff} to use for the new exception; otherwise, the - * configured {@link BackOff} will be used. + * configured {@link BackOff} will be used. The default is true since 2.9; set to + * false to keep using the existing retry state, even when exceptions change. * @param resetStateOnExceptionChange true to reset. * @since 2.6.3 */ @@ -114,6 +127,29 @@ public void setRetryListeners(RetryListener... listeners) { this.failureTracker.setRetryListeners(listeners); } + /** + * Return whether to seek after an exception is handled. + * @return true to seek. + * @since 2.9 + */ + public boolean isSeekAfterError() { + return this.seekAfterError; + } + + /** + * When true (default), the error handler will perform seeks on the failed and/or + * remaining records so they will be redelivered on the next poll. When false, the + * container will present the failed and/or remaining records to the listener by + * pausing the consumer for the next poll and using the existing records from the + * previous poll. When false, this has the side effect of setting + * {@link #setResetStateOnExceptionChange(boolean)} to true. + * @param seekAfterError false to not perform seeks.
+ * @since 2.9 + */ + public void setSeekAfterError(boolean seekAfterError) { + this.seekAfterError = seekAfterError; + } + @Override public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) { return this.failureTracker.deliveryAttempt(topicPartitionOffset); @@ -126,7 +162,9 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) { * @param thrownException the exception. * @return the {@link RecoveryStrategy}. * @since 2.7 + * @deprecated - no longer used. */ + @Deprecated protected RecoveryStrategy getRecoveryStrategy(List> records, Exception thrownException) { return getRecoveryStrategy(records, null, thrownException); } @@ -139,34 +177,22 @@ protected RecoveryStrategy getRecoveryStrategy(List> record * @param thrownException the exception. * @return the {@link RecoveryStrategy}. * @since 2.8.4 + * @deprecated - no longer used. */ + @Deprecated protected RecoveryStrategy getRecoveryStrategy(List> records, @Nullable Consumer recoveryConsumer, Exception thrownException) { - if (getClassifier().classify(thrownException)) { - return this.failureTracker::recovered; - } - else { - try { - this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException); - this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException)); - } - catch (Exception ex) { - if (records.size() > 0) { - if (SeekUtils.isBackoffException(ex)) { - this.logger.debug("Recovery of record (" - + KafkaUtils.format(records.get(0)) + ") backed off: " + ex.getMessage()); - } - else { - this.logger.error(ex, () -> "Recovery of record (" - + KafkaUtils.format(records.get(0)) + ") failed"); - } - this.failureTracker.getRetryListeners().forEach(rl -> - rl.recoveryFailed(records.get(0), thrownException, ex)); - } - return (rec, excep, cont, consumer) -> NEVER_SKIP_PREDICATE.test(rec, excep); - } - return (rec, excep, cont, consumer) -> ALWAYS_SKIP_PREDICATE.test(rec, excep); - } + + return this.failureTracker::recovered; + } + + /** + * Return the failed record tracker. + * @return the tracker. + * @since 2.9 + */ + protected FailedRecordTracker getFailureTracker() { + return this.failureTracker; } public void clearThreadState() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java index 991e968cab..a7db88cec2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java @@ -60,7 +60,7 @@ class FailedRecordTracker implements RecoveryStrategy { private boolean resetStateOnRecoveryFailure = true; - private boolean resetStateOnExceptionChange; + private boolean resetStateOnExceptionChange = true; FailedRecordTracker(@Nullable BiConsumer, Exception> recoverer, BackOff backOff, LogAccessor logger) { @@ -118,7 +118,8 @@ public void setResetStateOnRecoveryFailure(boolean resetStateOnRecoveryFailure) * to the previous failure for the same record. The * {@link #setBackOffFunction(BiFunction) backOffFunction}, if provided, will be * called to get the {@link BackOff} to use for the new exception; otherwise, the - * configured {@link BackOff} will be used. + * configured {@link BackOff} will be used. Default true since 2.9; set to false + * to use the existing retry state, even when exceptions change. * @param resetStateOnExceptionChange true to reset. 
* @since 2.6.3 */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 8e44692196..48d501fd5e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -564,7 +564,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final Consumer consumer; - private final Map> offsets = new HashMap<>(); + private final Map> offsets = new LinkedHashMap<>(); private final Collection assignedPartitions = new LinkedHashSet<>(); @@ -751,6 +751,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private boolean receivedSome; + private ConsumerRecords pendingRecordsAfterError; + private volatile boolean consumerPaused; private volatile Thread consumerThread; @@ -1333,12 +1335,14 @@ protected void pollAndInvoke() { return; } debugRecords(records); - resumeConsumerIfNeccessary(); - if (!this.consumerPaused) { - resumePartitionsIfNecessary(); - } invokeIfHaveRecords(records); + if (this.pendingRecordsAfterError == null) { + resumeConsumerIfNeccessary(); + if (!this.consumerPaused) { + resumePartitionsIfNecessary(); + } + } } private void invokeIfHaveRecords(@Nullable ConsumerRecords records) { @@ -1494,6 +1498,16 @@ private ConsumerRecords doPoll() { } else { records = pollConsumer(); + if (this.pendingRecordsAfterError != null) { + int howManyRecords = records.count(); + if (howManyRecords > 0) { + this.logger.error(() -> String.format("Poll returned %d record(s) while consumer was paused " + + "after an error; emergency stop invoked to avoid message loss", howManyRecords)); + KafkaMessageListenerContainer.this.emergencyStop.run(); + } + records = this.pendingRecordsAfterError; + this.pendingRecordsAfterError = null; + } captureOffsets(records); checkRebalanceCommits(); } @@ -1596,7 +1610,9 @@ private void doPauseConsumerIfNecessary() { this.pausedForAsyncAcks = true; this.logger.debug(() -> "Pausing for incomplete async acks: " + this.offsetsInThisBatch); } - if (!this.consumerPaused && (isPaused() || this.pausedForAsyncAcks)) { + if (!this.consumerPaused && (isPaused() || this.pausedForAsyncAcks) + || this.pendingRecordsAfterError != null) { + this.consumer.pause(this.consumer.assignment()); this.consumerPaused = true; this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused()); @@ -2127,7 +2143,22 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords recor private void commitOffsetsIfNeeded(final ConsumerRecords records) { if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle()) || this.producer != null) { - this.acks.addAll(getHighestOffsetRecords(records)); + if (this.pendingRecordsAfterError != null) { + ConsumerRecord firstUncommitted = this.pendingRecordsAfterError.iterator().next(); + Iterator> it = records.iterator(); + while (it.hasNext()) { + ConsumerRecord next = it.next(); + if (!next.equals(firstUncommitted)) { + this.acks.add(next); + } + else { + break; + } + } + } + else { + this.acks.addAll(getHighestOffsetRecords(records)); + } if (this.producer != null) { sendOffsetsToTransaction(); } @@ -2264,9 +2295,19 @@ private void doInvokeBatchOnMessage(final ConsumerRecords records, private void invokeBatchErrorHandler(final ConsumerRecords records, @Nullable List> 
list, RuntimeException rte) { - this.commonErrorHandler.handleBatch(rte, records, this.consumer, - KafkaMessageListenerContainer.this.thisOrParentContainer, - () -> invokeBatchOnMessageWithRecordsOrList(records, list)); + if (this.commonErrorHandler.seeksAfterHandling() || this.transactionManager != null) { + this.commonErrorHandler.handleBatch(rte, records, this.consumer, + KafkaMessageListenerContainer.this.thisOrParentContainer, + () -> invokeBatchOnMessageWithRecordsOrList(records, list)); + } + else { + ConsumerRecords afterHandling = this.commonErrorHandler.handleBatchAndReturnRemaining(rte, + records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer, + () -> invokeBatchOnMessageWithRecordsOrList(records, list)); + if (!afterHandling.isEmpty()) { + this.pendingRecordsAfterError = afterHandling; + } + } } private void invokeRecordListener(final ConsumerRecords records) { @@ -2536,7 +2577,10 @@ private void commitOffsetsIfNeeded(final ConsumerRecord record) { if (this.isManualAck) { this.commitRecovered = true; } - ackCurrent(record); + if (this.pendingRecordsAfterError == null + || !record.equals(this.pendingRecordsAfterError.iterator().next())) { + ackCurrent(record); + } if (this.isManualAck) { this.commitRecovered = false; } @@ -2620,7 +2664,7 @@ record = this.recordInterceptor.intercept(record, this.consumer); private void invokeErrorHandler(final ConsumerRecord record, Iterator> iterator, RuntimeException rte) { - if (this.commonErrorHandler.remainingRecords()) { + if (this.commonErrorHandler.seeksAfterHandling()) { if (this.producer == null) { processCommits(); } @@ -2633,8 +2677,20 @@ private void invokeErrorHandler(final ConsumerRecord record, KafkaMessageListenerContainer.this.thisOrParentContainer); } else { - this.commonErrorHandler.handleRecord(rte, record, this.consumer, + boolean handled = this.commonErrorHandler.handleOne(rte, record, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer); + Map>> records = new HashMap<>(); + if (!handled) { + records.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), + tp -> new ArrayList>()).add(record); + } + while (iterator.hasNext()) { + ConsumerRecord next = iterator.next(); + // key each retained record by its own partition, not the failed record's + records.computeIfAbsent(new TopicPartition(next.topic(), next.partition()), + tp -> new ArrayList>()).add(next); + } + if (records.size() > 0) { + this.pendingRecordsAfterError = new ConsumerRecords<>(records); + } } } @@ -3000,7 +3056,7 @@ private void doCommitSync(Map commits, int re } private Map buildCommits() { - Map commits = new HashMap<>(); + Map commits = new LinkedHashMap<>(); for (Entry> entry : this.offsets.entrySet()) { for (Entry offset : entry.getValue().entrySet()) { commits.put(new TopicPartition(entry.getKey(), offset.getKey()), diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index fcf131ee51..c8f39a79bf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -1599,12 +1599,13 @@ public CommonErrorHandler listen16ErrorHandler() { return new CommonErrorHandler() { @Override - public void handleRecord(Exception thrownException, ConsumerRecord record, + public boolean handleOne(Exception thrownException, ConsumerRecord record, Consumer consumer, MessageListenerContainer container) {
listen16Exception = thrownException; listen16Message = record.value(); listen16ErrorLatch.countDown(); + return true; } }; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonMixedErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonMixedErrorHandlerTests.java index 8abb708b78..0d6c7bdf26 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonMixedErrorHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/CommonMixedErrorHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,8 +36,8 @@ void testMixed() { CommonMixedErrorHandler mixed = new CommonMixedErrorHandler(record, batch); mixed.handleBatch(null, null, null, null, null); verify(batch).handleBatch(null, null, null, null, null); - mixed.handleRecord(null, null, null, null); - verify(record).handleRecord(null, null, null, null); + mixed.handleOne(null, null, null, null); + verify(record).handleOne(null, null, null, null); mixed.handleRemaining(null, null, null, null); verify(record).handleRemaining(null, null, null, null); mixed.handleOtherException(null, null, null, false); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index a32be058fa..e01103ebd5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -678,7 +678,7 @@ public boolean isAckAfterHandle() { } @Override - public boolean remainingRecords() { + public boolean seeksAfterHandling() { return false; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchAckTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchAckTests.java new file mode 100644 index 0000000000..1d898ce0ab --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchAckTests.java @@ -0,0 +1,219 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties.AckMode; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Gary Russell + * @since 2.9 + * + */ +@SpringJUnitConfig +@DirtiesContext +public class DefaultErrorHandlerNoSeeksBatchAckTests { + + @SuppressWarnings("rawtypes") + @Autowired + private Consumer consumer; + + @Autowired + private Config config; + + @Autowired + private KafkaListenerEndpointRegistry registry; + + /* + * Deliver 6 records from three partitions, fail on the second record of the second + * partition on the first attempt; verify partitions 0 and 1 are committed and a total + * of 7 records are handled, with no seeks.
+ */ + @SuppressWarnings("unchecked") + @Test + public void retriesWithNoSeeksAckModeBatch() throws Exception { + assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue(); + this.registry.stop(); + assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + InOrder inOrder = inOrder(this.consumer); + inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + Map offsets = new LinkedHashMap<>(); + offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L)); + offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)); + inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60)); + inOrder.verify(this.consumer).pause(any()); + inOrder.verify(this.consumer).resume(any()); + offsets = new LinkedHashMap<>(); + offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L)); + offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L)); + inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60)); + inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + assertThat(this.config.count).isEqualTo(7); + assertThat(this.config.contents.toArray()).isEqualTo(new String[] + { "foo", "bar", "baz", "qux", "qux", "fiz", "buz" }); + } + + @Configuration + @EnableKafka + public static class Config { + + private final List contents = new ArrayList<>(); + + private final CountDownLatch pollLatch = new CountDownLatch(3); + + private final CountDownLatch deliveryLatch = new CountDownLatch(7); + + private final CountDownLatch commitLatch = new CountDownLatch(3); + + private final CountDownLatch closeLatch = new CountDownLatch(1); + + private int count; + + @KafkaListener(topics = "foo", groupId = "grp") + public void foo(String in) { + this.contents.add(in); + this.deliveryLatch.countDown(); + if (++this.count == 4) { // part 1, offset 1, first time + throw new RuntimeException("foo"); + } + } + + @SuppressWarnings({ "rawtypes" }) + @Bean + public ConsumerFactory consumerFactory() { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + final Consumer consumer = consumer(); + given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides())) + .willReturn(consumer); + return consumerFactory; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public Consumer consumer() { + final Consumer consumer = mock(Consumer.class); + final TopicPartition topicPartition0 = new TopicPartition("foo", 0); + final TopicPartition topicPartition1 = new TopicPartition("foo", 1); + final TopicPartition topicPartition2 = new TopicPartition("foo", 2); + willAnswer(i -> { + ((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned( + Collections.singletonList(topicPartition1)); + return null; + }).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + Map> records1 = new LinkedHashMap<>(); + records1.put(topicPartition0, Arrays.asList( + new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar", + new RecordHeaders(), Optional.empty()))); + 
records1.put(topicPartition1, Arrays.asList( + new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", + new RecordHeaders(), Optional.empty()))); + records1.put(topicPartition2, Arrays.asList( + new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz", + new RecordHeaders(), Optional.empty()))); + final AtomicInteger which = new AtomicInteger(); + willAnswer(i -> { + this.pollLatch.countDown(); + switch (which.getAndIncrement()) { + case 0: + return new ConsumerRecords(records1); + default: + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return ConsumerRecords.empty(); + } + }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + willAnswer(i -> { + this.commitLatch.countDown(); + return null; + }).given(consumer).commitSync(anyMap(), any()); + willAnswer(i -> { + this.closeLatch.countDown(); + return null; + }).given(consumer).close(); + return consumer; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory()); + factory.getContainerProperties().setAckMode(AckMode.BATCH); + DefaultErrorHandler eh = new DefaultErrorHandler(); + eh.setSeekAfterError(false); + factory.setCommonErrorHandler(eh); + return factory; + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java new file mode 100644 index 0000000000..1f158fc2ce --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksBatchListenerTests.java @@ -0,0 +1,250 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willAnswer; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; +import org.junit.jupiter.api.Test; +import org.mockito.InOrder; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Gary Russell + * @since 2.9 + * + */ +@SpringJUnitConfig +@DirtiesContext +@SuppressWarnings("deprecation") +public class DefaultErrorHandlerNoSeeksBatchListenerTests { + + private static final String CONTAINER_ID = "container"; + + @SuppressWarnings("rawtypes") + @Autowired + private Consumer consumer; + + @SuppressWarnings("rawtypes") + @Autowired + private Producer producer; + + @Autowired + private Config config; + + @Autowired + private KafkaListenerEndpointRegistry registry; + + /* + * Deliver 6 records from three partitions, fail on the second record second + * partition. 
+ */ + @SuppressWarnings("unchecked") + @Test + void retriesWithNoSeeksAckModeBatch() throws Exception { + assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue(); + this.registry.stop(); + assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + InOrder inOrder = inOrder(this.consumer, this.producer); + inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + Map offsets = new LinkedHashMap<>(); + offsets.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L)); + offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)); + inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60)); + inOrder.verify(this.consumer).pause(any()); + inOrder.verify(this.consumer).resume(any()); + offsets = new LinkedHashMap<>(); + offsets.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L)); + offsets.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L)); + inOrder.verify(this.consumer).commitSync(offsets, Duration.ofSeconds(60)); + assertThat(this.config.ehException).isInstanceOf(ListenerExecutionFailedException.class); + assertThat(((ListenerExecutionFailedException) this.config.ehException).getGroupId()).isEqualTo(CONTAINER_ID); + assertThat(this.config.contents).contains("foo", "bar", "baz", "qux", "qux", "qux", "fiz", "buz"); + } + + @Configuration + @EnableKafka + public static class Config { + + final CountDownLatch pollLatch = new CountDownLatch(1); + + final CountDownLatch deliveryLatch = new CountDownLatch(2); + + final CountDownLatch closeLatch = new CountDownLatch(1); + + final AtomicBoolean fail = new AtomicBoolean(true); + + final List contents = new ArrayList<>(); + + volatile Exception ehException; + + @KafkaListener(id = CONTAINER_ID, topics = "foo") + public void foo(List in) { + this.contents.addAll(in); + this.deliveryLatch.countDown(); + if (this.fail.getAndSet(false)) { + throw new BatchListenerFailedException("test", 3); + } + } + + @SuppressWarnings({ "rawtypes" }) + @Bean + public ConsumerFactory consumerFactory() { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); + final Consumer consumer = consumer(); + given(consumerFactory.createConsumer(CONTAINER_ID, "", "-0", KafkaTestUtils.defaultPropertyOverrides())) + .willReturn(consumer); + return consumerFactory; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public Consumer consumer() { + final Consumer consumer = mock(Consumer.class); + final TopicPartition topicPartition0 = new TopicPartition("foo", 0); + final TopicPartition topicPartition1 = new TopicPartition("foo", 1); + final TopicPartition topicPartition2 = new TopicPartition("foo", 2); + willAnswer(i -> { + ((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned( + Collections.singletonList(topicPartition1)); + return null; + }).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); + Map> records1 = new LinkedHashMap<>(); + records1.put(topicPartition0, Arrays.asList( + new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar", + new RecordHeaders(), Optional.empty()))); + records1.put(topicPartition1, Arrays.asList( + new ConsumerRecord("foo", 
1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux", + new RecordHeaders(), Optional.empty()))); + records1.put(topicPartition2, Arrays.asList( + new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz", + new RecordHeaders(), Optional.empty()), + new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz", + new RecordHeaders(), Optional.empty()))); + final AtomicInteger which = new AtomicInteger(); + willAnswer(i -> { + this.pollLatch.countDown(); + switch (which.getAndIncrement()) { + case 0: + return new ConsumerRecords(records1); + default: + try { + Thread.sleep(50); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return new ConsumerRecords(Collections.emptyMap()); + } + }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT)); + willAnswer(i -> { + this.closeLatch.countDown(); + return null; + }).given(consumer).close(); + willReturn(new ConsumerGroupMetadata(CONTAINER_ID)).given(consumer).groupMetadata(); + return consumer; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); + factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(true); + DefaultErrorHandler eh = new DefaultErrorHandler() { + + @Override + public ConsumerRecords handleBatchAndReturnRemaining(Exception thrownException, + ConsumerRecords data, Consumer consumer, MessageListenerContainer container, + Runnable invokeListener) { + + Config.this.ehException = thrownException; + return super.handleBatchAndReturnRemaining(thrownException, data, consumer, container, invokeListener); + } + + }; + eh.setSeekAfterError(false); + factory.setCommonErrorHandler(eh); + return factory; + } + + @SuppressWarnings("rawtypes") + @Bean + public ProducerFactory producerFactory() { + ProducerFactory pf = mock(ProducerFactory.class); + given(pf.createProducer(isNull())).willReturn(producer()); + given(pf.transactionCapable()).willReturn(true); + return pf; + } + + @SuppressWarnings("rawtypes") + @Bean + public Producer producer() { + return mock(Producer.class); + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksRecordAckTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksRecordAckTests.java new file mode 100644 index 0000000000..5d755a4d9f --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerNoSeeksRecordAckTests.java @@ -0,0 +1,291 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.springframework.kafka.listener;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import org.springframework.util.backoff.FixedBackOff;
+
+/**
+ * @author Gary Russell
+ * @since 2.9
+ *
+ */
+@SpringJUnitConfig
+@DirtiesContext
+public class DefaultErrorHandlerNoSeeksRecordAckTests {
+
+	@SuppressWarnings("rawtypes")
+	@Autowired
+	private Consumer consumer;
+
+	@Autowired
+	private Config config;
+
+	@Autowired
+	private KafkaListenerEndpointRegistry registry;
+
+	/*
+	 * Deliver 6 records from three partitions, fail the second record on the second
+	 * partition on its first two delivery attempts; verify that partitions 0 and 1 are
+	 * committed and a total of 8 records are handled.
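+	 * No seeks are performed; the failed record is retried in place (seekAfterError = false).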
+	 */
+	@SuppressWarnings("unchecked")
+	@Test
+	public void retriesWithNoSeeksAckModeRecord() throws Exception {
+		assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		this.registry.stop();
+		assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		InOrder inOrder = inOrder(this.consumer);
+		inOrder.verify(this.consumer).assign(any(Collection.class));
+		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+		inOrder.verify(this.consumer).commitSync(
+				Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(1L)),
+				Duration.ofSeconds(60));
+		inOrder.verify(this.consumer).commitSync(
+				Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(2L)),
+				Duration.ofSeconds(60));
+		inOrder.verify(this.consumer).commitSync(
+				Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)),
+				Duration.ofSeconds(60));
+		// instead of seeking, the container pauses the consumer, performs a keep-alive
+		// poll, and resumes after the failed record is successfully retried
+		inOrder.verify(this.consumer).pause(any());
+		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+		inOrder.verify(this.consumer).commitSync(
+				Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(2L)),
+				Duration.ofSeconds(60));
+		inOrder.verify(this.consumer).commitSync(
+				Collections.singletonMap(new TopicPartition("foo", 2), new OffsetAndMetadata(1L)),
+				Duration.ofSeconds(60));
+		inOrder.verify(this.consumer).commitSync(
+				Collections.singletonMap(new TopicPartition("foo", 2), new OffsetAndMetadata(2L)),
+				Duration.ofSeconds(60));
+		inOrder.verify(this.consumer).resume(any());
+		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+		assertThat(this.config.count).isEqualTo(8);
+		assertThat(this.config.contents).contains("foo", "bar", "baz", "qux", "qux", "qux", "fiz", "buz");
+		assertThat(this.config.deliveries).contains(1, 1, 1, 1, 2, 3, 1, 1);
+		assertThat(this.config.deliveryAttempt).isNotNull();
+		verify(this.consumer, never()).seek(any(), anyLong());
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Test
+	void emergencyStopIfPollReturnsRecordsUnexpectedly() throws InterruptedException {
+		final Consumer consumer = mock(Consumer.class);
+		ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
+		given(consumerFactory.createConsumer("grp", "", null, KafkaTestUtils.defaultPropertyOverrides()))
+				.willReturn(consumer);
+		ConsumerRecords records = new ConsumerRecords(Map.of(new TopicPartition("foo", 0),
+				List.of(new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
+						new RecordHeaders(), Optional.empty()))));
+		willAnswer(inv -> {
+			Thread.sleep(20);
+			return records;
+		}).given(consumer).poll(any());
+		CountDownLatch latch = new CountDownLatch(1);
+		willAnswer(inv -> {
+			latch.countDown();
+			return null;
+		}).given(consumer).close();
+		ContainerProperties props = new ContainerProperties("foo");
+		props.setGroupId("grp");
+		props.setMessageListener((MessageListener) rec -> {
+			throw new RuntimeException("emergencyStopIfPollReturnsRecordsUnexpectedly");
+		});
+		KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(consumerFactory, props);
+		DefaultErrorHandler deh = new DefaultErrorHandler(new FixedBackOff(10, 5));
+		deh.setSeekAfterError(false);
+		container.setCommonErrorHandler(deh);
+		container.start();
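+		// the mock keeps returning records even though the container paused the
+		// consumer, so the container must detect this and perform an emergency stop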
+		assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
+		assertThat(container.isRunning()).isFalse();
+		assertThat(container.isInExpectedState()).isFalse();
+	}
+
+	@Configuration
+	@EnableKafka
+	public static class Config {
+
+		final List<String> contents = new ArrayList<>();
+
+		final List<Integer> deliveries = new ArrayList<>();
+
+		final CountDownLatch pollLatch = new CountDownLatch(4);
+
+		final CountDownLatch deliveryLatch = new CountDownLatch(8);
+
+		final CountDownLatch closeLatch = new CountDownLatch(1);
+
+		final CountDownLatch commitLatch = new CountDownLatch(6);
+
+		int count;
+
+		volatile org.apache.kafka.common.header.Header deliveryAttempt;
+
+		@KafkaListener(groupId = "grp",
+				topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
+						partitions = "#{'0,1,2'.split(',')}"))
+		public void foo(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
+			this.contents.add(in);
+			this.deliveries.add(delivery);
+			this.deliveryLatch.countDown();
+			if (++this.count == 4 || this.count == 5) { // part 1, offset 1, first and second times
+				throw new RuntimeException("foo");
+			}
+		}
+
+		@SuppressWarnings({ "rawtypes" })
+		@Bean
+		public ConsumerFactory consumerFactory() {
+			ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
+			final Consumer consumer = consumer();
+			given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
+					.willReturn(consumer);
+			return consumerFactory;
+		}
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Bean
+		public Consumer consumer() {
+			final Consumer consumer = mock(Consumer.class);
+			final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
+			final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
+			final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
+			Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
+			records1.put(topicPartition0, Arrays.asList(
+					new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "foo",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "bar",
+							new RecordHeaders(), Optional.empty())));
+			records1.put(topicPartition1, Arrays.asList(
+					new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "baz",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
+							new RecordHeaders(), Optional.empty())));
+			records1.put(topicPartition2, Arrays.asList(
+					new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "fiz",
+							new RecordHeaders(), Optional.empty()),
+					new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "buz",
+							new RecordHeaders(), Optional.empty())));
+			final AtomicInteger which = new AtomicInteger();
+			willAnswer(i -> {
+				this.pollLatch.countDown();
+				switch (which.getAndIncrement()) {
+					case 0:
+						return new ConsumerRecords(records1);
+					default:
+						try {
+							Thread.sleep(50);
+						}
+						catch (InterruptedException e) {
+							Thread.currentThread().interrupt();
+						}
+						return new ConsumerRecords(Collections.emptyMap());
+				}
+			}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+			List<TopicPartition> paused = new ArrayList<>();
+			willAnswer(i -> {
+				this.commitLatch.countDown();
+				return null;
+			}).given(consumer).commitSync(anyMap(), any());
+			willAnswer(i -> {
+				this.closeLatch.countDown();
+				return null;
+			}).given(consumer).close();
+			willAnswer(i -> {
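+				// capture the partitions passed to pause() so the paused() stub can report them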
+				paused.addAll(i.getArgument(0));
+				return null;
+			}).given(consumer).pause(any());
+			willAnswer(i -> {
+				return new HashSet<>(paused);
+			}).given(consumer).paused();
+			willAnswer(i -> {
+				paused.removeAll(i.getArgument(0));
+				return null;
+			}).given(consumer).resume(any());
+			return consumer;
+		}
+
+		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@Bean
+		ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
+			ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
+			factory.setConsumerFactory(consumerFactory());
+			factory.getContainerProperties().setAckMode(AckMode.RECORD);
+			factory.getContainerProperties().setDeliveryAttemptHeader(true);
+			factory.setRecordInterceptor((record, consumer) -> {
+				Config.this.deliveryAttempt = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT);
+				return record;
+			});
+			DefaultErrorHandler eh = new DefaultErrorHandler();
+			eh.setSeekAfterError(false);
+			factory.setCommonErrorHandler(eh);
+			return factory;
+		}
+
+	}
+
+}
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerRecordTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerRecordTests.java
index 1b151a5ae1..d597e0cf08 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerRecordTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultErrorHandlerRecordTests.java
@@ -19,10 +19,14 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -54,7 +58,68 @@ public class DefaultErrorHandlerRecordTests {
 
 	@Test
-	public void testClassifier() {
+	void noSeeks() {
+		AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
+		AtomicBoolean recovererShouldFail = new AtomicBoolean(false);
+		DefaultErrorHandler handler = new DefaultErrorHandler((r, t) -> {
+			if (recovererShouldFail.getAndSet(false)) {
+				throw new RuntimeException("test recoverer failure");
+			}
+			recovered.set(r);
+		});
+		handler.setSeekAfterError(false);
+		AtomicInteger failedDeliveryAttempt = new AtomicInteger();
+		AtomicReference<Exception> recoveryFailureEx = new AtomicReference<>();
+		AtomicBoolean isRecovered = new AtomicBoolean();
+		handler.setRetryListeners(new RetryListener() {
+
+			@Override
+			public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
+				failedDeliveryAttempt.set(deliveryAttempt);
+			}
+
+			@Override
+			public void recovered(ConsumerRecord<?, ?> record, Exception ex) {
+				isRecovered.set(true);
+			}
+
+			@Override
+			public void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
+				recoveryFailureEx.set(failure);
+			}
+
+		});
+		ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
+		ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
+		List<ConsumerRecord<?, ?>> records = Arrays.asList(record1, record2);
+		IllegalStateException illegalState = new IllegalStateException();
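+		// with seekAfterError=false, handleOne() reports the outcome to the container
+		// instead of seeking; the consumer must never be sought (verified below)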
+		Consumer<?, ?> consumer = mock(Consumer.class);
+		assertThat(handler.handleOne(illegalState, record1, consumer, mock(MessageListenerContainer.class))).isFalse();
+		assertThat(handler.handleOne(new DeserializationException("intended", null, false, illegalState), record1,
+				consumer, mock(MessageListenerContainer.class))).isTrue();
+		assertThat(recovered.get()).isSameAs(record1);
+		recovered.set(null);
+		assertThat(handler.handleOne(new ConversionException("intended", null), record1,
+				consumer, mock(MessageListenerContainer.class))).isTrue();
+		assertThat(recovered.get()).isSameAs(record1);
+		handler.addNotRetryableExceptions(IllegalStateException.class);
+		recovered.set(null);
+		recovererShouldFail.set(true);
+		assertThat(handler.handleOne(illegalState, record1, consumer, mock(MessageListenerContainer.class))).isFalse();
+		assertThat(handler.handleOne(new DeserializationException("intended", null, false, illegalState), record1,
+				consumer, mock(MessageListenerContainer.class))).isTrue();
+		assertThat(recovered.get()).isSameAs(record1);
+		verify(consumer, never()).seek(any(), anyLong());
+		assertThat(failedDeliveryAttempt.get()).isEqualTo(1);
+		assertThat(recoveryFailureEx.get())
+				.isInstanceOf(RuntimeException.class)
+				.extracting(ex -> ex.getMessage())
+				.isEqualTo("test recoverer failure");
+		assertThat(isRecovered.get()).isTrue();
+	}
+
+	@Test
+	void testClassifier() {
 		AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
 		AtomicBoolean recovererShouldFail = new AtomicBoolean(false);
 		DefaultErrorHandler handler = new DefaultErrorHandler((r, t) -> {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordProcessorTests.java
index c3cae7c2fd..8d72fa9d8d 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordProcessorTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordProcessorTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2020-2021 the original author or authors.
+ * Copyright 2020-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -43,29 +43,29 @@ void deliveryAttempts() throws InterruptedException {
 		List<ConsumerRecord<?, ?>> records = Collections
 				.singletonList(new ConsumerRecord("foo", 0, 0L, null, null));
 		RuntimeException exception = new RuntimeException();
-		frp.getRecoveryStrategy(records, exception).recovered(records.get(0), exception, null, null);
+		frp.getFailureTracker().recovered(records.get(0), exception, null, null);
 		assertThat(frp.deliveryAttempt(tpo1)).isEqualTo(2);
-		frp.getRecoveryStrategy(records, exception).recovered(records.get(0), exception, null, null);
+		frp.getFailureTracker().recovered(records.get(0), exception, null, null);
 		assertThat(frp.deliveryAttempt(tpo1)).isEqualTo(3);
-		frp.getRecoveryStrategy(records, exception).recovered(records.get(0), exception, null, null);
+		frp.getFailureTracker().recovered(records.get(0), exception, null, null);
 		assertThat(frp.deliveryAttempt(tpo1)).isEqualTo(1);
-		frp.getRecoveryStrategy(records, exception).recovered(records.get(0), exception, null, null);
+		frp.getFailureTracker().recovered(records.get(0), exception, null, null);
 		assertThat(frp.deliveryAttempt(tpo1)).isEqualTo(2);
 		assertThat(frp.deliveryAttempt(tpo1)).isEqualTo(2);
 		// new partition
 		TopicPartitionOffset tpo2 = new TopicPartitionOffset("foo", 1, 0L);
 		assertThat(frp.deliveryAttempt(tpo2)).isEqualTo(1);
-		frp.getRecoveryStrategy(records, exception).recovered(new ConsumerRecord("foo", 1, 0L, null, null),
+		frp.getFailureTracker().recovered(new ConsumerRecord("foo", 1, 0L, null, null),
 				exception, null, null);
 		assertThat(frp.deliveryAttempt(tpo2)).isEqualTo(2);
 		// new offset
 		tpo2 = new TopicPartitionOffset("foo", 1, 1L);
 		assertThat(frp.deliveryAttempt(tpo2)).isEqualTo(1);
-		frp.getRecoveryStrategy(records, exception).recovered(new ConsumerRecord("foo", 1, 1L, null, null),
+		frp.getFailureTracker().recovered(new ConsumerRecord("foo", 1, 1L, null, null),
 				exception, null, null);
 		assertThat(frp.deliveryAttempt(tpo2)).isEqualTo(2);
 		// back to original
-		frp.getRecoveryStrategy(records, exception).recovered(records.get(0), exception, null, null);
+		frp.getFailureTracker().recovered(records.get(0), exception, null, null);
 		assertThat(frp.deliveryAttempt(tpo1)).isEqualTo(3);
 	}
 
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java
index b24e0e0b7c..ec477b2f71 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2020 the original author or authors.
+ * Copyright 2019-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -158,9 +158,7 @@ void exceptionChanges(boolean reset) {
 				return bo2;
 			}
 		});
-		if (reset) {
-			tracker.setResetStateOnExceptionChange(reset);
-		}
+		tracker.setResetStateOnExceptionChange(reset);
 		@SuppressWarnings("unchecked")
 		ThreadLocal<Map<TopicPartition, FailedRecord>> failures = (ThreadLocal<Map<TopicPartition, FailedRecord>>) KafkaTestUtils
 				.getPropertyValue(tracker, "failures");
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
index f645610746..3d4944c5f2 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
@@ -2773,17 +2773,25 @@ public void rePausePartitionAfterRebalance() throws Exception {
 		TopicPartition tp1 = new TopicPartition("foo", 1);
 		given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
 		final CountDownLatch pauseLatch1 = new CountDownLatch(1);
-		final CountDownLatch pauseLatch2 = new CountDownLatch(2);
+		final CountDownLatch suspendConsumerThread = new CountDownLatch(1);
 		Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
+		Thread testThread = Thread.currentThread();
+		AtomicBoolean paused = new AtomicBoolean();
 		willAnswer(i -> {
 			pausedParts.clear();
 			pausedParts.addAll(i.getArgument(0));
-			pauseLatch1.countDown();
-			pauseLatch2.countDown();
+			if (!Thread.currentThread().equals(testThread)) {
+				paused.set(true);
+			}
 			return null;
 		}).given(consumer).pause(any());
 		given(consumer.paused()).willReturn(pausedParts);
 		given(consumer.poll(any(Duration.class))).willAnswer(i -> {
+			if (paused.get()) {
+				pauseLatch1.countDown();
+				// hold up the consumer thread while we revoke/assign partitions on the test thread
+				suspendConsumerThread.await(10, TimeUnit.SECONDS);
+			}
 			Thread.sleep(50);
 			return ConsumerRecords.empty();
 		});
@@ -2814,15 +2822,14 @@ public void rePausePartitionAfterRebalance() throws Exception {
 				.contains(tp0, tp1);
 		rebal.get().onPartitionsRevoked(Set.of(tp0, tp1));
 		rebal.get().onPartitionsAssigned(Collections.singleton(tp0));
-		assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
 		assertThat(pausedParts).hasSize(1)
 				.contains(tp0);
 		assertThat(container).extracting("listenerConsumer")
 				.extracting("pausedPartitions")
 				.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
 				.hasSize(1)
-				.containsExactlyInAnyOrder(tp0);
-
+				.contains(tp0);
+		suspendConsumerThread.countDown();
 		container.stop();
 	}
@@ -3135,6 +3142,7 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
 	}
 
 	@Test
+	@SuppressWarnings({ "unchecked", "rawtypes" })
 	void testFatalErrorOnAuthenticationException() throws InterruptedException {
 		ConsumerFactory cf = mock(ConsumerFactory.class);
 		ContainerProperties containerProps = new ContainerProperties(topic1);
@@ -3147,6 +3155,7 @@ void testFatalErrorOnAuthenticationException() throws InterruptedException {
 	}
 
 	@Test
+	@SuppressWarnings({ "unchecked", "rawtypes" })
 	void testFatalErrorOnAuthenticationExceptionConcurrent() throws InterruptedException {
 		ConsumerFactory cf = mock(ConsumerFactory.class);
 		ContainerProperties containerProps = new ContainerProperties(topic1);