Merged · Changes from 2 commits
@@ -3080,6 +3080,9 @@ else if (!this.autoCommit && (!this.isAnyManualAck || commitRecovered)) {
}

private void sendOffsetsToTransaction() {
if (this.kafkaTxManager != null && TransactionSynchronizationManager.getResource(this.kafkaTxManager.getProducerFactory()) == null) {
Contributor:

What if the kafkaTxManager is null? Don't we need to address that as well? If it is null, we can return, since there is no point in sending the offsets to a transaction. Am I missing anything there?

Contributor Author:

@sobychacko You're right. I fixed it. Thanks for the feedback.

return;
}
handleAcks();
Map<TopicPartition, OffsetAndMetadata> commits = buildCommits();
this.commitLogger.log(() -> "Sending offsets to transaction: " + commits);
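For readers following the thread above, here is a minimal, hypothetical sketch of the guard under discussion. It is not the PR's follow-up commit; the class and method names are illustrative. It only shows how an active producer transaction can be detected through TransactionSynchronizationManager, with the null kafkaTxManager case handled as the reviewer suggested:

import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;

// Illustrative helper (hypothetical): offsets should only be forwarded when a
// KafkaTransactionManager exists AND it has bound a transactional producer to the
// current thread, i.e. a transaction is actually active.
final class ActiveTransactionCheck {

    static boolean shouldSendOffsets(KafkaTransactionManager<?, ?> kafkaTxManager) {
        if (kafkaTxManager == null) {
            // No transaction manager: nothing to send offsets to, as noted in the review.
            return false;
        }
        ProducerFactory<?, ?> producerFactory = kafkaTxManager.getProducerFactory();
        // KafkaTransactionManager binds a resource holder under the producer-factory key
        // while a transaction is in progress; its absence means no active transaction.
        return TransactionSynchronizationManager.getResource(producerFactory) != null;
    }

}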
@@ -47,6 +47,7 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.jspecify.annotations.Nullable;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -75,6 +76,7 @@
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.MessageHeaders;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.backoff.FixedBackOff;

@@ -140,6 +142,8 @@ public class TransactionalContainerTests {

public static final String topic10 = "txTopic10";

public static final String topic11 = "txTopic11";

private static EmbeddedKafkaBroker embeddedKafka;

@BeforeAll
@@ -1148,4 +1152,65 @@ public void onMessage(List<ConsumerRecord<Integer, String>> data) {
container.stop();
}

@Test
void testSendOffsetOnlyOnActiveTransaction() throws InterruptedException {
// init producer
Map<String, Object> producerProperties = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<>(producerProperties);
pf.setTransactionIdPrefix("testSendOffsetOnlyOnActiveTransaction.recordListener");
final KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);

// init consumer
String group = "testSendOffsetOnlyOnActiveTransaction";
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(embeddedKafka, group, false);
consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProperties);
ContainerProperties containerProps = new ContainerProperties(topic11);
containerProps.setPollTimeout(10_000);
final var successLatch = new AtomicReference<>(new CountDownLatch(2));
containerProps.setMessageListener(new MessageListener<Integer, String>() {
@Transactional("testSendOffsetOnlyOnActiveTransaction")
@Override
public void onMessage(ConsumerRecord<Integer, String> data) {
}
});

// init container
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
containerProps.setKafkaAwareTransactionManager(tm);
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
container.setBeanName("testSendOffsetOnlyOnActiveTransaction");
container.setRecordInterceptor(new RecordInterceptor<Integer, String>() {
boolean isFirst = true;

@Override
public @Nullable ConsumerRecord<Integer, String> intercept(
ConsumerRecord<Integer, String> record,
Consumer<Integer, String> consumer) {
if (isFirst) {
isFirst = false;
return record;
}
return null;
}

@Override
public void afterRecord(
ConsumerRecord<Integer, String> record,
Consumer<Integer, String> consumer) {
successLatch.get().countDown();
}
});
container.start();

template.executeInTransaction(t -> {
template.send(new ProducerRecord<>(topic11, 0, 0, "bar1"));
template.send(new ProducerRecord<>(topic11, 0, 0, "bar2"));
return null;
});
assertThat(successLatch.get().await(30, TimeUnit.SECONDS)).isTrue();
Contributor:

How does this test ensure that offsets are sent only when there is an active transaction? I see that you are asserting that two records are consumed transactionally, but how do we cover the case where there is no active txn? Thanks!

Contributor Author:

@sobychacko Thanks for the feedback. I modified the test to verify whether the commit occurred within a transaction by using TransactionExecutionListener.


container.stop();
pf.destroy();
}
}
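
Following up on the last reply, below is a minimal sketch of how a TransactionExecutionListener (Spring Framework 6.1+) could be attached to the KafkaTransactionManager so the test can assert that the offset commit actually ran inside a container-managed transaction. This is an assumed shape, not the updated test code from the PR; the variable committedInTx and the wiring are illustrative:

// Fragment meant to slot into the test above; 'pf' and 'containerProps' are the ones
// already created there. Requires java.util.concurrent.atomic.AtomicBoolean plus
// org.springframework.transaction.TransactionExecution and
// org.springframework.transaction.TransactionExecutionListener.
AtomicBoolean committedInTx = new AtomicBoolean();

KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
tm.addListener(new TransactionExecutionListener() {

    @Override
    public void afterCommit(TransactionExecution transaction, @Nullable Throwable commitFailure) {
        // Only invoked when the listener container really opened a transaction and committed it.
        committedInTx.set(commitFailure == null);
    }

});
containerProps.setKafkaAwareTransactionManager(tm);

// ... after the success latch is released:
// assertThat(committedInTx).isTrue();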