|  | 
| 47 | 47 | import org.apache.kafka.common.header.Header; | 
| 48 | 48 | import org.apache.kafka.common.header.internals.RecordHeader; | 
| 49 | 49 | import org.apache.kafka.common.header.internals.RecordHeaders; | 
|  | 50 | +import org.jspecify.annotations.Nullable; | 
| 50 | 51 | import org.junit.jupiter.api.BeforeAll; | 
| 51 | 52 | import org.junit.jupiter.api.Test; | 
| 52 | 53 | import org.mockito.ArgumentCaptor; | 
|  | 
| 75 | 76 | import org.springframework.kafka.transaction.KafkaTransactionManager; | 
| 76 | 77 | import org.springframework.messaging.MessageHeaders; | 
| 77 | 78 | import org.springframework.transaction.TransactionDefinition; | 
|  | 79 | +import org.springframework.transaction.annotation.Transactional; | 
| 78 | 80 | import org.springframework.transaction.support.DefaultTransactionDefinition; | 
| 79 | 81 | import org.springframework.util.backoff.FixedBackOff; | 
| 80 | 82 | 
 | 
| @@ -140,6 +142,8 @@ public class TransactionalContainerTests { | 
| 140 | 142 | 
 | 
| 141 | 143 | 	public static final String topic10 = "txTopic10"; | 
| 142 | 144 | 
 | 
|  | 145 | +	public static final String topic11 = "txTopic11"; | 
|  | 146 | + | 
| 143 | 147 | 	private static EmbeddedKafkaBroker embeddedKafka; | 
| 144 | 148 | 
 | 
| 145 | 149 | 	@BeforeAll | 
| @@ -1148,4 +1152,65 @@ public void onMessage(List<ConsumerRecord<Integer, String>> data) { | 
| 1148 | 1152 | 		container.stop(); | 
| 1149 | 1153 | 	} | 
| 1150 | 1154 | 
 | 
|  | 1155 | +	@Test | 
|  | 1156 | +	void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException { | 
|  | 1157 | +		// init producer | 
|  | 1158 | +		Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka); | 
|  | 1159 | +		DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(producerProperties); | 
|  | 1160 | +		pf.setTransactionIdPrefix("testSendOffsetOnlyOnActiveTransaction.recordListener"); | 
|  | 1161 | +		final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf); | 
|  | 1162 | + | 
|  | 1163 | +		// init consumer | 
|  | 1164 | +		String group = "testSendOffsetOnlyOnActiveTransaction"; | 
|  | 1165 | +		Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka, group, false); | 
|  | 1166 | +		consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); | 
|  | 1167 | +		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties); | 
|  | 1168 | +		ContainerProperties containerProps = new ContainerProperties(topic11); | 
|  | 1169 | +		containerProps.setPollTimeout(10_000); | 
|  | 1170 | +		final var successLatch = new AtomicReference<>(new CountDownLatch(2)); | 
|  | 1171 | +		containerProps.setMessageListener(new MessageListener<Integer, String>() { | 
|  | 1172 | +			@Transactional("testSendOffsetOnlyOnActiveTransaction") | 
|  | 1173 | +			@Override | 
|  | 1174 | +			public void onMessage(ConsumerRecord<Integer, String> data) { | 
|  | 1175 | +			} | 
|  | 1176 | +		}); | 
|  | 1177 | + | 
|  | 1178 | +		// init container | 
|  | 1179 | +		KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf); | 
|  | 1180 | +		containerProps.setKafkaAwareTransactionManager(tm); | 
|  | 1181 | +		KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); | 
|  | 1182 | +		container.setBeanName("testSendOffsetOnlyOnActiveTransaction"); | 
|  | 1183 | +		container.setRecordInterceptor(new RecordInterceptor<Integer, String>() { | 
|  | 1184 | +			boolean isFirst = true; | 
|  | 1185 | + | 
|  | 1186 | +			@Override | 
|  | 1187 | +			public @Nullable ConsumerRecord<Integer, String> intercept( | 
|  | 1188 | +					ConsumerRecord<Integer, String> record, | 
|  | 1189 | +					Consumer<Integer, String> consumer) { | 
|  | 1190 | +				if (isFirst) { | 
|  | 1191 | +					isFirst = false; | 
|  | 1192 | +					return record; | 
|  | 1193 | +				} | 
|  | 1194 | +				return null; | 
|  | 1195 | +			} | 
|  | 1196 | + | 
|  | 1197 | +			@Override | 
|  | 1198 | +			public void afterRecord( | 
|  | 1199 | +					ConsumerRecord<Integer, String> record, | 
|  | 1200 | +					Consumer<Integer, String> consumer) { | 
|  | 1201 | +				successLatch.get().countDown(); | 
|  | 1202 | +			} | 
|  | 1203 | +		}); | 
|  | 1204 | +		container.start(); | 
|  | 1205 | + | 
|  | 1206 | +		template.executeInTransaction(t -> { | 
|  | 1207 | +			template.send(new ProducerRecord<>(topic11, 0, 0, "bar1")); | 
|  | 1208 | +			template.send(new ProducerRecord<>(topic11, 0, 0, "bar2")); | 
|  | 1209 | +			return null; | 
|  | 1210 | +		}); | 
|  | 1211 | +		assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue(); | 
|  | 1212 | + | 
|  | 1213 | +		container.stop(); | 
|  | 1214 | +		pf.destroy(); | 
|  | 1215 | +	} | 
| 1151 | 1216 | } | 
0 commit comments