From 5b0ed9e7a177b22e53af70017672a7bda762a88d Mon Sep 17 00:00:00 2001 From: moonyougnCHAE Date: Fri, 3 Oct 2025 22:58:39 +0900 Subject: [PATCH 1/4] Fix send to offset to tx, when no active tx . Signed-off-by: moonyougnCHAE --- .../kafka/listener/KafkaMessageListenerContainer.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 6364135b12..7bd7c00824 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -3080,6 +3080,9 @@ else if (!this.autoCommit && (!this.isAnyManualAck || commitRecovered)) { } private void sendOffsetsToTransaction() { + if (this.kafkaTxManager != null && TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory()) == null) { + return; + } handleAcks(); Map commits = buildCommits(); this.commitLogger.log(() -> "Sending offsets to transaction: " + commits); From 41f7f31dc258fcf7547b7daa78d01525dd07a424 Mon Sep 17 00:00:00 2001 From: moonyougnCHAE Date: Sun, 5 Oct 2025 14:58:56 +0900 Subject: [PATCH 2/4] add test code . Signed-off-by: moonyougnCHAE --- .../listener/TransactionalContainerTests.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 40eb235037..60691800d8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -47,6 +47,7 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -75,6 +76,7 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.messaging.MessageHeaders; import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.util.backoff.FixedBackOff; @@ -140,6 +142,8 @@ public class TransactionalContainerTests { public static final String topic10 = "txTopic10"; + public static final String topic11 = "txTopic11"; + private static EmbeddedKafkaBroker embeddedKafka; @BeforeAll @@ -1148,4 +1152,65 @@ public void onMessage(List> data) { container.stop(); } + @Test + void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException { + // init producer + Map producerProperties = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(producerProperties); + pf.setTransactionIdPrefix("testSendOffsetOnlyOnActiveTransaction.recordListener"); + final KafkaTemplate template = new KafkaTemplate<>(pf); + + // init consumer + String group = "testSendOffsetOnlyOnActiveTransaction"; + Map consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka, group, false); + consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProperties); + ContainerProperties containerProps = new ContainerProperties(topic11); + containerProps.setPollTimeout(10_000); + final var successLatch = new AtomicReference<>(new CountDownLatch(2)); + containerProps.setMessageListener(new MessageListener() { + @Transactional("testSendOffsetOnlyOnActiveTransaction") + @Override + public void onMessage(ConsumerRecord data) { + } + }); + + // init container + KafkaTransactionManager tm = new KafkaTransactionManager<>(pf); + containerProps.setKafkaAwareTransactionManager(tm); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); + container.setBeanName("testSendOffsetOnlyOnActiveTransaction"); + container.setRecordInterceptor(new RecordInterceptor() { + boolean isFirst = true; + + @Override + public @Nullable ConsumerRecord intercept( + ConsumerRecord record, + Consumer consumer) { + if (isFirst) { + isFirst = false; + return record; + } + return null; + } + + @Override + public void afterRecord( + ConsumerRecord record, + Consumer consumer) { + successLatch.get().countDown(); + } + }); + container.start(); + + template.executeInTransaction(t -> { + template.send(new ProducerRecord<>(topic11, 0, 0, "bar1")); + template.send(new ProducerRecord<>(topic11, 0, 0, "bar2")); + return null; + }); + assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); + + container.stop(); + pf.destroy(); + } } From a108a33e6a9839279218cb97a20b0053e2069453 Mon Sep 17 00:00:00 2001 From: moonyougnCHAE Date: Fri, 10 Oct 2025 11:13:22 +0900 Subject: [PATCH 3/4] add kafka tx manager null check . Signed-off-by: moonyougnCHAE --- .../kafka/listener/KafkaMessageListenerContainer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 7bd7c00824..71b4c3f99c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -3080,7 +3080,7 @@ else if (!this.autoCommit && (!this.isAnyManualAck || commitRecovered)) { } private void sendOffsetsToTransaction() { - if (this.kafkaTxManager != null && TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory()) == null) { + if (this.kafkaTxManager == null || TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory()) == null) { return; } handleAcks(); From 1b15a47b7c03d78ac2062545b4711fc17b407a4d Mon Sep 17 00:00:00 2001 From: moonyougnCHAE Date: Sun, 12 Oct 2025 15:30:48 +0900 Subject: [PATCH 4/4] refactor test code . Signed-off-by: moonyougnCHAE --- .../listener/TransactionalContainerTests.java | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java index 60691800d8..92f175b39b 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java @@ -76,6 +76,8 @@ import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.messaging.MessageHeaders; import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionExecution; +import org.springframework.transaction.TransactionExecutionListener; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.util.backoff.FixedBackOff; @@ -1167,7 +1169,6 @@ void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProperties); ContainerProperties containerProps = new ContainerProperties(topic11); containerProps.setPollTimeout(10_000); - final var successLatch = new AtomicReference<>(new CountDownLatch(2)); containerProps.setMessageListener(new MessageListener() { @Transactional("testSendOffsetOnlyOnActiveTransaction") @Override @@ -1177,9 +1178,19 @@ public void onMessage(ConsumerRecord data) { // init container KafkaTransactionManager tm = new KafkaTransactionManager<>(pf); + AtomicInteger txCount = new AtomicInteger(0); + tm.addListener(new TransactionExecutionListener() { + @Override + public void afterCommit(TransactionExecution transaction, @Nullable Throwable commitFailure) { + txCount.incrementAndGet(); + TransactionExecutionListener.super.afterCommit(transaction, commitFailure); + } + }); containerProps.setKafkaAwareTransactionManager(tm); + KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.setBeanName("testSendOffsetOnlyOnActiveTransaction"); + final var interceptorLatch = new AtomicReference<>(new CountDownLatch(1)); container.setRecordInterceptor(new RecordInterceptor() { boolean isFirst = true; @@ -1198,17 +1209,25 @@ public void onMessage(ConsumerRecord data) { public void afterRecord( ConsumerRecord record, Consumer consumer) { - successLatch.get().countDown(); + interceptorLatch.get().countDown(); } }); container.start(); template.executeInTransaction(t -> { template.send(new ProducerRecord<>(topic11, 0, 0, "bar1")); + return null; + }); + assertThat(interceptorLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(txCount.get()).isEqualTo(1); + + interceptorLatch.set(new CountDownLatch(1)); + template.executeInTransaction(t -> { template.send(new ProducerRecord<>(topic11, 0, 0, "bar2")); return null; }); - assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(interceptorLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(txCount.get()).isEqualTo(1); container.stop(); pf.destroy();