
Commit c5f485d

garyrussell authored and artembilan committed
GH-867: Add ConsumerStoppingEvent
Resolves #867
1 parent cf9daf5 commit c5f485d

File tree: 5 files changed (+125 -1 lines changed)
spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppingEvent.java

Lines changed: 66 additions & 0 deletions (new file)
@@ -0,0 +1,66 @@
/*
 * Copyright 2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.event;

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

/**
 * An event published when a consumer is stopped. While it is best practice to use
 * stateless listeners, you can consume this event to clean up any thread-based resources
 * (remove ThreadLocals, destroy thread-scoped beans etc), as long as the context event
 * multicaster is not modified to use an async task executor.
 *
 * @author Gary Russell
 * @since 2.2
 *
 */
@SuppressWarnings("serial")
public class ConsumerStoppingEvent extends KafkaEvent {

    private final Consumer<?, ?> consumer;

    private final Collection<TopicPartition> partitions;

    /**
     * Construct an instance with the provided source, consumer and partitions.
     * @param source the container.
     * @param consumer the consumer.
     * @param partitions the partitions.
     */
    public ConsumerStoppingEvent(Object source, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        super(source);
        this.consumer = consumer;
        this.partitions = partitions;
    }

    public Consumer<?, ?> getConsumer() {
        return this.consumer;
    }

    public Collection<TopicPartition> getPartitions() {
        return this.partitions;
    }

    @Override
    public String toString() {
        return "ConsumerStoppingEvent [consumer=" + this.consumer + ", partitions=" + this.partitions + "]";
    }

}
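A minimal sketch (not part of this commit) of how an application might consume the new event to clean up thread-bound state when a consumer stops; the listener class and the ThreadLocal shown here are hypothetical:

// Hypothetical application code illustrating one use of ConsumerStoppingEvent.
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ConsumerStoppingEvent;
import org.springframework.stereotype.Component;

@Component
public class ConsumerThreadCleanupListener {

    // Hypothetical thread-bound resource populated elsewhere by the record listener.
    private static final ThreadLocal<StringBuilder> SCRATCH = ThreadLocal.withInitial(StringBuilder::new);

    @EventListener
    public void onConsumerStopping(ConsumerStoppingEvent event) {
        // With the default synchronous event multicaster this runs on the consumer
        // thread that is stopping, so removing the ThreadLocal targets the right thread.
        SCRATCH.remove();
    }

}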

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 13 additions & 0 deletions
@@ -66,6 +66,7 @@
 import org.springframework.kafka.event.ConsumerPausedEvent;
 import org.springframework.kafka.event.ConsumerResumedEvent;
 import org.springframework.kafka.event.ConsumerStoppedEvent;
+import org.springframework.kafka.event.ConsumerStoppingEvent;
 import org.springframework.kafka.event.ListenerContainerIdleEvent;
 import org.springframework.kafka.event.NonResponsiveConsumerEvent;
 import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;

@@ -331,6 +332,17 @@ private void publishConsumerResumedEvent(Collection<TopicPartition> partitions)
         }
     }

+    private void publishConsumerStoppingEvent(Consumer<?, ?> consumer) {
+        try {
+            if (getApplicationEventPublisher() != null) {
+                getApplicationEventPublisher().publishEvent(
+                        new ConsumerStoppingEvent(this, consumer, getAssignedPartitions()));
+            }
+        }
+        catch (Exception e) {
+            this.logger.error("Failed to publish consumer stopping event", e);
+        }
+    }
     private void publishConsumerStoppedEvent() {
         if (getApplicationEventPublisher() != null) {
             getApplicationEventPublisher().publishEvent(new ConsumerStoppedEvent(this));

@@ -721,6 +733,7 @@ public void run() {

         public void wrapUp() {
             ProducerFactoryUtils.clearConsumerGroupId();
+            publishConsumerStoppingEvent(this.consumer);
             if (!this.fatalError) {
                 if (this.kafkaTxManager == null) {
                     commitPendingAcks();
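The event is only published when the container has an ApplicationEventPublisher; a container declared as a bean receives one automatically, while a manually created container needs one set explicitly. A hedged sketch under that assumption (the topic name and consumerFactory are placeholders, and the usual org.springframework.kafka.listener imports are assumed):

// Hypothetical wiring for a container that is not a Spring bean.
ContainerProperties props = new ContainerProperties("some.topic");
props.setMessageListener((MessageListener<String, String>) record -> { });
KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<>(consumerFactory, props);
// Without this, publishConsumerStoppingEvent() finds a null publisher and skips the event.
container.setApplicationEventPublisher(event -> {
    if (event instanceof ConsumerStoppingEvent) {
        System.out.println("Stopping; assigned partitions: "
                + ((ConsumerStoppingEvent) event).getPartitions());
    }
});
container.start();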

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 26 additions & 1 deletion
@@ -46,6 +46,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

@@ -78,6 +79,7 @@
 import org.springframework.kafka.event.ConsumerPausedEvent;
 import org.springframework.kafka.event.ConsumerResumedEvent;
 import org.springframework.kafka.event.ConsumerStoppedEvent;
+import org.springframework.kafka.event.ConsumerStoppingEvent;
 import org.springframework.kafka.event.NonResponsiveConsumerEvent;
 import org.springframework.kafka.listener.ContainerProperties.AckMode;
 import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;

@@ -173,6 +175,16 @@ public void testDelegateType() throws Exception {
     KafkaMessageListenerContainer<Integer, String> container =
             new KafkaMessageListenerContainer<>(cf, containerProps);
     container.setBeanName("delegate");
+    AtomicReference<List<TopicPartitionInitialOffset>> offsets = new AtomicReference<>();
+    container.setApplicationEventPublisher(e -> {
+        if (e instanceof ConsumerStoppingEvent) {
+            ConsumerStoppingEvent event = (ConsumerStoppingEvent) e;
+            offsets.set(event.getPartitions().stream()
+                    .map(p -> new TopicPartitionInitialOffset(p.topic(), p.partition(),
+                            event.getConsumer().position(p, Duration.ofMillis(10_000))))
+                    .collect(Collectors.toList()));
+        }
+    });
     container.start();

     Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

@@ -186,6 +198,16 @@ public void testDelegateType() throws Exception {
     // Stack traces are environment dependent - verified in eclipse
     // assertThat(trace.get()[1].getMethodName()).contains("invokeRecordListener");
     container.stop();
+    List<TopicPartitionInitialOffset> list = offsets.get();
+    assertThat(list).isNotNull();
+    list.forEach(tpio -> {
+        if (tpio.partition() == 0) {
+            assertThat(tpio.initialOffset()).isEqualTo(1);
+        }
+        else {
+            assertThat(tpio.initialOffset()).isEqualTo(0);
+        }
+    });
     final CountDownLatch latch2 = new CountDownLatch(1);
     FilteringMessageListenerAdapter<Integer, String> filtering = new FilteringMessageListenerAdapter<>(m -> {
         trace.set(new RuntimeException().getStackTrace());

@@ -206,7 +228,6 @@ public void testDelegateType() throws Exception {
     // assertThat(trace.get()[5].getMethodName()).contains("onMessage"); // bridge
     // assertThat(trace.get()[6].getMethodName()).contains("invokeRecordListener");
     container.stop();
-
     final CountDownLatch latch3 = new CountDownLatch(1);
     filtering = new FilteringMessageListenerAdapter<>(
             (AcknowledgingConsumerAwareMessageListener<Integer, String>) (d, a, c) -> {

@@ -2210,10 +2231,12 @@ public Bar(String baz) {
         this.baz = baz;
     }

+    @SuppressWarnings("unused")
     private String getBaz() {
         return this.baz;
     }

+    @SuppressWarnings("unused")
     private void setBaz(String baz) {
         this.baz = baz;
     }

@@ -2237,10 +2260,12 @@ public Bar1(String baz) {
         this.baz = baz;
     }

+    @SuppressWarnings("unused")
     private String getBaz() {
         return this.baz;
     }

+    @SuppressWarnings("unused")
     private void setBaz(String baz) {
         this.baz = baz;
     }
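The test snapshots each partition's position into TopicPartitionInitialOffset instances when the consumer stops. One possible follow-up use, not part of this commit, is to start a fresh container from those captured positions. A rough sketch, assuming the cf consumer factory and the offsets reference from the test above, and assuming the ContainerProperties varargs constructor that takes TopicPartitionInitialOffset:

// Hypothetical continuation: resume a new container from the captured positions.
List<TopicPartitionInitialOffset> captured = offsets.get();
ContainerProperties resumeProps = new ContainerProperties(
        captured.toArray(new TopicPartitionInitialOffset[0]));
resumeProps.setMessageListener((MessageListener<Integer, String>) record -> {
    // resume processing from the captured offsets
});
KafkaMessageListenerContainer<Integer, String> resumed =
        new KafkaMessageListenerContainer<>(cf, resumeProps);
resumed.start();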

src/reference/asciidoc/kafka.adoc

Lines changed: 17 additions & 0 deletions
@@ -1777,6 +1777,23 @@ ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
 bar
 ----

+[[events]]
+==== Events
+
+The following events are published by listener containers and their consumers:
+
+* `ContainerIdleEvent` - when no messages have been received in `idleInterval` (if configured)
+* `NonResponsiveConsumerEvent` - when the consumer appears to be blocked in the `poll` method
+* `ConsumerPausedEvent` - issued by each consumer when the container is paused
+* `ConsumerResumedEvent` - issued by each consumer when the container is resumed
+* `ConsumerStoppingEvent` - issued by each consumer just before stopping
+* `ConsumerStoppedEvent` - issued after the consumer is closed; see <<thread-safety>>
+* `ContainerStoppedEvent` - when all consumers have terminated
+
+IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread.
+If you change the multicaster to use an async executor, you must not invoke any `Consumer` methods when the event contains a reference to the consumer.
+
+
 [[serdes]]
 ==== Serialization/Deserialization and Message Conversion

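To make the threading caveat concrete, here is a hedged sketch (not part of this commit) of an event listener that reads consumer positions while still on the calling consumer thread; the class name is illustrative:

// Hypothetical listener; safe only with the default synchronous event multicaster,
// because the Consumer is then accessed from the thread that owns it.
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.event.ConsumerStoppingEvent;

public class PositionLoggingListener implements ApplicationListener<ConsumerStoppingEvent> {

    @Override
    public void onApplicationEvent(ConsumerStoppingEvent event) {
        for (TopicPartition partition : event.getPartitions()) {
            System.out.println(partition + " position: " + event.getConsumer().position(partition));
        }
    }

}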

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
@@ -40,6 +40,9 @@ They can be configured to publish failed records to a dead-letter topic.

 See <<after-rollback>>, <<seek-to-current>> and <<dead-letters>> for more information.

+The `ConsumerStoppingEvent` has been added.
+See <<events>> for more information.
+
 ==== @KafkaListener Changes

 You can now override the `concurrency` and `autoStartup` properties of the listener container factory by setting properties on the annotation.
