Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;

import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.core.RecoveryCallback;
import org.springframework.integration.dsl.ComponentsRegistration;
import org.springframework.integration.dsl.MessagingGatewaySpec;
Expand All @@ -33,7 +34,6 @@
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;

import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.core.RecoveryCallback;
import org.springframework.integration.dsl.ComponentsRegistration;
import org.springframework.integration.dsl.MessageProducerSpec;
Expand All @@ -34,7 +35,6 @@
import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,43 @@
import org.jspecify.annotations.Nullable;

import org.springframework.core.AttributeAccessor;
import org.springframework.core.AttributeAccessorSupport;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.core.RecoveryCallback;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;

/**
* Implementations of this interface will generally support a retry template for retrying
* incoming deliveries and this supports adding common attributes to the retry context.
* incoming deliveries, and this supports adding common attributes to the retry context.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 6.0
*
*/
public interface KafkaInboundEndpoint {

/**
* {@link org.springframework.retry.RetryContext} attribute key for an acknowledgment
* The {@link RetryContext} attribute key for an acknowledgment
* if the listener is capable of acknowledging.
*/
String CONTEXT_ACKNOWLEDGMENT = "acknowledgment";

/**
* {@link org.springframework.retry.RetryContext} attribute key for the consumer if
* The {@link RetryContext} attribute key for the consumer if
* the listener is consumer-aware.
*/
String CONTEXT_CONSUMER = "consumer";

/**
* {@link org.springframework.retry.RetryContext} attribute key for the record.
* The {@link RetryContext} attribute key for the record.
*/
String CONTEXT_RECORD = "record";

ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
ThreadLocal<@Nullable AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();

/**
* Execute the runnable with the retry template and recovery callback.
Expand All @@ -67,24 +71,46 @@ public interface KafkaInboundEndpoint {
default void doWithRetry(RetryTemplate template, @Nullable RecoveryCallback<?> callback, ConsumerRecord<?, ?> record,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't doWithRetry be @Nullable since it can return a null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return does not apply to the logic of Kafka Inbound Endpoints.
Therefore void here and just swallowing null return from the RetryTemplate.execute().

@Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer, Runnable runnable) {

RetryContext context = new RetryContext();
context.setAttribute(CONTEXT_RECORD, record);
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
context.setAttribute(CONTEXT_CONSUMER, consumer);
ATTRIBUTES_HOLDER.set(context);

try {
template.execute(context -> {
if (context.getRetryCount() == 0) {
context.setAttribute(CONTEXT_RECORD, record);
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
context.setAttribute(CONTEXT_CONSUMER, consumer);
ATTRIBUTES_HOLDER.set(context);
template.execute(() -> {
try {
runnable.run();
}
catch (Throwable ex) {
context.retryCount++;
throw ex;
}
runnable.run();
return null;
}, callback);
});
}
catch (Exception ex) {
throw new KafkaException("Failed to execute runnable", ex);
catch (RetryException ex) {
if (callback != null) {
callback.recover(context, ex);
}
else {
throw new KafkaException("Failed to execute runnable", ex);
}
}
finally {
ATTRIBUTES_HOLDER.remove();
}
}

@SuppressWarnings("serial")
final class RetryContext extends AttributeAccessorSupport {

private int retryCount;

public int getRetryCount() {
return this.retryCount;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

Expand All @@ -29,6 +28,7 @@
import org.jspecify.annotations.Nullable;

import org.springframework.core.AttributeAccessor;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.Pausable;
Expand Down Expand Up @@ -56,8 +56,6 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -86,7 +84,7 @@ public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport

private @Nullable RetryTemplate retryTemplate;

private org.springframework.retry.@Nullable RecoveryCallback<?> recoveryCallback;
private @Nullable RecoveryCallback<?> recoveryCallback;

private @Nullable BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

Expand Down Expand Up @@ -174,8 +172,7 @@ public void setRetryTemplate(RetryTemplate retryTemplate) {
* @param recoveryCallback the recovery callback.
*/
public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
this.recoveryCallback = (context) ->
recoveryCallback.recover(context, Objects.requireNonNull(context.getLastThrowable()));
this.recoveryCallback = recoveryCallback;
}

