diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 76e44f1b3a..44c3ebcf2b 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -702,3 +702,25 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate } ---- ==== + +[[change-kboe-logging-level]] +==== Changing KafkaBackOffException Logging Level + +When a message in the retry topic is not due for consumption, a KafkaBackOffException is thrown. Such exception is logged by default at DEBUG level, but you can change this behavior by setting an error handler customizer in the ListenerContainerFactoryConfigurer in a @Configuration class. + +For example, to change the logging level to WARN you might add: + +==== +[source, java] +---- +@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME) +public ListenerContainerFactoryConfigurer listenerContainer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, + DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, + @Qualifier(RetryTopicInternalBeanNames + .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) { + ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock); + configurer.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN)); + return configurer; +} +---- +==== diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index 1131d0c7d3..9b4513a245 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -78,3 +78,10 @@ See <> for more information. The property `stripPreviousExceptionHeaders` is now `true` by default. See <> for more information. + +[[x28-kafka-back-off-exception-log-level]] +==== KafkaBackOffException Log Level Changes + +The KafkaBackOffException thrown when using the retryable topics feature is now logged at DEBUG level. + +See <> if you need to change the logging level back to WARN or set it to any other level. \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index b25ad95109..1b62781d75 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -2495,12 +2495,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord recor commitOffsetsIfNeeded(record); } catch (KafkaException ke) { - if (ke.contains(KafkaBackoffException.class)) { - this.logger.warn(ke.getMessage()); - } - else { - ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); - } + ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger); return ke; } catch (RuntimeException ee) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java index 5d4953c427..58ba2022ad 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.KafkaException; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener; import org.springframework.kafka.listener.CommonErrorHandler; @@ -144,6 +145,7 @@ private CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer dead DefaultErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 0)); errorHandler.setCommitRecovered(true); + errorHandler.setLogLevel(KafkaException.Level.DEBUG); this.errorHandlerCustomizer.accept(errorHandler); return errorHandler; }