Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2770,6 +2770,8 @@ An error message is also logged when this condition occurs.
* `ConsumerStoppingEvent`: published by each consumer just before stopping.
* `ConsumerStoppedEvent`: published after the consumer is closed.
See <<thread-safety>>.
* `ConsumerRetryAuthEvent`: published when authentication or authorization of a consumer fails and is being retried.
* `ConsumerRetryAuthSuccessfulEvent`: published when authentication or authorization has been retried successfully. Can only occur when there has been a `ConsumerRetryAuthEvent` before.
* `ContainerStoppedEvent`: published when all consumers have stopped.

IMPORTANT: By default, the application context's event multicaster invokes event listeners on the calling thread.
Expand Down Expand Up @@ -2829,7 +2831,15 @@ The `ConsumerPartitionPausedEvent`, `ConsumerPartitionResumedEvent` events have
* `container`: The listener container or the parent listener container, if the source container is a child.
* `partition`: The `TopicPartition` instance involved.

The `ConsumerStartingEvent`, `ConsumerStartingEvent`, `ConsumerFailedToStartEvent`, `ConsumerStoppedEvent` and `ContainerStoppedEvent` events have the following properties:
The `ConsumerRetryAuthEvent` event has the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.
* `reason`
** `AUTHENTICATION` - the event was published because of an authentication exception.
** `AUTHORIZATION` - the event was published because of an authorization exception.

The `ConsumerStartingEvent`, `ConsumerStartingEvent`, `ConsumerFailedToStartEvent`, `ConsumerStoppedEvent`, `ConsumerRetryAuthSuccessfulEvent` and `ContainerStoppedEvent` events have the following properties:

* `source`: The listener container instance that published the event.
* `container`: The listener container or the parent listener container, if the source container is a child.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.event;

