@@ -667,23 +667,25 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

     private final Duration syncCommitTimeout;

-    private final RecordInterceptor<K, V> recordInterceptor = !isInterceptBeforeTx() && this.kafkaTxManager != null
-            ? getRecordInterceptor()
-            : null;
+    private final RecordInterceptor<K, V> recordInterceptor =
+            !isInterceptBeforeTx() && this.transactionManager != null
+                    ? getRecordInterceptor()
+                    : null;

     private final RecordInterceptor<K, V> earlyRecordInterceptor =
-            isInterceptBeforeTx() || this.kafkaTxManager == null
+            isInterceptBeforeTx() || this.transactionManager == null
                     ? getRecordInterceptor()
                     : null;

     private final RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();

-    private final BatchInterceptor<K, V> batchInterceptor = !isInterceptBeforeTx() && this.kafkaTxManager != null
-            ? getBatchInterceptor()
-            : null;
+    private final BatchInterceptor<K, V> batchInterceptor =
+            !isInterceptBeforeTx() && this.transactionManager != null
+                    ? getBatchInterceptor()
+                    : null;

     private final BatchInterceptor<K, V> earlyBatchInterceptor =
-            isInterceptBeforeTx() || this.kafkaTxManager == null
+            isInterceptBeforeTx() || this.transactionManager == null
                     ? getBatchInterceptor()
                     : null;

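The field changes above switch the guard from the Kafka-only `kafkaTxManager` to the generic `transactionManager`, so with `setInterceptBeforeTx(false)` the record and batch interceptors are now also deferred until a transaction started by a plain `PlatformTransactionManager` is active, instead of being treated as if no transaction manager were configured. A minimal sketch of that setup, assuming a `consumerFactory`, a non-Kafka `txManager`, and a `recordInterceptor` supplied from elsewhere (the topic and group names are placeholders, not part of this change):

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.transaction.PlatformTransactionManager;

public class NonKafkaTxInterceptSketch {

    // Sketch only: wires a container to a non-Kafka PlatformTransactionManager so that,
    // with interceptBeforeTx = false, the interceptor runs inside that transaction.
    static <K, V> ConcurrentMessageListenerContainer<K, V> build(ConsumerFactory<K, V> consumerFactory,
            PlatformTransactionManager txManager, RecordInterceptor<K, V> recordInterceptor) {

        ContainerProperties props = new ContainerProperties("someTopic"); // placeholder topic
        props.setGroupId("someGroup"); // placeholder group
        props.setMessageListener((MessageListener<K, V>) rec -> {
            // record processing participates in the surrounding (e.g. JDBC) transaction
        });
        props.setTransactionManager(txManager); // any PlatformTransactionManager, not only Kafka-aware

        ConcurrentMessageListenerContainer<K, V> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, props);
        container.setRecordInterceptor(recordInterceptor); // invoked after getTransaction(), as the test below verifies
        container.setInterceptBeforeTx(false);
        return container;
    }

}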
@@ -68,6 +68,7 @@
 import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
 import org.springframework.lang.Nullable;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.support.TransactionSynchronizationManager;

 /**

@@ -655,6 +656,104 @@ public void failure(ConsumerRecords records, Exception exception, Consumer consu
}
}

    @Test
    @SuppressWarnings({ "rawtypes", "unchecked" })
    void testInterceptInTxNonKafkaTM() throws InterruptedException {
        ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
        final Consumer consumer = mock(Consumer.class);
        TopicPartition tp0 = new TopicPartition("foo", 0);
        ConsumerRecord record1 = new ConsumerRecord("foo", 0, 0L, "bar", "baz");
        ConsumerRecords records = new ConsumerRecords(
                Collections.singletonMap(tp0, Collections.singletonList(record1)));
        ConsumerRecords empty = new ConsumerRecords(Collections.emptyMap());
        AtomicInteger firstOrSecondPoll = new AtomicInteger();
        willAnswer(invocation -> {
            Thread.sleep(10);
            return firstOrSecondPoll.incrementAndGet() < 2 ? records : empty;
        }).given(consumer).poll(any());
        List<TopicPartition> assignments = Arrays.asList(tp0);
        willAnswer(invocation -> {
            ((ConsumerRebalanceListener) invocation.getArgument(1))
                    .onPartitionsAssigned(assignments);
            return null;
        }).given(consumer).subscribe(any(Collection.class), any());
        given(consumer.position(any())).willReturn(0L);
        given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
                .willReturn(consumer);
        ContainerProperties containerProperties = new ContainerProperties("foo");
        containerProperties.setGroupId("grp");
        AtomicReference<List<ConsumerRecord<String, String>>> received = new AtomicReference<>();
        containerProperties.setMessageListener((MessageListener) rec -> {
        });
        containerProperties.setMissingTopicsFatal(false);
        List<String> order = new ArrayList<>();
        AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(2));
        PlatformTransactionManager tm = mock(PlatformTransactionManager.class);
        willAnswer(inv -> {
            order.add("tx");
            latch.get().countDown();
            return null;
        }).given(tm).getTransaction(any());
        containerProperties.setTransactionManager(tm);
        ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(consumerFactory,
                containerProperties);
        AtomicReference<CountDownLatch> successCalled = new AtomicReference<>(new CountDownLatch(1));
        container.setRecordInterceptor(new RecordInterceptor() {

            @Override
            @Nullable
            public ConsumerRecord intercept(ConsumerRecord rec, Consumer consumer) {
                order.add("interceptor");
                latch.get().countDown();
                return rec;
            }

            @Override
            public void success(ConsumerRecord record, Consumer consumer) {
                order.add("success");
                successCalled.get().countDown();
            }

        });
        container.setBatchInterceptor(new BatchInterceptor() {

            @Override
            @Nullable
            public ConsumerRecords intercept(ConsumerRecords recs, Consumer consumer) {
                order.add("b.interceptor");
                latch.get().countDown();
                return new ConsumerRecords(Collections.singletonMap(tp0, Collections.singletonList(record1)));
            }

            @Override
            public void success(ConsumerRecords records, Consumer consumer) {
                order.add("b.success");
                successCalled.get().countDown();
            }

        });
        container.setInterceptBeforeTx(false);
        container.start();
        try {
            assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
            assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue();
            assertThat(order).containsExactly("tx", "interceptor", "success");
            container.stop();
            latch.set(new CountDownLatch(2));
            successCalled.set(new CountDownLatch(1));
            container.getContainerProperties().setMessageListener((BatchMessageListener) recs -> {
            });
            firstOrSecondPoll.set(0);
            container.start();
            assertThat(latch.get().await(10, TimeUnit.SECONDS)).isTrue();
            assertThat(successCalled.get().await(10, TimeUnit.SECONDS)).isTrue();
            assertThat(order).containsExactly("tx", "interceptor", "success", "tx", "b.interceptor", "b.success");
        }
        finally {
            container.stop();
        }
    }

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void testNoCommitOnAssignmentWithEarliest() throws InterruptedException {