Transaction not getting committed when error handler is invoked #683

@abhinabasarkar

I need a simple application that does the following:

  1. Consume from a Kafka topic.
  2. For each record:
    2.1 Run doSomeWork(), which writes to a different output topic.
    2.2 If a RuntimeException is thrown, write the message to a separate failure topic.
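
To make the intended flow concrete, here is a minimal broker-free sketch of it. It only models the routing logic and says nothing about transactions or offset commits. The names `FailureRoutingSketch`, `Sender`, `Work` and `handle` are hypothetical and are not part of spring-kafka; `Sender` merely stands in for a call such as `kafkaTemplate.send(topic, value)`, and the topic names are the ones used in my test.

```java
import java.util.ArrayList;
import java.util.List;

public class FailureRoutingSketch {

    /** Hypothetical stand-in for kafkaTemplate.send(topic, value). */
    interface Sender {
        void send(String topic, String value);
    }

    /** Hypothetical stand-in for doSomeWork(); it may write to output topics. */
    interface Work {
        void apply(String value, Sender sender);
    }

    /**
     * Runs the work for one consumed value. If the work throws a
     * RuntimeException, the value is written to the failure topic instead.
     */
    static void handle(String value, Work work, Sender sender) {
        try {
            work.apply(value, sender);
        } catch (RuntimeException e) {
            // Step 2.2: route the failed message to the failure topic.
            sender.send("test.bo", "bo " + value);
        }
    }

    public static void main(String[] args) {
        // Record every send in memory instead of talking to a broker.
        List<String> sent = new ArrayList<>();
        Sender sender = (topic, v) -> sent.add(topic + ":" + v);

        handle("simpleTransactionTest", (v, s) -> {
            s.send("output1", "output1 " + v);
            throw new RuntimeException("TestException");
        }, sender);

        System.out.println(sent);
    }
}
```

In this in-memory model the send to output1 is still recorded even though the work failed, because nothing is rolled back here. What should happen to that send, and to the input offset, under a real Kafka transaction is exactly what I am asking about.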

This is my code and config:

@Test
    public void simpleTransactionTest1() throws InterruptedException {
        String testCaseId = "simpleTransactionTest";
        final CountDownLatch latch = new CountDownLatch(1);
        
        /**
         * producer properties
         */
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        /**
         * send to test topic
         */
        Producer<String, String> producer = new KafkaProducer<>(producerProps);
        ProducerRecord<String, String> pr = new ProducerRecord<String, String>(
                "test", "simpleTransactionTest");
        producer.send(pr);
        producer.close();

        /**
         * producer factory
         */
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(
                producerProps);
        pf.setTransactionIdPrefix("my.transaction");
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
        
        /**
         * consumer properties
         */
        Map<String, Object> consumerProps = kafkaConfig.consumerProps();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                "read_committed");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        
        /**
         * consumer factory
         */
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
                consumerProps);
        ContainerProperties cp = new ContainerProperties("test");
        KafkaTransactionManager<String, String> tm = new KafkaTransactionManager<>(pf);
        cp.setTransactionManager(tm);
        cp.setAckMode(AckMode.RECORD);
        cp.setAckOnError(true);
        cp.setMessageListener((MessageListener<String, String>) m -> {
            kafkaTemplate.send("output1", "output1 " + m.value());
            kafkaTemplate.send("output2", "output2 " + m.value());
            throw new RuntimeException("TestException");
            
        });
        cp.setErrorHandler((ErrorHandler) (e, data) -> {
            kafkaTemplate.send("test.bo", "bo " + data.value());
            latch.countDown();
        });
        
        /**
         * kafka message listener container
         */
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(
                cf, cp);
        container.setBeanName("simpleTransaction");
        container.start();
        
        // some sleep
        Thread.sleep(2000);
        
        // check whether latch is zero
        Assert.assertTrue(latch.await(60, TimeUnit.SECONDS));
        
        container.stop();
        
        Consumer<String, String> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singletonList("test"));
        ConsumerRecords<String, String> records = consumer.poll(1000);
        consumer.commitSync();
        consumer.close();
        Assert.assertEquals(0, records.count());
        
    }

When the message listener throws a RuntimeException, the error handler I set is invoked, but after the error handler returns successfully the offset is not committed on the input topic.

The sends performed inside the message listener and the error handler are committed successfully.

The same test passes when I remove the throw new RuntimeException("TestException"); from the message listener.
