-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Description
I am really sorry that I am not good at english
I have a question about processCommits method in ListenerConsumer
In the processCommits() method in the ListenerConsumer, this.acks.size () is added to this.count.
however after handleAcks() method is called, this.acks is always empty. As a result, always 0 is added to this.count. I think before call the handleAcks method, acks.size() is added to this.count.
below is the code processCommits method and handleAcks method in ListenerConsumer.
private void processCommits() {
handleAcks();
this.count += this.acks.size();
long now;
AckMode ackMode = this.containerProperties.getAckMode();
// skip
}
private void handleAcks() {
ConsumerRecord<K, V> record = this.acks.poll();
while (record != null) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Ack: " + record);
}
processAck(record);
record = this.acks.poll();
}
}
I am very sorry that the explanation is very poor.