|
47 | 47 | import org.apache.kafka.common.header.Header; |
48 | 48 | import org.apache.kafka.common.header.internals.RecordHeader; |
49 | 49 | import org.apache.kafka.common.header.internals.RecordHeaders; |
| 50 | +import org.jspecify.annotations.Nullable; |
50 | 51 | import org.junit.jupiter.api.BeforeAll; |
51 | 52 | import org.junit.jupiter.api.Test; |
52 | 53 | import org.mockito.ArgumentCaptor; |
|
75 | 76 | import org.springframework.kafka.transaction.KafkaTransactionManager; |
76 | 77 | import org.springframework.messaging.MessageHeaders; |
77 | 78 | import org.springframework.transaction.TransactionDefinition; |
| 79 | +import org.springframework.transaction.annotation.Transactional; |
78 | 80 | import org.springframework.transaction.support.DefaultTransactionDefinition; |
79 | 81 | import org.springframework.util.backoff.FixedBackOff; |
80 | 82 |
|
@@ -140,6 +142,8 @@ public class TransactionalContainerTests { |
140 | 142 |
|
141 | 143 | public static final String topic10 = "txTopic10"; |
142 | 144 |
|
| 145 | + public static final String topic11 = "txTopic11"; |
| 146 | + |
143 | 147 | private static EmbeddedKafkaBroker embeddedKafka; |
144 | 148 |
|
145 | 149 | @BeforeAll |
@@ -1148,4 +1152,65 @@ public void onMessage(List<ConsumerRecord<Integer, String>> data) { |
1148 | 1152 | container.stop(); |
1149 | 1153 | } |
1150 | 1154 |
|
| 1155 | + @Test |
| 1156 | + void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException { |
| 1157 | + // init producer |
| 1158 | + Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka); |
| 1159 | + DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(producerProperties); |
| 1160 | + pf.setTransactionIdPrefix("testSendOffsetOnlyOnActiveTransaction.recordListener"); |
| 1161 | + final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf); |
| 1162 | + |
| 1163 | + // init consumer |
| 1164 | + String group = "testSendOffsetOnlyOnActiveTransaction"; |
| 1165 | + Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka, group, false); |
| 1166 | + consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); |
| 1167 | + DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties); |
| 1168 | + ContainerProperties containerProps = new ContainerProperties(topic11); |
| 1169 | + containerProps.setPollTimeout(10_000); |
| 1170 | + final var successLatch = new AtomicReference<>(new CountDownLatch(2)); |
| 1171 | + containerProps.setMessageListener(new MessageListener<Integer, String>() { |
| 1172 | + @Transactional("testSendOffsetOnlyOnActiveTransaction") |
| 1173 | + @Override |
| 1174 | + public void onMessage(ConsumerRecord<Integer, String> data) { |
| 1175 | + } |
| 1176 | + }); |
| 1177 | + |
| 1178 | + // init container |
| 1179 | + KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf); |
| 1180 | + containerProps.setKafkaAwareTransactionManager(tm); |
| 1181 | + KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); |
| 1182 | + container.setBeanName("testSendOffsetOnlyOnActiveTransaction"); |
| 1183 | + container.setRecordInterceptor(new RecordInterceptor<Integer, String>() { |
| 1184 | + boolean isFirst = true; |
| 1185 | + |
| 1186 | + @Override |
| 1187 | + public @Nullable ConsumerRecord<Integer, String> intercept( |
| 1188 | + ConsumerRecord<Integer, String> record, |
| 1189 | + Consumer<Integer, String> consumer) { |
| 1190 | + if (isFirst) { |
| 1191 | + isFirst = false; |
| 1192 | + return record; |
| 1193 | + } |
| 1194 | + return null; |
| 1195 | + } |
| 1196 | + |
| 1197 | + @Override |
| 1198 | + public void afterRecord( |
| 1199 | + ConsumerRecord<Integer, String> record, |
| 1200 | + Consumer<Integer, String> consumer) { |
| 1201 | + successLatch.get().countDown(); |
| 1202 | + } |
| 1203 | + }); |
| 1204 | + container.start(); |
| 1205 | + |
| 1206 | + template.executeInTransaction(t -> { |
| 1207 | + template.send(new ProducerRecord<>(topic11, 0, 0, "bar1")); |
| 1208 | + template.send(new ProducerRecord<>(topic11, 0, 0, "bar2")); |
| 1209 | + return null; |
| 1210 | + }); |
| 1211 | + assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); |
| 1212 | + |
| 1213 | + container.stop(); |
| 1214 | + pf.destroy(); |
| 1215 | + } |
1151 | 1216 | } |
0 commit comments