Skip to content

Commit c55b5e8

Browse files
moonyoungCHAEspring-builds
authored andcommitted
Fix send offset to tx, when no active tx (#4096)
Fixes #4088 Fix `IllegalStateException` when record filtered by interceptor in transactional listener When using a transactional KafkaMessageListenerContainer with an early `RecordInterceptor` that filters out records, the container attempted to send offsets to a transaction even when no active transaction existed. This resulted in an `IllegalStateException` from the Kafka producer: "Cannot send offsets if a transaction is not in progress (currentState=READY)" The issue occurred because: - Container has a `KafkaTransactionManager` configured - Interceptor filters a record before listener execution - No transaction is started (listener never executes) - Container still attempts to send offsets via `sendOffsetsToTransaction()` - Kafka producer rejects the call as no transaction is active Solution: Add a guard clause in `sendOffsetsToTransaction()` to check for an active transaction resource in `TransactionSynchronizationManager`. If either `kafkaTxManager` is null or there's no active transaction resource, the method returns early without attempting to send offsets. The fix includes an integration test in `TransactionalContainerTests` that: - Uses `RecordInterceptor` to filter records - Tracks transaction commits via `TransactionExecutionListener` - Verifies only records that execute listeners trigger transactions - Confirms filtered records don't cause exceptions Signed-off-by: moonyougnCHAE <[email protected]> (cherry picked from commit 81f2941)
1 parent beb37d2 commit c55b5e8

File tree

2 files changed

+87
-0
lines changed

2 files changed

+87
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3067,6 +3067,9 @@ else if (!this.autoCommit && (!this.isAnyManualAck || commitRecovered)) {
30673067
}
30683068

30693069
private void sendOffsetsToTransaction() {
3070+
if (this.kafkaTxManager == null || TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory()) == null) {
3071+
return;
3072+
}
30703073
handleAcks();
30713074
Map<TopicPartition, OffsetAndMetadata> commits = buildCommits();
30723075
this.commitLogger.log(() -> "Sending offsets to transaction: " + commits);

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.kafka.common.header.Header;
4848
import org.apache.kafka.common.header.internals.RecordHeader;
4949
import org.apache.kafka.common.header.internals.RecordHeaders;
50+
import org.jspecify.annotations.Nullable;
5051
import org.junit.jupiter.api.BeforeAll;
5152
import org.junit.jupiter.api.Test;
5253
import org.mockito.ArgumentCaptor;
@@ -75,6 +76,9 @@
7576
import org.springframework.kafka.transaction.KafkaTransactionManager;
7677
import org.springframework.messaging.MessageHeaders;
7778
import org.springframework.transaction.TransactionDefinition;
79+
import org.springframework.transaction.TransactionExecution;
80+
import org.springframework.transaction.TransactionExecutionListener;
81+
import org.springframework.transaction.annotation.Transactional;
7882
import org.springframework.transaction.support.DefaultTransactionDefinition;
7983
import org.springframework.util.backoff.FixedBackOff;
8084

@@ -140,6 +144,8 @@ public class TransactionalContainerTests {
140144

141145
public static final String topic10 = "txTopic10";
142146

147+
public static final String topic11 = "txTopic11";
148+
143149
private static EmbeddedKafkaBroker embeddedKafka;
144150

145151
@BeforeAll
@@ -1148,4 +1154,82 @@ public void onMessage(List<ConsumerRecord<Integer, String>> data) {
11481154
container.stop();
11491155
}
11501156

1157+
@Test
1158+
void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException {
1159+
// init producer
1160+
Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka);
1161+
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(producerProperties);
1162+
pf.setTransactionIdPrefix("testSendOffsetOnlyOnActiveTransaction.recordListener");
1163+
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
1164+
1165+
// init consumer
1166+
String group = "testSendOffsetOnlyOnActiveTransaction";
1167+
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka, group, false);
1168+
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
1169+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties);
1170+
ContainerProperties containerProps = new ContainerProperties(topic11);
1171+
containerProps.setPollTimeout(10_000);
1172+
containerProps.setMessageListener(new MessageListener<Integer, String>() {
1173+
@Transactional("testSendOffsetOnlyOnActiveTransaction")
1174+
@Override
1175+
public void onMessage(ConsumerRecord<Integer, String> data) {
1176+
}
1177+
});
1178+
1179+
// init container
1180+
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
1181+
AtomicInteger txCount = new AtomicInteger(0);
1182+
tm.addListener(new TransactionExecutionListener() {
1183+
@Override
1184+
public void afterCommit(TransactionExecution transaction, @Nullable Throwable commitFailure) {
1185+
txCount.incrementAndGet();
1186+
TransactionExecutionListener.super.afterCommit(transaction, commitFailure);
1187+
}
1188+
});
1189+
containerProps.setKafkaAwareTransactionManager(tm);
1190+
1191+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
1192+
container.setBeanName("testSendOffsetOnlyOnActiveTransaction");
1193+
final var interceptorLatch = new AtomicReference<>(new CountDownLatch(1));
1194+
container.setRecordInterceptor(new RecordInterceptor<Integer, String>() {
1195+
boolean isFirst = true;
1196+
1197+
@Override
1198+
public @Nullable ConsumerRecord<Integer, String> intercept(
1199+
ConsumerRecord<Integer, String> record,
1200+
Consumer<Integer, String> consumer) {
1201+
if (isFirst) {
1202+
isFirst = false;
1203+
return record;
1204+
}
1205+
return null;
1206+
}
1207+
1208+
@Override
1209+
public void afterRecord(
1210+
ConsumerRecord<Integer, String> record,
1211+
Consumer<Integer, String> consumer) {
1212+
interceptorLatch.get().countDown();
1213+
}
1214+
});
1215+
container.start();
1216+
1217+
template.executeInTransaction(t -> {
1218+
template.send(new ProducerRecord<>(topic11, 0, 0, "bar1"));
1219+
return null;
1220+
});
1221+
assertThat(interceptorLatch.get().await(30, TimeUnit.SECONDS)).isTrue();
1222+
assertThat(txCount.get()).isEqualTo(1);
1223+
1224+
interceptorLatch.set(new CountDownLatch(1));
1225+
template.executeInTransaction(t -> {
1226+
template.send(new ProducerRecord<>(topic11, 0, 0, "bar2"));
1227+
return null;
1228+
});
1229+
assertThat(interceptorLatch.get().await(30, TimeUnit.SECONDS)).isTrue();
1230+
assertThat(txCount.get()).isEqualTo(1);
1231+
1232+
container.stop();
1233+
pf.destroy();
1234+
}
11511235
}

0 commit comments

Comments
 (0)