diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index ad72886974..d98614da7f 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -1248,14 +1248,12 @@ IMPORTANT: `nack()` can only be called on the consumer thread that invokes your With a record listener, when `nack()` is called, any pending offsets are committed, the remaing records from the last poll are discarded, and seeks are performed on their partitions so that the failed record and unprocessed records are redelivered on the next `poll()`. The consumer thread can be paused before redelivery, by setting the `sleep` argument. -This is similar functionality to throwing an exception when the container is configured with a `SeekToCurrentErrorHandler`. +This is similar functionality to throwing an exception when the container is configured with a `DefaultErrorHandler`. When using a batch listener, you can specify the index within the batch where the failure occurred. When `nack()` is called, offsets will be committed for records before the index and seeks are performed on the partitions for the failed and discarded records so that they will be redelivered on the next `poll()`. -This is an improvement over the `SeekToCurrentBatchErrorHandler`, which can only seek the entire batch for redelivery. -See <> for more information. -Also see <>. +See <> for more information. IMPORTANT: When using partition assignment via group management, it is important to ensure the `sleep` argument (plus the time spent processing records from the previous poll) is less than the consumer `max.poll.interval.ms` property. @@ -2310,7 +2308,7 @@ IMPORTANT: The `FilteringBatchMessageListenerAdapter` is ignored if your `@Kafka [[retrying-deliveries]] ===== Retrying Deliveries -See the `SeekToCurrentErrorHandler` in <>. +See the `DefaultErrorHandler` in <>. [[sequencing]] ===== Starting `@KafkaListener` s in Sequence @@ -2588,7 +2586,7 @@ Processor |batchError Handler |See desc. -|An error handler for a batch listener; defaults to a `RecoveringBatchErrorHandler` or `null` if transactions are being used (errors are handled by the `AfterRollbackProcessor`). +|Deprecated - see `commonErrorHandler`. |batch Interceptor @@ -2600,6 +2598,11 @@ Also see `interceptBeforeTx`. |bean name |The bean name of the container; suffixed with `-n` for child containers. +|commonErrorHandler +|See desc. +|`DefaultErrorHandler` or `null` when a `transactionManager` is provided when a `DefaultAfterRollbackProcessor` is used. +See <>. + |containerProperties |`Container` `Properties` @@ -2607,11 +2610,11 @@ Also see `interceptBeforeTx`. |errorHandler |See desc. -|An error handler for a record listener; defaults to a `SeekToCurrentErrorHandler` or `null` if transactions are being used (errors are handled by the `AfterRollbackProcessor`). +|Deprecated - see `commonErrorHandler`. |genericErrorHandler |See desc. -|Either a batch or record error handler - see `batchErrorHandler` and `errorHandler`. +|Deprecated - see `commonErrorHandler`. |groupId |See desc. @@ -4831,23 +4834,20 @@ NOTE: The preceding two examples are simplistic implementations, and you would p [[error-handlers]] ===== Container Error Handlers -Two error handler interfaces (`ErrorHandler` and `BatchErrorHandler`) are provided. -You must configure the appropriate type to match the <>. - -NOTE: Starting with version 2.5, the default error handlers, when transactions are not being used, are the `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler` with default configuration. -See <> and <>. -To restore the previous behavior, use the `LoggingErrorHandler` and `BatchLoggingErrorHandler` instead. +Starting with version 2.8, the legacy `ErrorHandler` and `BatchErrorHandler` interfaces have been superceded by a new `CommonErrorHandler`. +These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener. +`CommonErrorHandler` implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated. +The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release. When transactions are being used, no error handlers are configured, by default, so that the exception will roll back the transaction. Error handling for transactional containers are handled by the <>. If you provide a custom error handler when using transactions, it must throw an exception if you want the transaction rolled back. -Starting with version 2.3.2, these interfaces have a default method `isAckAfterHandle()` which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception. -Starting with version 2.4, this returns true by default. +This interface has a default method `isAckAfterHandle()` which is called by the container to determine whether the offset(s) should be committed if the error handler returns without throwing an exception; it returns true by default. Typically, the error handlers provided by the framework will throw an exception when the error is not "handled" (e.g. after performing a seek operation). By default, such exceptions are logged by the container at `ERROR` level. -Starting with version 2.5, all the framework error handlers extend `KafkaExceptionLogLevelAware` which allows you to control the level at which these exceptions are logged. +All of the framework error handlers extend `KafkaExceptionLogLevelAware` which allows you to control the level at which these exceptions are logged. ==== [source, java] @@ -4874,25 +4874,7 @@ public KafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); ... - factory.setErrorHandler(myErrorHandler); - ... - return factory; -} ----- -==== - -Similarly, you can set a global batch error handler: - -==== -[source, java] ----- -@Bean -public KafkaListenerContainerFactory> - kafkaListenerContainerFactory() { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - ... - factory.setBatchErrorHandler(myBatchErrorHandler); + factory.setCommonErrorHandler(myErrorHandler); ... return factory; } @@ -4901,66 +4883,34 @@ public KafkaListenerContainerFactory data, Consumer consumer); ----- -==== - -The `handle` method of the `ConsumerAwareBatchErrorHandler` has the following signature: - -==== -[source, java] ----- -void handle(Exception thrownException, ConsumerRecords data, Consumer consumer); ----- -==== +The container commits any pending offset commits before calling the error handler. -Similar to the `@KafkaListener` error handlers, you can reset the offsets as needed, based on the data that failed. +If you are using Spring Boot, you simply need to add the error handler as a `@Bean` and Boot will add it to the auto-configured factory. -NOTE: Unlike the listener-level error handlers, however, you should set the `ackOnError` container property to `false` (default) when making adjustments. -Otherwise, any pending acks are applied after your repositioning. +===== DefaultErrorHandler -[[seek-to-current]] -===== Seek To Current Container Error Handlers +This new error handler replaces the `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`, which have been the default error handlers for several releases now. +One difference is that the fallback behavior for batch listeners (when an exception other than a `BatchListenerFailedException` is thrown) is the equivalent of the <>. -If an `ErrorHandler` implements `RemainingRecordsErrorHandler`, the error handler is provided with the failed record and any unprocessed records retrieved by the previous `poll()`. -Those records are not passed to the listener after the handler exits. -The following listing shows the `RemainingRecordsErrorHandler` interface definition: +The error handler can recover (skip) a record that keeps failing. +By default, after ten failures, the failed record is logged (at the `ERROR` level). +You can configure the handler with a custom recoverer (`BiConsumer`) and a `BackOff` that controls the delivery attempts and delays between each. +Using a `FixedBackOff` with `FixedBackOff.UNLIMITED_ATTEMPTS` causes (effectively) infinite retries. +The following example configures recovery after three tries: ==== [source, java] ---- -@FunctionalInterface -public interface RemainingRecordsErrorHandler extends ConsumerAwareErrorHandler { - - void handle(Exception thrownException, List> records, Consumer consumer); - -} +DefaultErrorHandler errorHandler = + new DefaultErrorHandler((record, exception) -> { + // recover after 3 failures, with no back off - e.g. send to a dead-letter topic + }, new FixedBackOff(0L, 2L)); ---- ==== -This interface lets implementations seek all unprocessed topics and partitions so that the current record (and the others remaining) are retrieved by the next poll. -The `SeekToCurrentErrorHandler` does exactly this. - -IMPORTANT: `ackOnError` must be `false` (which is the default). -Otherwise, if the container is stopped after the seek, but before the record is reprocessed, the record will be skipped when the container is restarted. - -This is now the default error handler for record listeners. +To configure the listener container with a customized instance of this handler, add it to the container factory. -The container commits any pending offset commits before calling the error handler. - -To configure the listener container with this handler, add it to the container factory. - -For example, with the `@KafkaListener` container factory, you can add `SeekToCurrentErrorHandler` as follows: +For example, with the `@KafkaListener` container factory, you can add `DefaultErrorHandler` as follows: ==== [source, java] @@ -4971,45 +4921,61 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckOnError(false); factory.getContainerProperties().setAckMode(AckMode.RECORD); - factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 2L))); + factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L))); return factory; } ---- ==== -This will retry a delivery up to 2 times (3 delivery attempts) with a back off of 1 second, instead of the default configuration (`FixedBackOff(0L, 9)`). +For a record listener, this will retry a delivery up to 2 times (3 delivery attempts) with a back off of 1 second, instead of the default configuration (`FixedBackOff(0L, 9)`). Failures are simply logged after retries are exhausted. As an example; if the `poll` returns six records (two from each partition 0, 1, 2) and the listener throws an exception on the fourth record, the container acknowledges the first three messages by committing their offsets. -The `SeekToCurrentErrorHandler` seeks to offset 1 for partition 1 and offset 0 for partition 2. +The `DefaultErrorHandler` seeks to offset 1 for partition 1 and offset 0 for partition 2. The next `poll()` returns the three unprocessed records. If the `AckMode` was `BATCH`, the container commits the offsets for the first two partitions before calling the error handler. -Starting with version 2.2, the `SeekToCurrentErrorHandler` can now recover (skip) a record that keeps failing. -By default, after ten failures, the failed record is logged (at the `ERROR` level). -You can configure the handler with a custom recoverer (`BiConsumer`) and maximum failures. -Using a `FixedBackOff` with `FixedBackOff.UNLIMITED_ATTEMPTS` causes (effectively) infinite retries. -The following example configures recovery after three tries: +For a batch listener, the listener must throw a `BatchListenerFailedException` indicating which records in the batch failed. + +The sequence of events is: + +* Commit the offsets of the records before the index. +* If retries are not exhausted, perform seeks so that all the remaining records (including the failed record) will be redelivered. +* If retries are exhausted, attempt recovery of the failed record (default log only) and perform seeks so that the remaining records (excluding the failed record) will be redelivered. +The recovered record's offset is committed +* If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted. + +The default recoverer logs the failed record after retries are exhausted. +You can use a custom recoverer, or one provided by the framework such as the <>. + +When using a POJO batch listener (e.g. `List`), and you don't have the full consumer record to add to the exception, you can just add the index of the record that failed: ==== [source, java] ---- -SeekToCurrentErrorHandler errorHandler = - new SeekToCurrentErrorHandler((record, exception) -> { - // recover after 3 failures, woth no back off - e.g. send to a dead-letter topic - }, new FixedBackOff(0L, 2L)); +@KafkaListener(id = "recovering", topics = "someTopic") +public void listen(List things) { + for (int i = 0; i < records.size(); i++) { + try { + process(things.get(i)); + } + catch (Exception e) { + throw new BatchListenerFailedException("Failed to process", i); + } + } +} ---- ==== -Starting with version 2.2.4, when the container is configured with `AckMode.MANUAL_IMMEDIATE`, the error handler can be configured to commit the offset of recovered records; set the `commitRecovered` property to `true`. +When the container is configured with `AckMode.MANUAL_IMMEDIATE`, the error handler can be configured to commit the offset of recovered records; set the `commitRecovered` property to `true`. See also <>. When using transactions, similar functionality is provided by the `DefaultAfterRollbackProcessor`. See <>. -Starting with version 2.3, the `SeekToCurrentErrorHandler` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. +The `DefaultErrorHandler` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are: * `DeserializationException` @@ -5022,7 +4988,7 @@ The exceptions that are considered fatal, by default, are: since these exceptions are unlikely to be resolved on a retried delivery. You can add more exception types to the not-retryable category, or completely replace the map of classified exceptions. -See the Javadocs for `SeekToCurrentErrorHandler.setClassifications()` for more information, as well as those for the `spring-retry` `BinaryExceptionClassifier`. +See the Javadocs for `DefaultErrorHandler.addNotRetryableException()` and `DefaultErrorHandler.setClassifications()` for more information, as well as those for the `spring-retry` `BinaryExceptionClassifier`. Here is an example that adds `IllegalArgumentException` to the not-retryable exceptions: @@ -5030,15 +4996,15 @@ Here is an example that adds `IllegalArgumentException` to the not-retryable exc [source, java] ---- @Bean -public SeekToCurrentErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) { - SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer); - handler.addNotRetryableException(IllegalArgumentException.class); +public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) { + DefaultErrorHandler handler = new DefaultErrorHandler(recoverer); + handler.addNotRetryableExceptions(IllegalArgumentException.class); return handler; } ---- ==== -Starting with version 2.7, the error handler can be configured with one or more `RetryListener` s, receiving notifications of retry and recovery progress. +The error handler can be configured with one or more `RetryListener` s, receiving notifications of retry and recovery progress. ==== [source, java] @@ -5060,37 +5026,11 @@ public interface RetryListener { See the javadocs for more information. -The `SeekToCurrentBatchErrorHandler` seeks each partition to the first record in each partition in the batch, so the whole batch is replayed. -Also see <> for an alternative. -Also see <>. -This error handler does not support recovery, because the framework cannot know which message in the batch is failing. - -After seeking, an exception that wraps the `ListenerExecutionFailedException` is thrown. -This is to cause the transaction to roll back (if transactions are enabled). - -Starting with version 2.3, a `BackOff` can be provided to the `SeekToCurrentErrorHandler` and `DefaultAfterRollbackProcessor` so that the consumer thread can sleep for some configurable time between delivery attempts. -Spring Framework provides two out of the box `BackOff` s, `FixedBackOff` and `ExponentialBackOff`. -The maximum back off time must not exceed the `max.poll.interval.ms` consumer property, to avoid a rebalance. - -IMPORTANT: Previously, the configuration was "maxFailures" (which included the first delivery attempt). -When using a `FixedBackOff`, its `maxAttempts` property represents the number of delivery retries (one less than the old `maxFailures` property). -Also, `maxFailures=-1` meant retry indefinitely with the old configuration, with a `BackOff` you would set the `maxAttempts` to `Long.MAX_VALUE` for a `FixedBackOff` and leave the `maxElapsedTime` to its default in an `ExponentialBackOff`. - -The `SeekToCurrentBatchErrorHandler` can also be configured with a `BackOff` to add a delay between delivery attempts. -Generally, you should configure the `BackOff` to never return `STOP`. -However, since this error handler has no mechanism to "recover" after retries are exhausted, if the `BackOffExecution` returns `STOP`, the previous interval will be used for all subsequent delays. -Again, the maximum delay must be less than the `max.poll.interval.ms` consumer property. -Also see <>. - IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks. -Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. -With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure. -To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`. - -Starting with version 2.3.2, after a record has been recovered, its offset will be committed (if one of the container `AckMode` s is configured). -To revert to the previous behavior, set the error handler's `ackAfterHandle` property to false. +If the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. +To skip retries after a recovery failure, set the error handler's `resetStateOnRecoveryFailure` to `false`. -Starting with version 2.6, you can now provide the error handler with a `BiFunction, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception: +You can provide the error handler with a `BiFunction, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception: ==== [source, java] @@ -5101,175 +5041,103 @@ handler.setBackOffFunction((record, ex) -> { ... }); If the function returns `null`, the handler's default `BackOff` will be used. -Starting with version 2.6.3, set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures. +Set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures. By default, the exception type is not considered. Also see <>. -Starting with version 2.7, while waiting for a `BackOff` interval, the error handler will loop with a short sleep until the desired delay, while checking to see if the container has been stopped, allowing the sleep to exit soon after the `stop()` rather than causing a delay. - [[retrying-batch-eh]] ===== Retrying Batch Error Handler -As discussed above, the `SeekToCurrentBatchErrorHandler` has no mechanism to recover after a certain number of failures. -One reason for this is there is no guarantee that, when a batch is redelivered, the batch has the same number of records and/or the redelivered records are in the same order. +This legacy `BatchErrorHandler` is not deprecated at this time; it is used as the fallback when using the `DefaultErrorHandler` and some exception other than `BatchListenerFailedException` is thrown by the listener. + +There is no guarantee that, when a batch is redelivered, the batch has the same number of records and/or the redelivered records are in the same order. It is impossible, therefore, to maintain retry state for a batch. -The `RetryingBatchErrorHandler` takes a different approach. -If a batch listener throws an exception, and this error handler is configured, the retries are performed from the in-memory batch of records. -In order to avoid a rebalance during an extended retry sequence, the error handler pauses the consumer, polls it before sleeping for the back off, for each retry, and calls the listener again. +The `RetryingBatchErrorHandler` takes a the following approach. +If a batch listener throws an exception, and this error handler is configured (or used as the fallback described above), the retries are performed from the in-memory batch of records. +In order to avoid a rebalance during an extended retry sequence, the error handler pauses the consumer, polls it before sleeping for the back off, for each retry, and calls the listener again. If/when retries are exhausted, the `ConsumerRecordRecoverer` is called for each record in the batch. If the recoverer throws an exception, or the thread is interrupted during its sleep, a `SeekToCurrentErrorHandler` is invoked so that the batch of records will be redelivered on the next poll. Before exiting, regardless of the outcome, the consumer is resumed. IMPORTANT: This error handler cannot be used with transactions. -Starting with version 2.7, while waiting for a `BackOff` interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the `stop()` rather than causing a delay. - -Also see <>. - -[[recovering-batch-eh]] -===== Recovering Batch Error Handler +While waiting for a `BackOff` interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the `stop()` rather than causing a delay. -As an alternative to the <>, version 2.5 introduced the `RecoveringBatchErrorHandler`. - -This is now the default error handler for batch listeners. -The default configuration retries 9 times (10 delivery attempts) with no back off between deliveries. - -This error handler works in conjunction with the listener throwing a `BatchListenerFailedException` providing the index in the batch where the failure occurred (or the failed record itself). -If the listener throws a different exception, or the index is out of range, the error handler falls back to invoking a `SeekToCurrentBatchErrorHandler` and the whole batch is retried, with no recovery available. -The sequence of events is: - -* Commit the offsets of the records before the index. -* If retries are not exhausted, perform seeks so that all the remaining records (including the failed record) will be redelivered. -* If retries are exhausted, attempt recovery of the failed record (default log only) and perform seeks so that the remaining records (excluding the failed record) will be redelivered. -The recovered record's offset is committed -* If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted. - -The default recoverer logs the failed record after retries are exhausted. -You can use a custom recoverer, or one provided by the framework such as the <>. - -In all cases, a `BackOff` can be configured to enable a delay between delivery attempts. - -Example: - -==== -[source, java] ----- -@Bean -public RecoveringBatchErrorHandler batchErrorHandler(KafkaTemplate template) { - DeadLetterPublishingRecoverer recoverer = - new DeadLetterPublishingRecoverer(template); - RecoveringBatchErrorHandler errorHandler = - new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(2L, 5000)); -} ----- -==== - -==== -[source, java] ----- -@KafkaListener(id = "recovering", topics = "someTopic") -public void listen(List> records) { - records.forEach(record -> { - try { - process(record); - } - catch (Exception e) { - throw new BatchListenerFailedException("Failed to process", record); - } - }); -} ----- -==== - -For example; say 10 records are in the original batch and no more records are added to the topic during the retries, and the failed record is at index `4` in the list. -After the first delivery fails, the offsets for the first 4 records will be committed; the remaing 6 will be redelivered after 5 seconds. -Most likely (but not necessarily) the failed record will be at index `0` in the redelivery. -If it fails again, it will be retried one more time and, if it again fails, it will be sent to a dead letter topic. - -When using a POJO batch listener (e.g. `List`), and you don't have the full consumer record to add to the exception, you can just add the index of the record that failed: +===== Container Stopping Error Handlers -==== -[source, java] ----- -@KafkaListener(id = "recovering", topics = "someTopic") -public void listen(List things) { - for (int i = 0; i < records.size(); i++) { - try { - process(things.get(i)); - } - catch (Exception e) { - throw new BatchListenerFailedException("Failed to process", i); - } - } -} ----- -==== +The `CommonContainerStoppingErrorHandler` stops the container if the listener throws an exception. +For record listeners, when the `AckMode` is `RECORD`, offsets for already processed records are committed. +For record listeners, when the `AckMode` is any manual value, offsets for already acknowledged records are committed. +For record listeners, wWhen the `AckMode` is `BATCH`, or for batch listeners, the entire batch is replayed when the container is restarted. -IMPORTANT: This error handler cannot be used with transactions +After the container stops, an exception that wraps the `ListenerExecutionFailedException` is thrown. +This is to cause the transaction to roll back (if transactions are enabled). -IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks. -Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again. -With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure. -To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`. +[[cond-eh]] +===== Delegating Error Handler -Starting with version 2.6, you can now provide the error handler with a `BiFunction, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception: +The `CommonDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type. +For example, you may wish to invoke a `DefaultErrorHandler` for most exceptions, or a `CommonContainerStoppingErrorHandler` for others. -==== -[source, java] ----- -handler.setBackOffFunction((record, ex) -> { ... }); ----- -==== +[[log-eh]] +===== Logging Error Handler -If the function returns `null`, the handler's default `BackOff` will be used. +The `CommonLoggingErrorHandler` simply logs the exception; with a record listener, the remaining records from the previous poll are passed to the listener. +For a batch listener, all the records in the batch are logged. -Starting with version 2.6.3, set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures. -By default, the exception type is not considered. +[[mixed-eh]] +===== Using Different Common Error Handlers for Record and Batch Listeners -Starting with version 2.7, while waiting for a `BackOff` interval, the error handler will loop with a short sleep until the desired delay is reached, while checking to see if the container has been stopped, allowing the sleep to exit soon after the `stop()` rather than causing a delay. +If you wish to use a different error handling strategy for record and batch listeners, the `CommonMixedErrorHandler` is provided allowing the configuration of a specific error handler for each listener type. -Starting with version 2.7, the error handler can be configured with one or more `RetryListener` s, receiving notifications of retry and recovery progress. +[[eh-summary]] +===== Common Error Handler Summery -==== -[source, java] ----- -@FunctionalInterface -public interface RetryListener { +* `DefaultErrorHandler` +* `CommonContainerStoppingErrorHandler` +* `CommonDelegatingErrorHandler` +* `CommonLoggingErrorHandler` +* `CommonMixedErrorHandler` - void failedDelivery(ConsumerRecord record, Exception ex, int deliveryAttempt); +[[legacy-eh]] +===== Legacy Error Handlers and Their Replacements - default void recovered(ConsumerRecord record, Exception ex) { - } +[cols="16,16" options="header"] +|=== +|Legacy Error Handler +|Replacement - default void recoveryFailed(ConsumerRecord record, Exception original, Exception failure) { - } +|`LoggingErrorHandler` +|`CommonLoggingErrorHandler` -} ----- -==== +|`BatchLoggingErrorHandler` +|`CommonLoggingErrorHandler` -See the javadocs for more information. +|`ConditionalDelegatingErrorHandler` +|`DelegatingErrorHandler` -===== Container Stopping Error Handlers +|`ConditionalDelegatingBatchErrorHandler` +|`DelegatingErrorHandler` -The `ContainerStoppingErrorHandler` (used with record listeners) stops the container if the listener throws an exception. -When the `AckMode` is `RECORD`, offsets for already processed records are committed. -When the `AckMode` is any manual value, offsets for already acknowledged records are committed. -When the `AckMode` is `BATCH`, the entire batch is replayed when the container is restarted (unless transactions are enabled -- in which case, only the unprocessed records are re-fetched). +|`ContainerStoppingErrorHandler` +|`CommonContainerStoppingErrorHandler` -The `ContainerStoppingBatchErrorHandler` (used with batch listeners) stops the container, and the entire batch is replayed when the container is restarted. +|`ContainerStoppingBatchErrorHandler` +|`CommonContainerStoppingErrorHandler` -After the container stops, an exception that wraps the `ListenerExecutionFailedException` is thrown. -This is to cause the transaction to roll back (if transactions are enabled). +|`SeekToCurrentErrorHandler` +|`DefaultErrorHandler` -[[cond-eh]] -===== Conditional Delegating Error Handlers +|`SeekToCurrentBatchErrorHandler` +|No replacement, use `DefaultErrorHandler` with an infinite `BackOff`. -Introduced in version 2.7.4, the `ConditionalDelegatingErrorHandler` can delegate to different error handlers, depending on the exception type. -For example, you may wish to invoke a `SeekToCurrentErrorHandler` for most exceptions, or a `ContainerStoppingErrorHandler` for others. +|`RecoveringBatchErrorHandler` +|`DefaultErrorHandler` -Similarly, the `ConditionalDelegatingBatchErrorHandler` is provided. +|`RetryingBatchErrorHandler` +|No replacements - use `DefaultErrorHandler` and throw an exception other than `BatchListenerFailedException`. +|=== [[after-rollback]] ===== After-rollback Processor @@ -5297,8 +5165,8 @@ AfterRollbackProcessor processor = ---- ==== -When you do not use transactions, you can achieve similar functionality by configuring a `SeekToCurrentErrorHandler`. -See <>. +When you do not use transactions, you can achieve similar functionality by configuring a `DefaultErrorHandler`. +See <>. IMPORTANT: Recovery is not possible with a batch listener, since the framework has no knowledge about which record in the batch keeps failing. In such cases, the application listener must handle a record that keeps failing. @@ -5328,7 +5196,7 @@ If the function returns `null`, the processor's default `BackOff` will be used. Starting with version 2.6.3, set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures. By default, the exception type is not considered. -Starting with version 2.3.1, similar to the `SeekToCurrentErrorHandler`, the `DefaultAfterRollbackProcessor` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. +Starting with version 2.3.1, similar to the `DefaultErrorHandler`, the `DefaultAfterRollbackProcessor` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure. The exceptions that are considered fatal, by default, are: * `DeserializationException` @@ -5413,12 +5281,12 @@ When using `@KafkaListener` with the `DefaultKafkaHeaderMapper` or `SimpleKafkaH To enable population of this header, set the container property `deliveryAttemptHeader` to `true`. It is disabled by default to avoid the (small) overhead of looking up the state for each record and adding the header. -The `SeekToCurrentErrorHandler` and `DefaultAfterRollbackProcessor` support this feature. +The `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` support this feature. [[dead-letters]] ===== Publishing Dead-letter Records -As <>, you can configure the `SeekToCurrentErrorHandler` and `DefaultAfterRollbackProcessor` (as well as the `RecoveringBatchErrorHandler`) with a record recoverer when the maximum number of failures is reached for a record. +You can configure the `DefaultErrorHandler` and `DefaultAfterRollbackProcessor` with a record recoverer when the maximum number of failures is reached for a record. The framework provides the `DeadLetterPublishingRecoverer`, which publishes the failed message to another topic. The recoverer requires a `KafkaTemplate`, which is used to send the record. You can also, optionally, configure it with a `BiFunction, Exception, TopicPartition>`, which is called to resolve the destination topic and partition. @@ -5444,7 +5312,7 @@ DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(temp return new TopicPartition(r.topic() + ".other.failures", r.partition()); } }); -ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L)); +ErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L)); ---- ==== @@ -5522,7 +5390,7 @@ The following error handler configuration will do exactly that: ---- @Bean public ErrorHandler eh(KafkaOperations template) { - return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template, + return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template, (rec, ex) -> { org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries"); if (retries == null) { @@ -5555,12 +5423,12 @@ Since version 2.7.3, Spring for Apache Kafka provides the `ExponentialBackOffWit [source, java] ---- @Bean -SeekToCurrentErrorHandler handler() { +DefaultErrorHandler handler() { ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6); bo.setInitialInterval(1_000L); bo.setMultiplier(2.0); bo.setMaxInterval(10_000L); - return new SeekToCurrentErrorHandler(myRecoverer, bo); + return new DefaultErrorHandler(myRecoverer, bo); } ---- ==== diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index cad72ee38c..16b72db467 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -20,3 +20,9 @@ See <> for more information. You can now receive a single record, given the topic, partition and offset. See <> for more information. + +[[x28-eh]] +==== `CommonErrorHandler` Added + +The legacy `GenericErrorHandler` and its sub-interface hierarchies for record an batch listeners have been replaced by a new single interface `CommonErrorHandler` with implementations corresponding to most legacy implementations of `GenericErrorHandler`. +See <> for more information. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java index e0b12ffc6b..152b60cb1d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java @@ -116,6 +116,16 @@ public void setAckAfterHandle(boolean ackAfterHandle) { this.ackAfterHandle = ackAfterHandle; } + @Override + public boolean remainingRecords() { + return true; + } + + @Override + public boolean deliveryAttemptHeader() { + return true; + } + @Override public void handleRemaining(Exception thrownException, List> records, Consumer consumer, MessageListenerContainer container) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index ed90a53745..06ff609598 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -810,7 +810,7 @@ private CommonErrorHandler determineCommonErrorHandler(@Nullable GenericErrorHan } if (this.isBatchListener) { validateErrorHandler(true); - BatchErrorHandler batchErrorHandler = determineBatchErrorHandler(errHandler); + BatchErrorHandler batchErrorHandler = (BatchErrorHandler) errHandler; if (batchErrorHandler != null) { return new ErrorHandlerAdapter(batchErrorHandler); } @@ -820,7 +820,7 @@ private CommonErrorHandler determineCommonErrorHandler(@Nullable GenericErrorHan } else { validateErrorHandler(false); - ErrorHandler eh = determineErrorHandler(errHandler); + ErrorHandler eh = (ErrorHandler) errHandler; if (eh != null) { return new ErrorHandlerAdapter(eh); } @@ -1120,18 +1120,6 @@ protected void checkConsumer() { } } - @Nullable - protected BatchErrorHandler determineBatchErrorHandler(@Nullable GenericErrorHandler errHandler) { - return errHandler != null ? (BatchErrorHandler) errHandler - : this.transactionManager != null ? null : new RecoveringBatchErrorHandler(); - } - - @Nullable - protected ErrorHandler determineErrorHandler(@Nullable GenericErrorHandler errHandler) { - return errHandler != null ? (ErrorHandler) errHandler - : this.transactionManager != null ? null : new SeekToCurrentErrorHandler(); - } - @Nullable private MicrometerHolder obtainMicrometerHolder() { MicrometerHolder holder = null; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveringBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveringBatchErrorHandler.java index eb6b704600..e383d963af 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveringBatchErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveringBatchErrorHandler.java @@ -35,11 +35,14 @@ * exception, error handling is delegated to a {@link SeekToCurrentBatchErrorHandler} with * this handler's {@link BackOff}. If the record is recovered, its offset is committed. * + * @deprecated in favor of {@link DefaultErrorHandler}. + * * @author Gary Russell * @author Myeonghyeon Lee * @since 2.5 * */ +@Deprecated public class RecoveringBatchErrorHandler extends FailedBatchProcessor implements ContainerAwareBatchErrorHandler { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java index 13030dc624..239402741d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java @@ -48,6 +48,7 @@ public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware private final BiConsumer, Exception> recoverer; + @SuppressWarnings("deprecation") private final CommonErrorHandler seeker = new ErrorHandlerAdapter(new SeekToCurrentBatchErrorHandler()); private boolean ackAfterHandle = true; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandler.java index 2efc49412f..9194307d1b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandler.java @@ -31,10 +31,14 @@ * An error handler that seeks to the current offset for each topic in a batch of records. * Used to rewind partitions after a message failure so that the batch can be replayed. * + * @deprecated with no replacement - use {@link DefaultErrorHandler} with an infinite + * {@link BackOff}. + * * @author Gary Russell * @since 2.1 * */ +@Deprecated public class SeekToCurrentBatchErrorHandler extends KafkaExceptionLogLevelAware implements ContainerAwareBatchErrorHandler { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java index 09382d6df1..fdaff75bd5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java @@ -30,12 +30,15 @@ * records. Used to rewind partitions after a message failure so that it can be * replayed. * + * @deprecated in favor of {@link DefaultErrorHandler}. + * * @author Gary Russell * @author Artem Bilan * * @since 2.0.1 * */ +@Deprecated public class SeekToCurrentErrorHandler extends FailedRecordProcessor implements ContainerAwareErrorHandler { private boolean ackAfterHandle = true; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java index d4c2aa6cb4..5d4953c427 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java @@ -30,12 +30,12 @@ import org.springframework.core.log.LogAccessor; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; -import org.springframework.kafka.listener.ErrorHandler; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; -import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.util.Assert; import org.springframework.util.backoff.FixedBackOff; @@ -43,7 +43,7 @@ /** * * Configures the provided {@link ConcurrentKafkaListenerContainerFactory} with a - * {@link SeekToCurrentErrorHandler}, the {@link DeadLetterPublishingRecoverer} created by + * {@link DefaultErrorHandler}, the {@link DeadLetterPublishingRecoverer} created by * the {@link DeadLetterPublishingRecovererFactory}. * * Mind that the same factory can be used by many different @@ -76,7 +76,7 @@ public class ListenerContainerFactoryConfigurer { private Consumer> containerCustomizer = container -> { }; - private Consumer errorHandlerCustomizer = errorHandler -> { + private Consumer errorHandlerCustomizer = errorHandler -> { }; private final DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory; @@ -108,12 +108,13 @@ public class ListenerContainerFactoryConfigurer { : doConfigure(containerFactory, Collections.emptyList()); } - private ConcurrentKafkaListenerContainerFactory doConfigure(ConcurrentKafkaListenerContainerFactory containerFactory, - List backOffValues) { - containerFactory.setContainerCustomizer(container -> - setupBackoffAwareMessageListenerAdapter(container, backOffValues)); + private ConcurrentKafkaListenerContainerFactory doConfigure( + ConcurrentKafkaListenerContainerFactory containerFactory, List backOffValues) { + + containerFactory + .setContainerCustomizer(container -> setupBackoffAwareMessageListenerAdapter(container, backOffValues)); containerFactory - .setErrorHandler(createErrorHandler(this.deadLetterPublishingRecovererFactory.create())); + .setCommonErrorHandler(createErrorHandler(this.deadLetterPublishingRecovererFactory.create())); return containerFactory; } @@ -135,12 +136,12 @@ public void setContainerCustomizer(Consumer errorHandlerCustomizer) { + public void setErrorHandlerCustomizer(Consumer errorHandlerCustomizer) { this.errorHandlerCustomizer = errorHandlerCustomizer; } - private ErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { - SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer, + private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { + DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 0)); errorHandler.setCommitRecovered(true); this.errorHandlerCustomizer.accept(errorHandler); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/StatefulRetryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/StatefulRetryTests.java index f5d7b3202e..6af6f36f10 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/StatefulRetryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/StatefulRetryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2020 the original author or authors. + * Copyright 2018-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,8 +37,8 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; @@ -88,18 +88,18 @@ public KafkaListenerContainerFactory kafkaListenerContainerFactory(EmbeddedKa ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory(embeddedKafka)); - SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler() { + DefaultErrorHandler errorHandler = new DefaultErrorHandler() { @Override - public void handle(Exception thrownException, List> records, + public void handleRemaining(Exception thrownException, List> records, Consumer consumer, MessageListenerContainer container) { Config.this.seekPerformed = true; - super.handle(thrownException, records, consumer, container); + super.handleRemaining(thrownException, records, consumer, container); } }; errorHandler.setLogLevel(Level.INFO); - factory.setErrorHandler(errorHandler); + factory.setCommonErrorHandler(errorHandler); factory.setStatefulRetry(true); factory.setRetryTemplate(new RetryTemplate()); factory.setRecoveryCallback(c -> { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index f1f8507a51..859419cbf1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -917,8 +917,8 @@ public void onMessage(ConsumerRecord data) { containerProps.setClientId("clientId"); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); - SeekToCurrentErrorHandler errorHandler = spy(new SeekToCurrentErrorHandler(new FixedBackOff(0L, 0))); - container.setErrorHandler(errorHandler); + DefaultErrorHandler errorHandler = spy(new DefaultErrorHandler(new FixedBackOff(0L, 0))); + container.setCommonErrorHandler(errorHandler); container.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue(); @@ -927,7 +927,7 @@ public void onMessage(ConsumerRecord data) { inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class)); inOrder.verify(consumer).commitSync(anyMap(), any()); inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class)); - inOrder.verify(errorHandler).handle(any(), any(), any(), any()); + inOrder.verify(errorHandler).handleRemaining(any(), any(), any(), any()); inOrder.verify(consumer).commitSync(anyMap(), any()); container.stop(); } @@ -2879,7 +2879,6 @@ public void onPartitionsAssigned(Collection partitions) { KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testContainerException"); - container.setErrorHandler(new SeekToCurrentErrorHandler()); container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); template.sendDefault(0, 0, "a"); @@ -3438,7 +3437,7 @@ void commitAfterHandleManual() throws InterruptedException { new KafkaMessageListenerContainer<>(cf, containerProps); AtomicBoolean recovered = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); - container.setErrorHandler(new SeekToCurrentErrorHandler((rec, ex) -> { + container.setCommonErrorHandler(new DefaultErrorHandler((rec, ex) -> { recovered.set(true); latch.countDown(); }, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java index 4b1f4815d1..add66d5f06 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualAssignmentInitialSeekTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -200,7 +200,6 @@ public Consumer consumer() { public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); - factory.setErrorHandler(new SeekToCurrentErrorHandler()); factory.getContainerProperties().setAckMode(AckMode.RECORD); factory.getContainerProperties().setDeliveryAttemptHeader(true); return factory; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java index 60aa22c831..309eed77fa 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -209,7 +209,6 @@ public Consumer consumer() { public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); - factory.setErrorHandler(new SeekToCurrentErrorHandler()); factory.getContainerProperties().setAckMode(AckMode.MANUAL); factory.getContainerProperties().setMissingTopicsFatal(false); return factory; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java index 2da651cdd3..5aa7e74df3 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -74,6 +74,7 @@ */ @SpringJUnitConfig @DirtiesContext +@SuppressWarnings("deprecation") public class ManualNackRecordTxTests { @SuppressWarnings("rawtypes") diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java index f325a27016..76aadada5e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java @@ -48,6 +48,7 @@ * @since 2.5 * */ +@SuppressWarnings("deprecation") @EmbeddedKafka(topics = { RecoveringBatchErrorHandlerIntegrationTests.topic1, RecoveringBatchErrorHandlerIntegrationTests.topic1DLT, diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerTests.java index b76d2a7cc5..79b2d99b57 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerTests.java @@ -70,6 +70,7 @@ */ @SpringJUnitConfig @DirtiesContext +@SuppressWarnings("deprecation") public class RecoveringBatchErrorHandlerTests { private static final String CONTAINER_ID = "container"; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java index 0595ad709c..3cf8f832dc 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,6 +79,7 @@ */ @SpringJUnitConfig @DirtiesContext +@SuppressWarnings("deprecation") public class SeekToCurrentBatchErrorHandlerTests { private static final String CONTAINER_ID = "container"; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java index 9bdb950e55..323b8d0beb 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java @@ -48,6 +48,7 @@ * @since 2.3 * */ +@SuppressWarnings("deprecation") public class SeekToCurrentErrorHandlerTests { @Test diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java index 99b3668438..e2c6620dde 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTXTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -236,7 +236,7 @@ public Consumer consumer() { public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); - factory.setErrorHandler(new SeekToCurrentErrorHandler()); + factory.setCommonErrorHandler(new DefaultErrorHandler()); factory.getContainerProperties().setAckMode(AckMode.BATCH); factory.getContainerProperties().setTransactionManager(tm()); return factory; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTests.java index a59c847dc2..df1b7c247d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorBatchModeTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -206,7 +206,6 @@ public Consumer consumer() { public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); - factory.setErrorHandler(new SeekToCurrentErrorHandler()); factory.getContainerProperties().setAckMode(AckMode.BATCH); return factory; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java index b0ef7238cc..bb5c0da2a8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTXTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -237,7 +237,7 @@ public Consumer consumer() { public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); - factory.setErrorHandler(new SeekToCurrentErrorHandler()); + factory.setCommonErrorHandler(new DefaultErrorHandler()); factory.getContainerProperties().setAckMode(AckMode.RECORD); factory.getContainerProperties().setTransactionManager(tm()); return factory; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTests.java index 70d8055be9..e1ee90b091 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentOnErrorRecordModeTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -223,7 +223,6 @@ public Consumer consumer() { public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); - factory.setErrorHandler(new SeekToCurrentErrorHandler()); factory.getContainerProperties().setAckMode(AckMode.RECORD); factory.getContainerProperties().setDeliveryAttemptHeader(true); return factory; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java index 5c1c5ccf25..bb474f0031 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; @@ -41,7 +40,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -137,8 +135,8 @@ public void accept(ConsumerRecord record, @Nullable Consumer consume } }; - SeekToCurrentErrorHandler errorHandler = spy(new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2))); - container.setErrorHandler(errorHandler); + DefaultErrorHandler errorHandler = spy(new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2))); + container.setCommonErrorHandler(errorHandler); final CountDownLatch stopLatch = new CountDownLatch(1); container.setApplicationEventPublisher(e -> { if (e instanceof ConsumerStoppedEvent) { @@ -169,24 +167,24 @@ public void accept(ConsumerRecord record, @Nullable Consumer consume dltPf.destroy(); consumer.close(); assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue(); - verify(errorHandler, times(4)).handle(any(), any(), any(), any()); + verify(errorHandler, times(4)).handleRemaining(any(), any(), any(), any()); verify(errorHandler).clearThreadState(); } @Test public void seekToCurrentErrorHandlerRecovers() { @SuppressWarnings("unchecked") - BiConsumer, Exception> recoverer = mock(BiConsumer.class); - SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1)); + ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class); + DefaultErrorHandler eh = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 1)); List> records = new ArrayList<>(); records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> - eh.handle(new RuntimeException(), records, consumer, null)); + eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); - eh.handle(new RuntimeException(), records, consumer, null); + eh.handleRemaining(new RuntimeException(), records, consumer, null); verify(consumer).seek(new TopicPartition("foo", 0), 1L); verifyNoMoreInteractions(consumer); verify(recoverer).accept(eq(records.get(0)), any()); @@ -195,7 +193,7 @@ public void seekToCurrentErrorHandlerRecovers() { @Test public void seekToCurrentErrorHandlerRecovererFailsBackOffReset() { @SuppressWarnings("unchecked") - BiConsumer, Exception> recoverer = mock(BiConsumer.class); + ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class); AtomicBoolean fail = new AtomicBoolean(true); willAnswer(incovation -> { if (fail.getAndSet(false)) { @@ -203,7 +201,7 @@ public void seekToCurrentErrorHandlerRecovererFailsBackOffReset() { } return null; }).given(recoverer).accept(any(), any()); - SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1)); + DefaultErrorHandler eh = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 1)); AtomicInteger failedDeliveryAttempt = new AtomicInteger(); AtomicReference recoveryFailureEx = new AtomicReference<>(); AtomicBoolean isRecovered = new AtomicBoolean(); @@ -230,16 +228,16 @@ public void recoveryFailed(ConsumerRecord record, Exception original, Exce records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); assertThatExceptionOfType(KafkaException.class).isThrownBy( - () -> eh.handle(new RuntimeException(), records, consumer, null)); + () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); assertThatExceptionOfType(KafkaException.class).isThrownBy( - () -> eh.handle(new RuntimeException(), records, consumer, null)); + () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L); assertThatExceptionOfType(KafkaException.class).isThrownBy( - () -> eh.handle(new RuntimeException(), records, consumer, null)); + () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L); - eh.handle(new RuntimeException(), records, consumer, null); + eh.handleRemaining(new RuntimeException(), records, consumer, null); verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L); verify(consumer).seek(new TopicPartition("foo", 0), 1L); verifyNoMoreInteractions(consumer); @@ -255,7 +253,7 @@ public void recoveryFailed(ConsumerRecord record, Exception original, Exce @Test public void seekToCurrentErrorHandlerRecovererFailsBackOffNotReset() { @SuppressWarnings("unchecked") - BiConsumer, Exception> recoverer = mock(BiConsumer.class); + ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class); AtomicBoolean fail = new AtomicBoolean(true); willAnswer(incovation -> { if (fail.getAndSet(false)) { @@ -263,20 +261,20 @@ public void seekToCurrentErrorHandlerRecovererFailsBackOffNotReset() { } return null; }).given(recoverer).accept(any(), any()); - SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1)); + DefaultErrorHandler eh = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 1)); eh.setResetStateOnRecoveryFailure(false); List> records = new ArrayList<>(); records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); assertThatExceptionOfType(KafkaException.class).isThrownBy( - () -> eh.handle(new RuntimeException(), records, consumer, null)); + () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); assertThatExceptionOfType(KafkaException.class).isThrownBy( - () -> eh.handle(new RuntimeException(), records, consumer, null)); + () -> eh.handleRemaining(new RuntimeException(), records, consumer, null)); verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L); - eh.handle(new RuntimeException(), records, consumer, null); // immediate re-attempt recovery + eh.handleRemaining(new RuntimeException(), records, consumer, null); // immediate re-attempt recovery verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L); verify(consumer).seek(new TopicPartition("foo", 0), 1L); verifyNoMoreInteractions(consumer); @@ -295,8 +293,8 @@ public void seekToCurrentErrorHandlerRecoversManualAcksSync() { private void seekToCurrentErrorHandlerRecoversManualAcks(boolean syncCommits) { @SuppressWarnings("unchecked") - BiConsumer, Exception> recoverer = mock(BiConsumer.class); - SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1)); + ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class); + DefaultErrorHandler eh = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 1)); eh.setCommitRecovered(true); List> records = new ArrayList<>(); records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); @@ -310,17 +308,12 @@ private void seekToCurrentErrorHandlerRecoversManualAcks(boolean syncCommits) { OffsetCommitCallback commitCallback = (offsets, ex) -> { }; properties.setCommitCallback(commitCallback); given(container.getContainerProperties()).willReturn(properties); - try { - eh.handle(new RuntimeException(), records, consumer, container); - fail("Expected exception"); - } - catch (@SuppressWarnings("unused") KafkaException e) { - // NOSONAR - } + assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + eh.handleRemaining(new RuntimeException(), records, consumer, container)); verify(consumer).seek(new TopicPartition("foo", 0), 0L); verify(consumer).seek(new TopicPartition("foo", 1), 0L); verifyNoMoreInteractions(consumer); - eh.handle(new RuntimeException(), records, consumer, container); + eh.handleRemaining(new RuntimeException(), records, consumer, container); verify(consumer, times(2)).seek(new TopicPartition("foo", 1), 0L); if (syncCommits) { verify(consumer) @@ -340,20 +333,15 @@ private void seekToCurrentErrorHandlerRecoversManualAcks(boolean syncCommits) { @Test public void testNeverRecover() { @SuppressWarnings("unchecked") - BiConsumer, Exception> recoverer = mock(BiConsumer.class); - SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, Long.MAX_VALUE)); + ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class); + DefaultErrorHandler eh = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, Long.MAX_VALUE)); List> records = new ArrayList<>(); records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo")); records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar")); Consumer consumer = mock(Consumer.class); for (int i = 0; i < 20; i++) { - try { - eh.handle(new RuntimeException(), records, consumer, null); - fail("Expected exception"); - } - catch (@SuppressWarnings("unused") KafkaException e) { - // NOSONAR - } + assertThatExceptionOfType(KafkaException.class).isThrownBy(() -> + eh.handleRemaining(new RuntimeException(), records, consumer, null)); } verify(consumer, times(20)).seek(new TopicPartition("foo", 0), 0L); verifyNoMoreInteractions(consumer); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java index 1dce45b7bc..e3aa8e23ae 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java @@ -47,12 +47,12 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.ContainerCustomizer; import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; +import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; -import org.springframework.kafka.listener.ErrorHandler; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.listener.KafkaConsumerBackoffManager; -import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.kafka.listener.adapter.AbstractDelegatingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter; import org.springframework.kafka.support.Acknowledgment; @@ -78,7 +78,7 @@ class ListenerContainerFactoryConfigurerTests { private ContainerProperties containerProperties; @Captor - private ArgumentCaptor errorHandlerCaptor; + private ArgumentCaptor errorHandlerCaptor; private final ConsumerRecord record = new ConsumerRecord<>("test-topic", 1, 1234L, new Object(), new Object()); @@ -95,7 +95,7 @@ class ListenerContainerFactoryConfigurerTests { private OffsetCommitCallback offsetCommitCallback; @Mock - private java.util.function.Consumer errorHandlerCustomizer; + private java.util.function.Consumer errorHandlerCustomizer; @SuppressWarnings("rawtypes") @Captor @@ -134,7 +134,7 @@ class ListenerContainerFactoryConfigurerTests { private final long backOffValue = 2000L; - private ListenerContainerFactoryConfigurer.Configuration lcfcConfiguration = + private final ListenerContainerFactoryConfigurer.Configuration lcfcConfiguration = new ListenerContainerFactoryConfigurer.Configuration(Collections.singletonList(backOffValue)); @Test @@ -156,13 +156,13 @@ void shouldSetupErrorHandling() { .configure(containerFactory, configuration.forContainerFactoryConfigurer()); // then - then(containerFactory).should(times(1)).setErrorHandler(errorHandlerCaptor.capture()); - ErrorHandler errorHandler = errorHandlerCaptor.getValue(); - assertThat(SeekToCurrentErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue(); - SeekToCurrentErrorHandler seekToCurrent = (SeekToCurrentErrorHandler) errorHandler; + then(containerFactory).should(times(1)).setCommonErrorHandler(errorHandlerCaptor.capture()); + CommonErrorHandler errorHandler = errorHandlerCaptor.getValue(); + assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue(); + DefaultErrorHandler seekToCurrent = (DefaultErrorHandler) errorHandler; RuntimeException ex = new RuntimeException(); - seekToCurrent.handle(ex, records, consumer, container); + seekToCurrent.handleRemaining(ex, records, consumer, container); then(recoverer).should(times(1)).accept(record, consumer, ex); then(consumer).should(times(1)).commitAsync(any(Map.class), eq(offsetCommitCallback)); @@ -379,6 +379,6 @@ void shouldCacheFactoryInstances() { // then assertThat(secondFactory).isEqualTo(factory); then(containerFactory).should(times(1)).setContainerCustomizer(any()); - then(containerFactory).should(times(1)).setErrorHandler(any()); + then(containerFactory).should(times(1)).setCommonErrorHandler(any()); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java index aa49193b7c..4201799b6c 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/streams/KafkaStreamsTests.java @@ -59,7 +59,7 @@ import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; -import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer; +import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -211,9 +211,8 @@ public AtomicBoolean stateChangeCalled() { return new AtomicBoolean(); } - @SuppressWarnings("deprecation") @Bean - public StreamsBuilderFactoryBeanCustomizer customizer() { + public StreamsBuilderFactoryBeanConfigurer customizer() { return fb -> fb.setStateListener((newState, oldState) -> { stateChangeCalled().set(true); });