/**
Expand Down Expand Up @@ -209,12 +206,7 @@ protected void onInit() {
if (this.retryTemplate != null) {
MessageChannel errorChannel = getErrorChannel();
if (this.recoveryCallback != null && errorChannel != null) {
// TODO https://github.com/spring-projects/spring-integration/issues/10345
ErrorMessageSendingRecoverer errorMessageSendingRecoverer =
new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy());
this.recoveryCallback =
context ->
errorMessageSendingRecoverer.recover(context, context.getLastThrowable());
this.recoveryCallback = new ErrorMessageSendingRecoverer(errorChannel, getErrorMessageStrategy());
}
}
ContainerProperties containerProperties = this.messageListenerContainer.getContainerProperties();
Expand Down Expand Up @@ -266,27 +258,24 @@ public int afterShutdown() {
}

/**
* If there's a retry template, it will set the attributes holder via the listener. If
* there's no retry template, but there's an error channel, we create a new attributes
* holder here. If an attributes holder exists (by either method), we set the
* If there's a retry template, it will set the attribute holder via the listener. If
* there's no retry template, but there's an error channel, we create a new attribute
* holder here. If an attribute holder exists (by either method), we set the
* attributes for use by the {@link org.springframework.integration.support.ErrorMessageStrategy}.
* @param record the record.
* @param message the message.
* @param conversionError a conversion error occurred.
*/
private void setAttributesIfNecessary(Object record, @Nullable Message<?> message, boolean conversionError) {
boolean needHolder = ATTRIBUTES_HOLDER.get() == null
&& (getErrorChannel() != null && (this.retryTemplate == null || conversionError));
boolean needAttributes = needHolder || this.retryTemplate != null;
boolean needHolder = getErrorChannel() != null || this.retryTemplate != null || conversionError;
if (needHolder) {
ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
}
if (needAttributes) {
AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
if (attributes != null) {
attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
attributes.setAttribute(KafkaHeaders.RAW_DATA, record);
if (attributes == null) {
attributes = ErrorMessageUtils.getAttributeAccessor(null, null);
ATTRIBUTES_HOLDER.set(attributes);
}
attributes.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
attributes.setAttribute(KafkaHeaders.RAW_DATA, record);
}
}

Expand All @@ -304,7 +293,7 @@ protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> messa
private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V> {

IntegrationRecordMessageListener() {
super(null, null); // NOSONAR - out of use
super(null, null);
}

@Override
Expand Down Expand Up @@ -338,7 +327,7 @@ public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment ackn
sendAndReceive(record, message, acknowledgment, consumer);
}
else {
KafkaInboundGateway.this.logger.debug(() -> "Converter returned a null message for: " + record);
KafkaInboundGateway.this.logger.warn(() -> "Converter returned a null message for: " + record);
}
}

Expand All @@ -364,23 +353,22 @@ private void doSendAndReceive(Message<?> message) {
KafkaInboundGateway.this.kafkaTemplate.send(reply);
}
else {
this.logger.debug(() -> "No reply received for " + message);
this.logger.warn(() -> "No reply received for " + message);
}
}
finally {
if (KafkaInboundGateway.this.retryTemplate == null) {
ATTRIBUTES_HOLDER.remove();
}
ATTRIBUTES_HOLDER.remove();
}
}

private Message<?> enhanceHeadersAndSaveAttributes(Message<?> message, ConsumerRecord<K, V> record) {
Message<?> messageToReturn = message;
AttributeAccessor retryContext = ATTRIBUTES_HOLDER.get();
if (message.getHeaders() instanceof KafkaMessageHeaders) {
Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
if (KafkaInboundGateway.this.retryTemplate != null) {
if (KafkaInboundGateway.this.retryTemplate != null && retryContext != null) {
AtomicInteger deliveryAttempt =
new AtomicInteger(((RetryContext) ATTRIBUTES_HOLDER.get()).getRetryCount() + 1);
new AtomicInteger(((RetryContext) retryContext).getRetryCount() + 1);
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
}
else if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) {
Expand All @@ -394,9 +382,9 @@ else if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) {
}
else {
MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
if (KafkaInboundGateway.this.retryTemplate != null) {
if (KafkaInboundGateway.this.retryTemplate != null && retryContext != null) {
AtomicInteger deliveryAttempt =
new AtomicInteger(((RetryContext) ATTRIBUTES_HOLDER.get()).getRetryCount() + 1);
new AtomicInteger(((RetryContext) retryContext).getRetryCount() + 1);
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
}
else if (KafkaInboundGateway.this.containerDeliveryAttemptPresent) {
Expand Down
Loading