Skip to content

Commit 876763a

Browse files
committed
add test code
. Signed-off-by: moonyougnCHAE <[email protected]>
1 parent be2e5b7 commit 876763a

File tree

1 file changed

+65
-0
lines changed

1 file changed

+65
-0
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.kafka.common.header.Header;
4848
import org.apache.kafka.common.header.internals.RecordHeader;
4949
import org.apache.kafka.common.header.internals.RecordHeaders;
50+
import org.jspecify.annotations.Nullable;
5051
import org.junit.jupiter.api.BeforeAll;
5152
import org.junit.jupiter.api.Test;
5253
import org.mockito.ArgumentCaptor;
@@ -75,6 +76,7 @@
7576
import org.springframework.kafka.transaction.KafkaTransactionManager;
7677
import org.springframework.messaging.MessageHeaders;
7778
import org.springframework.transaction.TransactionDefinition;
79+
import org.springframework.transaction.annotation.Transactional;
7880
import org.springframework.transaction.support.DefaultTransactionDefinition;
7981
import org.springframework.util.backoff.FixedBackOff;
8082

@@ -140,6 +142,8 @@ public class TransactionalContainerTests {
140142

141143
public static final String topic10 = "txTopic10";
142144

145+
public static final String topic11 = "txTopic11";
146+
143147
private static EmbeddedKafkaBroker embeddedKafka;
144148

145149
@BeforeAll
@@ -1148,4 +1152,65 @@ public void onMessage(List<ConsumerRecord<Integer, String>> data) {
11481152
container.stop();
11491153
}
11501154

1155+
@Test
1156+
void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException {
1157+
// init producer
1158+
Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka);
1159+
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(producerProperties);
1160+
pf.setTransactionIdPrefix("testSendOffsetOnlyOnActiveTransaction.recordListener");
1161+
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
1162+
1163+
// init consumer
1164+
String group = "testSendOffsetOnlyOnActiveTransaction";
1165+
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka, group, false);
1166+
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
1167+
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties);
1168+
ContainerProperties containerProps = new ContainerProperties(topic11);
1169+
containerProps.setPollTimeout(10_000);
1170+
final var successLatch = new AtomicReference<>(new CountDownLatch(2));
1171+
containerProps.setMessageListener(new MessageListener<Integer, String>() {
1172+
@Transactional(value = "testSendOffsetOnlyOnActiveTransaction")
1173+
@Override
1174+
public void onMessage(ConsumerRecord<Integer, String> data) {
1175+
}
1176+
});
1177+
1178+
// init container
1179+
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
1180+
containerProps.setKafkaAwareTransactionManager(tm);
1181+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
1182+
container.setBeanName("testSendOffsetOnlyOnActiveTransaction");
1183+
container.setRecordInterceptor(new RecordInterceptor<Integer, String>() {
1184+
boolean isFirst = true;
1185+
1186+
@Override
1187+
public @Nullable ConsumerRecord<Integer, String> intercept(
1188+
ConsumerRecord<Integer, String> record,
1189+
Consumer<Integer, String> consumer) {
1190+
if (isFirst) {
1191+
isFirst = false;
1192+
return record;
1193+
}
1194+
return null;
1195+
}
1196+
1197+
@Override
1198+
public void afterRecord(
1199+
ConsumerRecord<Integer, String> record,
1200+
Consumer<Integer, String> consumer) {
1201+
successLatch.get().countDown();
1202+
}
1203+
});
1204+
container.start();
1205+
1206+
template.executeInTransaction(t -> {
1207+
template.send(new ProducerRecord<>(topic11, 0, 0, "bar1"));
1208+
template.send(new ProducerRecord<>(topic11, 0, 0, "bar2"));
1209+
return null;
1210+
});
1211+
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue();
1212+
1213+
container.stop();
1214+
pf.destroy();
1215+
}
11511216
}

0 commit comments

Comments
 (0)