-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Description
I need to have a simple application doing the following -
- consuming from kafka topic
2.1doSomeWork()
which involves writing to a different output topic
2.2 In case ofRuntimeException
write the message to another failure topic
This is my code and config
@Test
public void simpleTransactionTest1() throws InterruptedException {
String testCaseId = "simpleTransactionTest";
final CountDownLatch latch = new CountDownLatch(1);
/**
* producer properties
*/
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
/**
* send to test topic
*/
Producer<String, String> producer = new KafkaProducer<>(producerProps);
ProducerRecord<String, String> pr = new ProducerRecord<String, String>(
"test", "simpleTransactionTest");
producer.send(pr);
producer.close();
/**
* producer factory
*/
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(
producerProps);
pf.setTransactionIdPrefix("my.transaction");
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
/**
* consumer properties
*/
Map<String, Object> consumerProps = kafkaConfig.consumerProps();
consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
"read_committed");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
/**
* consumer factory
*/
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
ContainerProperties cp = new ContainerProperties("test");
KafkaTransactionManager<String, String> tm = new KafkaTransactionManager<>(pf);
cp.setTransactionManager(tm);
cp.setAckMode(AckMode.RECORD);
cp.setAckOnError(true);
cp.setMessageListener((MessageListener<String, String>) m -> {
kafkaTemplate.send("output1", "output1 " + m.value());
kafkaTemplate.send("output2", "output2 " + m.value());
throw new RuntimeException("TestException");
});
cp.setErrorHandler((ErrorHandler) (e, data) -> {
kafkaTemplate.send("test.bo", "bo " + data.value());
latch.countDown();
});
/**
* kafka message listener container
*/
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(
cf, cp);
container.setBeanName("simpleTransaction");
container.start();
// some sleep
Thread.sleep(2000);
// check whether latch is zero
Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
container.stop();
Consumer<String, String> consumer = cf.createConsumer();
consumer.subscribe(Collections.singletonList("test"));
ConsumerRecords<String, String> records = consumer.poll(1000);
consumer.commitSync();
consumer.close();
Assert.assertTrue(records.count() == 0);
}
In case of RuntimeException
in the message listener, the error handler that I have set is getting invoked but on successful exit from the error handler, the offset is not getting committed in the input topic.
The send that I am doing inside message listener and error handler are getting committed successfully.
The same test runs when the I remove the throw new RuntimeException("TestException");
from message listener