Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5098,6 +5098,11 @@ If you are using Spring Boot, you simply need to add the error handler as a `@Be
This new error handler replaces the `SeekToCurrentErrorHandler` and `RecoveringBatchErrorHandler`, which have been the default error handlers for several releases now.
One difference is that the fallback behavior for batch listeners (when an exception other than a `BatchListenerFailedException` is thrown) is the equivalent of the <<retrying-batch-eh>>.

IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed below, but without actually seeking.
Instead, the records are retained by the listener container and resubmitted to the listener after the error handler exits (and after performing a single paused `poll()`, to keep the consumer alive).
The error handler returns a result to the container that indicates whether the current failing record can be resubmitted, or if it was recovered and then it will not be sent to the listener again.
To enable this mode, set the property `seekAfterError` to `false`.

The error handler can recover (skip) a record that keeps failing.
By default, after ten failures, the failed record is logged (at the `ERROR` level).
You can configure the handler with a custom recoverer (`BiConsumer`) and a `BackOff` that controls the delivery attempts and delays between each.
Expand Down Expand Up @@ -5152,6 +5157,11 @@ The sequence of events is:
The recovered record's offset is committed
* If retries are exhausted and recovery fails, seeks are performed as if retries are not exhausted.

IMPORTANT: Starting with version 2.9, the `DefaultErrorHandler` can be configured to provide the same semantics as seeking the unprocessed record offsets as discussed above, but without actually seeking.
Instead, error handler creates a new `ConsumerRecords<?, ?>` containing just the unprocessed records which will then be submitted to the listener (after performing a single paused `poll()`, to keep the consumer alive).
To enable this mode, set the property `seekAfterError` to `false`.


The default recoverer logs the failed record after retries are exhausted.
You can use a custom recoverer, or one provided by the framework such as the <<dead-letters,`DeadLetterPublishingRecoverer`>>.

Expand Down Expand Up @@ -5250,6 +5260,8 @@ If the function returns `null`, the handler's default `BackOff` will be used.
Set `resetStateOnExceptionChange` to `true` and the retry sequence will be restarted (including the selection of a new `BackOff`, if so configured) if the exception type changes between failures.
By default, the exception type is not considered.

Starting with version 2.9, this is now `true` by default.

Also see <<delivery-header>>.

[[batch-listener-conv-errors]]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,10 +71,17 @@ public void setStopContainerAbnormally(boolean stopContainerAbnormally) {
}

@Override
@Deprecated
public boolean remainingRecords() {
return true;
}

@Override
public boolean seeksAfterHandling() {
// We don't actually do any seeks here, but stopping the container has the same effect.
return true;
}

@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
MessageListenerContainer container, boolean batchListener) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,11 +65,17 @@ public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler>
}


@SuppressWarnings("deprecation")
@Override
public boolean remainingRecords() {
return this.defaultErrorHandler.remainingRecords();
}

@Override
public boolean seeksAfterHandling() {
return this.defaultErrorHandler.seeksAfterHandling();
}

@Override
public void clearThreadState() {
this.defaultErrorHandler.clearThreadState();
Expand All @@ -96,14 +102,18 @@ public void addDelegate(Class<? extends Throwable> throwable, CommonErrorHandler
checkDelegates();
}

@SuppressWarnings("deprecation")
private void checkDelegates() {
boolean remainingRecords = this.defaultErrorHandler.remainingRecords();
boolean ackAfterHandle = this.defaultErrorHandler.isAckAfterHandle();
boolean seeksAfterHandling = this.defaultErrorHandler.seeksAfterHandling();
this.delegates.values().forEach(handler -> {
Assert.isTrue(remainingRecords == handler.remainingRecords(),
"All delegates must return the same value when calling 'remainingRecords()'");
Assert.isTrue(ackAfterHandle == handler.isAckAfterHandle(),
"All delegates must return the same value when calling 'isAckAfterHandle()'");
Assert.isTrue(seeksAfterHandling == handler.seeksAfterHandling(),
"All delegates must return the same value when calling 'seeksAfterHandling()'");
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,13 +41,25 @@ public interface CommonErrorHandler extends DeliveryAttemptAware {
* When true (default), all remaining records including the failed record are passed
* to the error handler.
* @return false to receive only the failed record.
* @deprecated in favor of {@link #seeksAfterHandling()}.
* @see #handleRecord(Exception, ConsumerRecord, Consumer, MessageListenerContainer)
* @see #handleRemaining(Exception, List, Consumer, MessageListenerContainer)
*/
@Deprecated
default boolean remainingRecords() {
return false;
}

/**
* Return true if this error handler performs seeks on the failed record and remaining
* records (or just the remaining records after a failed record is recovered).
* @return true if the next poll should fetch records.
*/
@SuppressWarnings("deprecation")
default boolean seeksAfterHandling() {
return remainingRecords();
}

/**
* Return true if this error handler supports delivery attempts headers.
* @return true if capable.
Expand Down Expand Up @@ -79,14 +91,42 @@ default void handleOtherException(Exception thrownException, Consumer<?, ?> cons
* @param record the record.
* @param consumer the consumer.
* @param container the container.
* @deprecated in favor of
* {@link #handleOne(Exception, ConsumerRecord, Consumer, MessageListenerContainer)}.
* @see #remainingRecords()
*/
@Deprecated
default void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
MessageListenerContainer container) {

LogFactory.getLog(getClass()).error("'handleRecord' is not implemented by this handler", thrownException);
}

/**
* Handle the exception for a record listener when {@link #remainingRecords()} returns
* false. Use this to handle just the single failed record.
* @param thrownException the exception.
* @param record the record.
* @param consumer the consumer.
* @param container the container.
* @return true if the error was "handled" or false if not and the container will
* re-submit the record to the listener.
* @since 2.9
* @see #remainingRecords()
*/
@SuppressWarnings("deprecation")
default boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
MessageListenerContainer container) {

try {
handleRecord(thrownException, record, consumer, container);
return true;
}
catch (Exception ex) {
return false;
}
}

/**
* Handle the exception for a record listener when {@link #remainingRecords()} returns
* true. The failed record and all the remaining records from the poll are passed in.
Expand Down Expand Up @@ -120,6 +160,28 @@ default void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data,
LogFactory.getLog(getClass()).error("'handleBatch' is not implemented by this handler", thrownException);
}

/**
* Handle the exception for a batch listener. The complete {@link ConsumerRecords}
* from the poll is supplied. Return the members of the batch that should be re-sent to
* the listener. The returned records MUST be in the same order as the original records.
* @param thrownException the exception.
* @param data the consumer records.
* @param consumer the consumer.
* @param container the container.
* @param invokeListener a callback to re-invoke the listener.
* @param <K> the key type.
* @param <V> the value type.
* @return the consumer records, or a subset.
* @since 2.9
*/
default <K, V> ConsumerRecords<K, V> handleBatchAndReturnRemaining(Exception thrownException,
ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container,
Runnable invokeListener) {

handleBatch(thrownException, data, consumer, container, invokeListener);
return ConsumerRecords.empty();
}

@Override
default int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
}

@Override
@Deprecated
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
MessageListenerContainer container) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,11 +51,17 @@ public CommonMixedErrorHandler(CommonErrorHandler recordErrorHandler, CommonErro
this.batchErrorHandler = batchErrorHandler;
}

@SuppressWarnings("deprecation")
@Override
public boolean remainingRecords() {
return this.recordErrorHandler.remainingRecords();
}

@Override
public boolean seeksAfterHandling() {
return this.recordErrorHandler.seeksAfterHandling();
}

@Override
public boolean deliveryAttemptHeader() {
return this.recordErrorHandler.deliveryAttemptHeader();
Expand All @@ -73,10 +79,10 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
}

@Override
public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
MessageListenerContainer container) {

this.recordErrorHandler.handleRecord(thrownException, record, consumer, container);
return this.recordErrorHandler.handleOne(thrownException, record, consumer, container);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ private void checkConfig() {
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {

if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
getRecoveryStrategy((List) records, exception), container, this.logger)
if (SeekUtils.doSeeks((List) records, consumer, exception, recoverable,
getFailureTracker()::recovered, container, this.logger)
&& isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
ConsumerRecord<K, V> skipped = records.get(0);
this.kafkaTemplate.sendOffsetsToTransaction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,39 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
}

@Override
@Deprecated
public boolean remainingRecords() {
return true;
return isSeekAfterError();
}

@Override
public boolean seeksAfterHandling() {
return isSeekAfterError();
}

@Override
public boolean deliveryAttemptHeader() {
return true;
}

@Override
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
MessageListenerContainer container) {

try {
return getFailureTracker().recovered(record, thrownException, container, consumer);
}
catch (Exception ex) {
return false;
}
}

@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container) {

SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
getRecoveryStrategy(records, consumer, thrownException), this.logger, getLogLevel());
getFailureTracker()::recovered, this.logger, getLogLevel());
}

@Override
Expand All @@ -141,6 +159,14 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
doHandle(thrownException, data, consumer, container, invokeListener);
}

@Override
public <K, V> ConsumerRecords<K, V> handleBatchAndReturnRemaining(Exception thrownException,
ConsumerRecords<?, ?> data, Consumer<?, ?> consumer, MessageListenerContainer container,
Runnable invokeListener) {

return handle(thrownException, data, consumer, container, invokeListener);
}

@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer,
MessageListenerContainer container, boolean batchListener) {
Expand Down
Loading