/**
* An event published when authentication or authorization of a consumer fails and
* is being retried. Contains the reason for this event.
*
* @author Daniel Gentes
* @since 3.0
*
*/
public class ConsumerRetryAuthEvent extends KafkaEvent {

private static final long serialVersionUID = 1L;

/**
* Reasons for retrying auth a consumer.
*/
public enum Reason {
/**
* An authentication exception occurred.
*/
AUTHENTICATION,

/**
* An authorization exception occurred.
*/
AUTHORIZATION
}

private final Reason reason;

/**
* Construct an instance with the provided source and container.
* @param source the container instance that generated the event.
* @param container the container or the parent container
* if the container is a child.
* @param reason the reason.
*/
public ConsumerRetryAuthEvent(Object source, Object container, Reason reason) {
super(source, container);
this.reason = reason;
}

/**
* Return the reason for the auth failure.
* @return the reason.
*/
public Reason getReason() {
return this.reason;
}

@Override
public String toString() {
return "ConsumerRetryAuthEvent [source=" + getSource() + ", reason=" + this.reason + "]";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.event;

/**
* An event published when authentication or authorization has been retried successfully.
*
* @author Daniel Gentes
* @since 3.0
*
*/
public class ConsumerRetryAuthSuccessfulEvent extends KafkaEvent {

private static final long serialVersionUID = 1L;

/**
* Construct an instance with the provided source and container.
* @param source the container instance that generated the event.
* @param container the container or the parent container
* if the container is a child.
*/
public ConsumerRetryAuthSuccessfulEvent(Object source, Object container) {
super(source, container);
}

@Override
public String toString() {
return "ConsumerRetryAuthSuccessfulEvent [source=" + getSource() + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
import org.springframework.kafka.event.ConsumerPartitionResumedEvent;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.event.ConsumerRetryAuthEvent;
import org.springframework.kafka.event.ConsumerRetryAuthSuccessfulEvent;
import org.springframework.kafka.event.ConsumerStartedEvent;
import org.springframework.kafka.event.ConsumerStartingEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent;
Expand Down Expand Up @@ -146,6 +148,7 @@
* @author Lukasz Kaminski
* @author Tomaz Fernandes
* @author Francois Rosiere
* @author Daniel Gentes
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> {
Expand Down Expand Up @@ -529,6 +532,30 @@ private void publishConsumerFailedToStart() {
}
}

private void publishRetryAuthEvent(Throwable throwable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, add your name to the @author list: you have made some changes to the class, so you deserve some credit.

ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
ConsumerRetryAuthEvent.Reason reason;
if (throwable instanceof AuthenticationException) {
reason = ConsumerRetryAuthEvent.Reason.AUTHENTICATION;
}
else if (throwable instanceof AuthorizationException) {
reason = ConsumerRetryAuthEvent.Reason.AUTHORIZATION;
}
else {
throw new IllegalArgumentException("Only Authentication or Authorization Excetions are allowed", throwable);
}
publisher.publishEvent(new ConsumerRetryAuthEvent(this, this.thisOrParentContainer, reason));
}
}

private void publishRetryAuthSuccessfulEvent() {
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new ConsumerRetryAuthSuccessfulEvent(this, this.thisOrParentContainer));
}
}

@Override
protected AbstractMessageListenerContainer<?, ?> parentOrThis() {
return this.thisOrParentContainer;
Expand Down Expand Up @@ -1235,9 +1262,14 @@ public void run() {
initAssignedPartitions();
publishConsumerStartedEvent();
Throwable exitThrowable = null;
boolean failedAuthRetry = false;
while (isRunning()) {
try {
pollAndInvoke();
if (failedAuthRetry) {
publishRetryAuthSuccessfulEvent();
failedAuthRetry = false;
}
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
Expand All @@ -1257,6 +1289,8 @@ public void run() {
ListenerConsumer.this.logger.error(ae,
"Authentication/Authorization Exception, retrying in "
+ this.authExceptionRetryInterval.toMillis() + " ms");
publishRetryAuthEvent(ae);
failedAuthRetry = true;
// We can't pause/resume here, as KafkaConsumer doesn't take pausing
// into account when committing, hence risk of being flooded with
// GroupAuthorizationExceptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.event.ConsumerRetryAuthEvent;
import org.springframework.kafka.event.ConsumerRetryAuthSuccessfulEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
import org.springframework.kafka.event.ConsumerStoppingEvent;
Expand Down Expand Up @@ -135,6 +137,7 @@
* @author Loic Talhouarne
* @author Lukasz Kaminski
* @author Ray Chuan Tay
* @author Daniel Gentes
*/
@EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
Expand Down Expand Up @@ -3224,9 +3227,17 @@ void testNotFatalErrorOnAuthorizationException() throws Exception {
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
CountDownLatch latch = new CountDownLatch(2);
CountDownLatch retryEvent = new CountDownLatch(2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: your name to the @author list.

CountDownLatch retrySuccessfulEventFired = new CountDownLatch(1);
AtomicReference<ConsumerRetryAuthEvent.Reason> reason = new AtomicReference<>();
willAnswer(invoc -> {
latch.countDown();
throw new TopicAuthorizationException("test");
if (latch.getCount() > 0) {
latch.countDown();
throw new TopicAuthorizationException("test");
}
else {
return new ConsumerRecords<>(Collections.emptyMap());
}
}).given(consumer).poll(any());

ContainerProperties containerProps = new ContainerProperties(topic1);
Expand All @@ -3236,8 +3247,20 @@ void testNotFatalErrorOnAuthorizationException() throws Exception {
containerProps.setAuthExceptionRetryInterval(Duration.ofMillis(100));
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setApplicationEventPublisher(e -> {
if (e instanceof ConsumerRetryAuthEvent) {
reason.set(((ConsumerRetryAuthEvent) e).getReason());
retryEvent.countDown();
}
else if (e instanceof ConsumerRetryAuthSuccessfulEvent) {
retrySuccessfulEventFired.countDown();
}
});
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(retryEvent.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(reason.get()).isEqualTo(ConsumerRetryAuthEvent.Reason.AUTHORIZATION);
assertThat(retrySuccessfulEventFired.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
assertThat(container.isInExpectedState()).isTrue();
}
Expand Down