From 69f4e3b70cb2c6870d96324c6ef3263fd29c04a6 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 30 Jun 2025 20:02:08 -0400 Subject: [PATCH 1/6] Add KafkaListener support for shared consumer containers - Add AbstractShareKafkaListenerContainerFactory base class for share consumer factories - Add ShareKafkaListenerContainerFactory concrete implementation - Add ShareRecordMessagingMessageListenerAdapter for share consumer message handling - Modify MethodKafkaListenerEndpoint to create appropriate listener adapters based on container type - Add integration tests for ShareKafkaListener functionality Signed-off-by: Soby Chacko --- ...actShareKafkaListenerContainerFactory.java | 214 ++++++++++++++++++ .../config/MethodKafkaListenerEndpoint.java | 31 ++- .../ShareKafkaListenerContainerFactory.java | 67 ++++++ ...actShareKafkaMessageListenerContainer.java | 5 + .../ShareKafkaMessageListenerContainer.java | 8 +- ...RecordMessagingMessageListenerAdapter.java | 63 ++++++ .../ShareKafkaListenerIntegrationTests.java | 149 ++++++++++++ 7 files changed, 531 insertions(+), 6 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/AbstractShareKafkaListenerContainerFactory.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractShareKafkaListenerContainerFactory.java new file mode 100644 index 0000000000..013b704680 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractShareKafkaListenerContainerFactory.java @@ -0,0 +1,214 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import java.util.Arrays; +import java.util.Collection; +import java.util.regex.Pattern; + +import org.apache.commons.logging.LogFactory; +import org.jspecify.annotations.Nullable; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.core.log.LogAccessor; +import org.springframework.kafka.core.ShareConsumerFactory; +import org.springframework.kafka.listener.AbstractShareKafkaMessageListenerContainer; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.support.JavaUtils; +import org.springframework.kafka.support.TopicPartitionOffset; + +/** + * Base {@link KafkaListenerContainerFactory} for creating containers that use Kafka's share consumer model. + *

+ * This abstract factory provides common configuration and lifecycle management for share consumer containers. + * It handles the creation of containers based on endpoints, topics, or patterns, and applies common + * configuration properties to the created containers. + *

+ * The share consumer model enables cooperative rebalancing, allowing consumers to maintain ownership of + * some partitions while relinquishing others during rebalances, which can reduce disruption compared to + * the classic consumer model. + * + * @param the container type + * @param the key type + * @param the value type + * + * @author Soby Chacko + * @since 4.0 + */ +public abstract class AbstractShareKafkaListenerContainerFactory, K, V> + implements KafkaListenerContainerFactory, ApplicationEventPublisherAware, ApplicationContextAware { + + protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); + + private @Nullable ShareConsumerFactory shareConsumerFactory; + + private @Nullable Boolean autoStartup; + + private @Nullable Integer phase; + + private @Nullable ApplicationEventPublisher applicationEventPublisher; + + private @Nullable ApplicationContext applicationContext; + + @Override + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + /** + * Set the share consumer factory to use for creating containers. + * @param shareConsumerFactory the share consumer factory + */ + public void setShareConsumerFactory(ShareConsumerFactory shareConsumerFactory) { + this.shareConsumerFactory = shareConsumerFactory; + } + + /** + * Get the share consumer factory. + * @return the share consumer factory + */ + public @Nullable ShareConsumerFactory getShareConsumerFactory() { + return this.shareConsumerFactory; + } + + /** + * Set whether containers created by this factory should auto-start. + * @param autoStartup true to auto-start + */ + public void setAutoStartup(Boolean autoStartup) { + this.autoStartup = autoStartup; + } + + /** + * Set the phase in which containers created by this factory should start and stop. + * @param phase the phase + */ + public void setPhase(Integer phase) { + this.phase = phase; + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + + /** + * Get the container properties. + * @return the container properties + */ + public ContainerProperties getContainerProperties() { + return this.containerProperties; + } + + @Override + public C createListenerContainer(KafkaListenerEndpoint endpoint) { + C instance = createContainerInstance(endpoint); + JavaUtils.INSTANCE + .acceptIfNotNull(endpoint.getId(), instance::setBeanName); + if (endpoint instanceof AbstractKafkaListenerEndpoint) { + configureEndpoint((AbstractKafkaListenerEndpoint) endpoint); + } + endpoint.setupListenerContainer(instance, null); // No message converter for MVP + initializeContainer(instance, endpoint); + return instance; + } + + private void configureEndpoint(AbstractKafkaListenerEndpoint endpoint) { + // Minimal configuration; can add more properties later + } + + /** + * Initialize the provided container with common configuration properties. + * @param instance the container instance + * @param endpoint the endpoint + */ + protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) { + ContainerProperties properties = instance.getContainerProperties(); + if (this.containerProperties.getAckCount() > 0) { + properties.setAckCount(this.containerProperties.getAckCount()); + } + if (this.containerProperties.getAckTime() > 0) { + properties.setAckTime(this.containerProperties.getAckTime()); + } + if (endpoint.getAutoStartup() != null) { + instance.setAutoStartup(endpoint.getAutoStartup()); + } + else if (this.autoStartup != null) { + instance.setAutoStartup(this.autoStartup); + } + if (this.phase != null) { + instance.setPhase(this.phase); + } + if (this.applicationContext != null) { + instance.setApplicationContext(this.applicationContext); + } + if (this.applicationEventPublisher != null) { + instance.setApplicationEventPublisher(this.applicationEventPublisher); + } + if (endpoint.getGroupId() != null) { + instance.getContainerProperties().setGroupId(endpoint.getGroupId()); + } + if (endpoint.getClientIdPrefix() != null) { + instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix()); + } + if (endpoint.getConsumerProperties() != null) { + instance.getContainerProperties().setKafkaConsumerProperties(endpoint.getConsumerProperties()); + } + } + + @Override + public C createContainer(TopicPartitionOffset... topicPartitions) { + return createContainerInstance(new KafkaListenerEndpointAdapter() { + @Override + public TopicPartitionOffset[] getTopicPartitionsToAssign() { + return Arrays.copyOf(topicPartitions, topicPartitions.length); + } + }); + } + + @Override + public C createContainer(String... topics) { + return createContainerInstance(new KafkaListenerEndpointAdapter() { + @Override + public Collection getTopics() { + return Arrays.asList(topics); + } + }); + } + + @Override + public C createContainer(Pattern topicPattern) { + return createContainerInstance(new KafkaListenerEndpointAdapter() { + @Override + public Pattern getTopicPattern() { + return topicPattern; + } + }); + } + + /** + * Create a container instance for the provided endpoint. + * @param endpoint the endpoint + * @return the container instance + */ + protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 3f08435831..c5ebca8c31 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -31,11 +31,13 @@ import org.springframework.expression.BeanResolver; import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer; import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.BatchToRecordAdapter; import org.springframework.kafka.listener.adapter.HandlerAdapter; import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; +import org.springframework.kafka.listener.adapter.ShareRecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; @@ -175,7 +177,14 @@ protected MessagingMessageListenerAdapter createMessageListener(MessageLis Assert.state(this.messageHandlerMethodFactory != null, "Could not create message listener - MessageHandlerMethodFactory not set"); - MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(messageConverter); + + final MessagingMessageListenerAdapter messageListener; + if (container instanceof ShareKafkaMessageListenerContainer) { + messageListener = createShareMessageListenerInstance(messageConverter); + } + else { + messageListener = createMessageListenerInstance(messageConverter); + } messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); JavaUtils.INSTANCE .acceptIfNotNull(getReplyTopic(), replyTopic -> { @@ -206,6 +215,26 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte * @param messageConverter the converter (may be null). * @return the {@link MessagingMessageListenerAdapter} instance. */ + protected MessagingMessageListenerAdapter createShareMessageListenerInstance( + @Nullable MessageConverter messageConverter) { + + MessagingMessageListenerAdapter listener; + ShareRecordMessagingMessageListenerAdapter messageListener = new ShareRecordMessagingMessageListenerAdapter<>( + this.bean, this.method, this.errorHandler); + if (messageConverter instanceof RecordMessageConverter recordMessageConverter) { + messageListener.setMessageConverter(recordMessageConverter); + } + listener = messageListener; + if (this.messagingConverter != null) { + listener.setMessagingConverter(this.messagingConverter); + } + BeanResolver resolver = getBeanResolver(); + if (resolver != null) { + listener.setBeanResolver(resolver); + } + return listener; + } + protected MessagingMessageListenerAdapter createMessageListenerInstance( @Nullable MessageConverter messageConverter) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java new file mode 100644 index 0000000000..58d74265f8 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -0,0 +1,67 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import java.util.Collection; + +import org.springframework.kafka.core.ShareConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer; +import org.springframework.kafka.support.TopicPartitionOffset; +import org.springframework.util.Assert; + +/** + * A {@link KafkaListenerContainerFactory} implementation to create {@link ShareKafkaMessageListenerContainer} + * instances for Kafka's share consumer model. + * + * @param the key type + * @param the value type + * + * @author Soby Chacko + * @since 4.0 + */ +public class ShareKafkaListenerContainerFactory + extends AbstractShareKafkaListenerContainerFactory, K, V> { + + /** + * Construct an instance with the provided consumer factory. + * @param shareConsumerFactory the share consumer factory + */ + public ShareKafkaListenerContainerFactory(ShareConsumerFactory shareConsumerFactory) { + setShareConsumerFactory(shareConsumerFactory); + } + + @Override + protected ShareKafkaMessageListenerContainer createContainerInstance(KafkaListenerEndpoint endpoint) { + TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign(); + if (topicPartitions != null && topicPartitions.length > 0) { + return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), new ContainerProperties(topicPartitions)); + } + else { + Collection topics = endpoint.getTopics(); + Assert.state(topics != null, "'topics' must not be null"); + if (!topics.isEmpty()) { + return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), + new ContainerProperties(topics.toArray(new String[0]))); + } + else { + return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), + new ContainerProperties(endpoint.getTopicPattern())); + } + } + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java index 3dd0cbdd0e..1dd2b190f8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import org.springframework.beans.BeanUtils; @@ -58,6 +59,9 @@ public abstract class AbstractShareKafkaMessageListenerContainer */ public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100; + /** + * The share consumer factory used to create consumer instances. + */ protected final ShareConsumerFactory shareConsumerFactory; protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); @@ -66,6 +70,7 @@ public abstract class AbstractShareKafkaMessageListenerContainer protected final ReentrantLock lifecycleLock = new ReentrantLock(); + @NonNull private String beanName = "noBeanNameSet"; @Nullable diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index a5fead7bfe..3d5cb14cef 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-present the original author or authors. + * Copyright 2025-present the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -43,8 +43,6 @@ * This container manages a single-threaded consumer loop using a {@link org.springframework.kafka.core.ShareConsumerFactory}. * It is designed for use cases where Kafka's cooperative sharing protocol is desired, and provides a simple polling loop * with per-record dispatch and acknowledgement. - *

- * Lifecycle events are published for consumer starting and started. The container supports direct setting of the client.id. * * @param the key type * @param the value type @@ -73,7 +71,7 @@ public class ShareKafkaMessageListenerContainer * @param shareConsumerFactory the share consumer factory * @param containerProperties the container properties */ - public ShareKafkaMessageListenerContainer(ShareConsumerFactory shareConsumerFactory, + public ShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory shareConsumerFactory, ContainerProperties containerProperties) { super(shareConsumerFactory, containerProperties); Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided"); @@ -152,7 +150,7 @@ private void publishConsumerStartedEvent() { } /** - * The inner share consumer thread: polls for records and dispatches to the listener. + * The inner share consumer thread that polls for records and dispatches to the listener. */ private class ShareListenerConsumer implements Runnable { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java new file mode 100644 index 0000000000..17f2d0ff8d --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java @@ -0,0 +1,63 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener.adapter; + +import java.lang.reflect.Method; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.jspecify.annotations.Nullable; + +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.listener.MessageListener; + +/** + * A {@link MessageListener} adapter for the share consumer model that invokes a configurable + * {@link HandlerAdapter}. This adapter passes null for acknowledgment and consumer parameters + * to the super implementation. + * + * @param the key type. + * @param the value type. + * + * @author Soby Chacko + * @since 4.0 + */ +public class ShareRecordMessagingMessageListenerAdapter extends RecordMessagingMessageListenerAdapter + implements MessageListener { + + public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method) { + super(bean, method); + } + + public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method, + @Nullable KafkaListenerErrorHandler errorHandler) { + super(bean, method, errorHandler); + } + + /** + * Kafka {@link MessageListener} entry point. + *

Delegate the message to the target listener method, + * with appropriate conversion of the message argument. + *

This implementation passes null for both acknowledgment and consumer parameters + * to the super implementation. + * @param record the incoming Kafka {@link ConsumerRecord}. + */ + @Override + public void onMessage(ConsumerRecord record) { + onMessage(record, null, null); + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java new file mode 100644 index 0000000000..53a4fc6ebb --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java @@ -0,0 +1,149 @@ +/* + * Copyright 2025-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsResult; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ShareKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.DefaultShareConsumerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.ShareConsumerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for share Kafka listener. + * + * @author Soby Chacko + * @since 4.0 + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(topics = "share-listener-integration-test", + brokerProperties = { + "unstable.api.versions.enable=true", + "group.coordinator.rebalance.protocols=classic,share", + "share.coordinator.state.topic.replication.factor=1", + "share.coordinator.state.topic.min.isr=1" + }) +class ShareKafkaListenerIntegrationTests { + + private static final CountDownLatch latch = new CountDownLatch(1); + + private static final AtomicReference received = new AtomicReference<>(); + + @Autowired + EmbeddedKafkaBroker broker; + + @Test + void integrationTestShareKafkaListener() throws Exception { + final String topic = "share-listener-integration-test"; + final String groupId = "share-listener-test-group"; + setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId); + + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.broker.getBrokersAsString()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + ProducerFactory pf = new DefaultKafkaProducerFactory<>(props); + KafkaTemplate template = new KafkaTemplate<>(pf); + template.setDefaultTopic(topic); + template.sendDefault("foo"); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(received.get()).isEqualTo("foo"); + } + + /** + * Sets the share.auto.offset.reset group config to earliest for the given groupId, + * using the provided bootstrapServers. + */ + private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception { + Map adminProperties = new HashMap<>(); + adminProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + try (Admin admin = Admin.create(adminProperties)) { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId); + ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest"); + Map> configs = java.util.Collections.singletonMap(configResource, + java.util.Collections.singleton(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))); + AlterConfigsResult alterConfigsResult = admin.incrementalAlterConfigs(configs); + alterConfigsResult.all().get(); + } + } + + @Configuration + @EnableKafka + static class TestConfig { + + @Bean + public ShareConsumerFactory shareConsumerFactory(EmbeddedKafkaBroker broker) { + Map configs = new HashMap<>(); + configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString()); + configs.put(ConsumerConfig.GROUP_ID_CONFIG, "share-listener-test-group"); + configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return new DefaultShareConsumerFactory<>(configs); + } + + @Bean + public ShareKafkaListenerContainerFactory shareKafkaListenerContainerFactory( + ShareConsumerFactory shareConsumerFactory) { + return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); + } + + @Bean + public TestListener listener() { + return new TestListener(); + } + } + + static class TestListener { + + @KafkaListener(topics = "share-listener-integration-test", containerFactory = "shareKafkaListenerContainerFactory") + public void listen(ConsumerRecord record) { + received.set(record.value()); + latch.countDown(); + } + } +} From 8ae7ff9a8b6fd05db9656bedb9263034c7121155 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Tue, 1 Jul 2025 19:55:15 -0400 Subject: [PATCH 2/6] Addressing PR review Signed-off-by: Soby Chacko --- ...actShareKafkaListenerContainerFactory.java | 214 ------------------ .../config/MethodKafkaListenerEndpoint.java | 34 +-- .../ShareKafkaListenerContainerFactory.java | 177 ++++++++++++++- ...actShareKafkaMessageListenerContainer.java | 4 +- .../ShareKafkaMessageListenerContainer.java | 15 +- ...RecordMessagingMessageListenerAdapter.java | 63 ------ .../ShareKafkaListenerIntegrationTests.java | 4 + 7 files changed, 192 insertions(+), 319 deletions(-) delete mode 100644 spring-kafka/src/main/java/org/springframework/kafka/config/AbstractShareKafkaListenerContainerFactory.java delete mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractShareKafkaListenerContainerFactory.java deleted file mode 100644 index 013b704680..0000000000 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractShareKafkaListenerContainerFactory.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright 2025-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.config; - -import java.util.Arrays; -import java.util.Collection; -import java.util.regex.Pattern; - -import org.apache.commons.logging.LogFactory; -import org.jspecify.annotations.Nullable; - -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.core.log.LogAccessor; -import org.springframework.kafka.core.ShareConsumerFactory; -import org.springframework.kafka.listener.AbstractShareKafkaMessageListenerContainer; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.support.JavaUtils; -import org.springframework.kafka.support.TopicPartitionOffset; - -/** - * Base {@link KafkaListenerContainerFactory} for creating containers that use Kafka's share consumer model. - *

- * This abstract factory provides common configuration and lifecycle management for share consumer containers. - * It handles the creation of containers based on endpoints, topics, or patterns, and applies common - * configuration properties to the created containers. - *

- * The share consumer model enables cooperative rebalancing, allowing consumers to maintain ownership of - * some partitions while relinquishing others during rebalances, which can reduce disruption compared to - * the classic consumer model. - * - * @param the container type - * @param the key type - * @param the value type - * - * @author Soby Chacko - * @since 4.0 - */ -public abstract class AbstractShareKafkaListenerContainerFactory, K, V> - implements KafkaListenerContainerFactory, ApplicationEventPublisherAware, ApplicationContextAware { - - protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); - - private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); - - private @Nullable ShareConsumerFactory shareConsumerFactory; - - private @Nullable Boolean autoStartup; - - private @Nullable Integer phase; - - private @Nullable ApplicationEventPublisher applicationEventPublisher; - - private @Nullable ApplicationContext applicationContext; - - @Override - public void setApplicationContext(ApplicationContext applicationContext) { - this.applicationContext = applicationContext; - } - - /** - * Set the share consumer factory to use for creating containers. - * @param shareConsumerFactory the share consumer factory - */ - public void setShareConsumerFactory(ShareConsumerFactory shareConsumerFactory) { - this.shareConsumerFactory = shareConsumerFactory; - } - - /** - * Get the share consumer factory. - * @return the share consumer factory - */ - public @Nullable ShareConsumerFactory getShareConsumerFactory() { - return this.shareConsumerFactory; - } - - /** - * Set whether containers created by this factory should auto-start. - * @param autoStartup true to auto-start - */ - public void setAutoStartup(Boolean autoStartup) { - this.autoStartup = autoStartup; - } - - /** - * Set the phase in which containers created by this factory should start and stop. - * @param phase the phase - */ - public void setPhase(Integer phase) { - this.phase = phase; - } - - @Override - public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; - } - - /** - * Get the container properties. - * @return the container properties - */ - public ContainerProperties getContainerProperties() { - return this.containerProperties; - } - - @Override - public C createListenerContainer(KafkaListenerEndpoint endpoint) { - C instance = createContainerInstance(endpoint); - JavaUtils.INSTANCE - .acceptIfNotNull(endpoint.getId(), instance::setBeanName); - if (endpoint instanceof AbstractKafkaListenerEndpoint) { - configureEndpoint((AbstractKafkaListenerEndpoint) endpoint); - } - endpoint.setupListenerContainer(instance, null); // No message converter for MVP - initializeContainer(instance, endpoint); - return instance; - } - - private void configureEndpoint(AbstractKafkaListenerEndpoint endpoint) { - // Minimal configuration; can add more properties later - } - - /** - * Initialize the provided container with common configuration properties. - * @param instance the container instance - * @param endpoint the endpoint - */ - protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) { - ContainerProperties properties = instance.getContainerProperties(); - if (this.containerProperties.getAckCount() > 0) { - properties.setAckCount(this.containerProperties.getAckCount()); - } - if (this.containerProperties.getAckTime() > 0) { - properties.setAckTime(this.containerProperties.getAckTime()); - } - if (endpoint.getAutoStartup() != null) { - instance.setAutoStartup(endpoint.getAutoStartup()); - } - else if (this.autoStartup != null) { - instance.setAutoStartup(this.autoStartup); - } - if (this.phase != null) { - instance.setPhase(this.phase); - } - if (this.applicationContext != null) { - instance.setApplicationContext(this.applicationContext); - } - if (this.applicationEventPublisher != null) { - instance.setApplicationEventPublisher(this.applicationEventPublisher); - } - if (endpoint.getGroupId() != null) { - instance.getContainerProperties().setGroupId(endpoint.getGroupId()); - } - if (endpoint.getClientIdPrefix() != null) { - instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix()); - } - if (endpoint.getConsumerProperties() != null) { - instance.getContainerProperties().setKafkaConsumerProperties(endpoint.getConsumerProperties()); - } - } - - @Override - public C createContainer(TopicPartitionOffset... topicPartitions) { - return createContainerInstance(new KafkaListenerEndpointAdapter() { - @Override - public TopicPartitionOffset[] getTopicPartitionsToAssign() { - return Arrays.copyOf(topicPartitions, topicPartitions.length); - } - }); - } - - @Override - public C createContainer(String... topics) { - return createContainerInstance(new KafkaListenerEndpointAdapter() { - @Override - public Collection getTopics() { - return Arrays.asList(topics); - } - }); - } - - @Override - public C createContainer(Pattern topicPattern) { - return createContainerInstance(new KafkaListenerEndpointAdapter() { - @Override - public Pattern getTopicPattern() { - return topicPattern; - } - }); - } - - /** - * Create a container instance for the provided endpoint. - * @param endpoint the endpoint - * @return the container instance - */ - protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint); -} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index c5ebca8c31..848f6332c1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -31,13 +31,11 @@ import org.springframework.expression.BeanResolver; import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer; import org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.BatchToRecordAdapter; import org.springframework.kafka.listener.adapter.HandlerAdapter; import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter; -import org.springframework.kafka.listener.adapter.ShareRecordMessagingMessageListenerAdapter; import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.MessageConverter; @@ -179,12 +177,7 @@ protected MessagingMessageListenerAdapter createMessageListener(MessageLis "Could not create message listener - MessageHandlerMethodFactory not set"); final MessagingMessageListenerAdapter messageListener; - if (container instanceof ShareKafkaMessageListenerContainer) { - messageListener = createShareMessageListenerInstance(messageConverter); - } - else { - messageListener = createMessageListenerInstance(messageConverter); - } + messageListener = createMessageListenerInstance(messageConverter); messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); JavaUtils.INSTANCE .acceptIfNotNull(getReplyTopic(), replyTopic -> { @@ -210,31 +203,6 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte return new HandlerAdapter(invocableHandlerMethod); } - /** - * Create an empty {@link MessagingMessageListenerAdapter} instance. - * @param messageConverter the converter (may be null). - * @return the {@link MessagingMessageListenerAdapter} instance. - */ - protected MessagingMessageListenerAdapter createShareMessageListenerInstance( - @Nullable MessageConverter messageConverter) { - - MessagingMessageListenerAdapter listener; - ShareRecordMessagingMessageListenerAdapter messageListener = new ShareRecordMessagingMessageListenerAdapter<>( - this.bean, this.method, this.errorHandler); - if (messageConverter instanceof RecordMessageConverter recordMessageConverter) { - messageListener.setMessageConverter(recordMessageConverter); - } - listener = messageListener; - if (this.messagingConverter != null) { - listener.setMessagingConverter(this.messagingConverter); - } - BeanResolver resolver = getBeanResolver(); - if (resolver != null) { - listener.setBeanResolver(resolver); - } - return listener; - } - protected MessagingMessageListenerAdapter createMessageListenerInstance( @Nullable MessageConverter messageConverter) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index 58d74265f8..01c5e1dec4 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -16,17 +16,36 @@ package org.springframework.kafka.config; +import java.util.Arrays; import java.util.Collection; +import java.util.regex.Pattern; +import org.apache.commons.logging.LogFactory; +import org.jspecify.annotations.Nullable; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.core.log.LogAccessor; import org.springframework.kafka.core.ShareConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer; +import org.springframework.kafka.support.JavaUtils; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.util.Assert; /** * A {@link KafkaListenerContainerFactory} implementation to create {@link ShareKafkaMessageListenerContainer} * instances for Kafka's share consumer model. + *

+ * This factory provides common configuration and lifecycle management for share consumer containers. + * It handles the creation of containers based on endpoints, topics, or patterns, and applies common + * configuration properties to the created containers. + *

+ * The share consumer model enables cooperative rebalancing, allowing consumers to maintain ownership of + * some partitions while relinquishing others during rebalances, which can reduce disruption compared to + * the classic consumer model. * * @param the key type * @param the value type @@ -35,17 +54,171 @@ * @since 4.0 */ public class ShareKafkaListenerContainerFactory - extends AbstractShareKafkaListenerContainerFactory, K, V> { + implements KafkaListenerContainerFactory>, ApplicationEventPublisherAware, ApplicationContextAware { + + protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); + + private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); + + private ShareConsumerFactory shareConsumerFactory; + + private @Nullable Boolean autoStartup; + + private @Nullable Integer phase; + + private @Nullable ApplicationEventPublisher applicationEventPublisher; + + private @Nullable ApplicationContext applicationContext; /** * Construct an instance with the provided consumer factory. * @param shareConsumerFactory the share consumer factory */ public ShareKafkaListenerContainerFactory(ShareConsumerFactory shareConsumerFactory) { - setShareConsumerFactory(shareConsumerFactory); + this.shareConsumerFactory = shareConsumerFactory; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + /** + * Set the share consumer factory to use for creating containers. + * @param shareConsumerFactory the share consumer factory + */ + public void setShareConsumerFactory(ShareConsumerFactory shareConsumerFactory) { + this.shareConsumerFactory = shareConsumerFactory; + } + + /** + * Get the share consumer factory. + * @return the share consumer factory + */ + public ShareConsumerFactory getShareConsumerFactory() { + return this.shareConsumerFactory; + } + + /** + * Set whether containers created by this factory should auto-start. + * @param autoStartup true to auto-start + */ + public void setAutoStartup(Boolean autoStartup) { + this.autoStartup = autoStartup; + } + + /** + * Set the phase in which containers created by this factory should start and stop. + * @param phase the phase + */ + public void setPhase(Integer phase) { + this.phase = phase; + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + + /** + * Get the container properties. + * @return the container properties + */ + public ContainerProperties getContainerProperties() { + return this.containerProperties; + } + + @Override + public ShareKafkaMessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint) { + ShareKafkaMessageListenerContainer instance = createContainerInstance(endpoint); + JavaUtils.INSTANCE + .acceptIfNotNull(endpoint.getId(), instance::setBeanName); + if (endpoint instanceof AbstractKafkaListenerEndpoint) { + configureEndpoint((AbstractKafkaListenerEndpoint) endpoint); + } + endpoint.setupListenerContainer(instance, null); // No message converter for MVP + initializeContainer(instance, endpoint); + return instance; + } + + private void configureEndpoint(AbstractKafkaListenerEndpoint endpoint) { + // Minimal configuration; can add more properties later + } + + /** + * Initialize the provided container with common configuration properties. + * @param instance the container instance + * @param endpoint the endpoint + */ + protected void initializeContainer(ShareKafkaMessageListenerContainer instance, KafkaListenerEndpoint endpoint) { + ContainerProperties properties = instance.getContainerProperties(); + if (this.containerProperties.getAckCount() > 0) { + properties.setAckCount(this.containerProperties.getAckCount()); + } + if (this.containerProperties.getAckTime() > 0) { + properties.setAckTime(this.containerProperties.getAckTime()); + } + if (endpoint.getAutoStartup() != null) { + instance.setAutoStartup(endpoint.getAutoStartup()); + } + else if (this.autoStartup != null) { + instance.setAutoStartup(this.autoStartup); + } + if (this.phase != null) { + instance.setPhase(this.phase); + } + if (this.applicationContext != null) { + instance.setApplicationContext(this.applicationContext); + } + if (this.applicationEventPublisher != null) { + instance.setApplicationEventPublisher(this.applicationEventPublisher); + } + if (endpoint.getGroupId() != null) { + instance.getContainerProperties().setGroupId(endpoint.getGroupId()); + } + if (endpoint.getClientIdPrefix() != null) { + instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix()); + } + if (endpoint.getConsumerProperties() != null) { + instance.getContainerProperties().setKafkaConsumerProperties(endpoint.getConsumerProperties()); + } } @Override + public ShareKafkaMessageListenerContainer createContainer(TopicPartitionOffset... topicPartitions) { + return createContainerInstance(new KafkaListenerEndpointAdapter() { + @Override + public TopicPartitionOffset[] getTopicPartitionsToAssign() { + return Arrays.copyOf(topicPartitions, topicPartitions.length); + } + }); + } + + @Override + public ShareKafkaMessageListenerContainer createContainer(String... topics) { + return createContainerInstance(new KafkaListenerEndpointAdapter() { + @Override + public Collection getTopics() { + return Arrays.asList(topics); + } + }); + } + + @Override + public ShareKafkaMessageListenerContainer createContainer(Pattern topicPattern) { + return createContainerInstance(new KafkaListenerEndpointAdapter() { + @Override + public Pattern getTopicPattern() { + return topicPattern; + } + }); + } + + /** + * Create a container instance for the provided endpoint. + * @param endpoint the endpoint + * @return the container instance + */ protected ShareKafkaMessageListenerContainer createContainerInstance(KafkaListenerEndpoint endpoint) { TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign(); if (topicPartitions != null && topicPartitions.length > 0) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java index 1dd2b190f8..68c920f753 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java @@ -21,7 +21,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import org.springframework.beans.BeanUtils; @@ -70,7 +69,6 @@ public abstract class AbstractShareKafkaMessageListenerContainer protected final ReentrantLock lifecycleLock = new ReentrantLock(); - @NonNull private String beanName = "noBeanNameSet"; @Nullable @@ -91,7 +89,7 @@ public abstract class AbstractShareKafkaMessageListenerContainer * @param containerProperties the properties. */ @SuppressWarnings("unchecked") - protected AbstractShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory shareConsumerFactory, + protected AbstractShareKafkaMessageListenerContainer(ShareConsumerFactory shareConsumerFactory, ContainerProperties containerProperties) { Assert.notNull(containerProperties, "'containerProperties' cannot be null"); Assert.notNull(shareConsumerFactory, "'shareConsumerFactory' cannot be null"); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index 3d5cb14cef..6c83dfffcb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.kafka.clients.consumer.AcknowledgeType; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -71,7 +72,7 @@ public class ShareKafkaMessageListenerContainer * @param shareConsumerFactory the share consumer factory * @param containerProperties the container properties */ - public ShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory shareConsumerFactory, + public ShareKafkaMessageListenerContainer(ShareConsumerFactory shareConsumerFactory, ContainerProperties containerProperties) { super(shareConsumerFactory, containerProperties); Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided"); @@ -182,6 +183,7 @@ String getClientId() { } @Override + @SuppressWarnings({"unchecked", "rawtypes"}) public void run() { initialize(); Throwable exitThrowable = null; @@ -190,9 +192,14 @@ public void run() { var records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT)); if (records != null && records.count() > 0) { for (var record : records) { - @SuppressWarnings("unchecked") - GenericMessageListener listener = (GenericMessageListener) this.genericListener; - listener.onMessage(record); + if (this.genericListener instanceof AcknowledgingConsumerAwareMessageListener ackListener) { + ackListener.onMessage(record, null, null); + } + else { + GenericMessageListener> listener = + (GenericMessageListener>) this.genericListener; + listener.onMessage(record); + } // Temporarily auto-acknowledge and commit. // We will refactor it later on to support more production-like scenarios. this.consumer.acknowledge(record, AcknowledgeType.ACCEPT); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java deleted file mode 100644 index 17f2d0ff8d..0000000000 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ShareRecordMessagingMessageListenerAdapter.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2002-present the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.listener.adapter; - -import java.lang.reflect.Method; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.jspecify.annotations.Nullable; - -import org.springframework.kafka.listener.KafkaListenerErrorHandler; -import org.springframework.kafka.listener.MessageListener; - -/** - * A {@link MessageListener} adapter for the share consumer model that invokes a configurable - * {@link HandlerAdapter}. This adapter passes null for acknowledgment and consumer parameters - * to the super implementation. - * - * @param the key type. - * @param the value type. - * - * @author Soby Chacko - * @since 4.0 - */ -public class ShareRecordMessagingMessageListenerAdapter extends RecordMessagingMessageListenerAdapter - implements MessageListener { - - public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method) { - super(bean, method); - } - - public ShareRecordMessagingMessageListenerAdapter(@Nullable Object bean, @Nullable Method method, - @Nullable KafkaListenerErrorHandler errorHandler) { - super(bean, method, errorHandler); - } - - /** - * Kafka {@link MessageListener} entry point. - *

Delegate the message to the target listener method, - * with appropriate conversion of the message argument. - *

This implementation passes null for both acknowledgment and consumer parameters - * to the super implementation. - * @param record the incoming Kafka {@link ConsumerRecord}. - */ - @Override - public void onMessage(ConsumerRecord record) { - onMessage(record, null, null); - } - -} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java index 53a4fc6ebb..59e2af6a28 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java @@ -110,6 +110,7 @@ private static void setShareAutoOffsetResetEarliest(String bootstrapServers, Str AlterConfigsResult alterConfigsResult = admin.incrementalAlterConfigs(configs); alterConfigsResult.all().get(); } + } @Configuration @@ -129,6 +130,7 @@ public ShareConsumerFactory shareConsumerFactory(EmbeddedKafkaBr @Bean public ShareKafkaListenerContainerFactory shareKafkaListenerContainerFactory( ShareConsumerFactory shareConsumerFactory) { + return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory); } @@ -136,6 +138,7 @@ public ShareKafkaListenerContainerFactory shareKafkaListenerCont public TestListener listener() { return new TestListener(); } + } static class TestListener { @@ -146,4 +149,5 @@ public void listen(ConsumerRecord record) { latch.countDown(); } } + } From 20978ded83528f069fe13779edc57cee00e084d6 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 2 Jul 2025 14:41:35 -0400 Subject: [PATCH 3/6] Addressing PR review Signed-off-by: Soby Chacko --- .../config/MethodKafkaListenerEndpoint.java | 8 +++-- .../ShareKafkaListenerContainerFactory.java | 34 ++++--------------- .../ShareKafkaMessageListenerContainer.java | 1 - 3 files changed, 12 insertions(+), 31 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 848f6332c1..db6dbb97a1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -176,8 +176,7 @@ protected MessagingMessageListenerAdapter createMessageListener(MessageLis Assert.state(this.messageHandlerMethodFactory != null, "Could not create message listener - MessageHandlerMethodFactory not set"); - final MessagingMessageListenerAdapter messageListener; - messageListener = createMessageListenerInstance(messageConverter); + final MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(messageConverter); messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); JavaUtils.INSTANCE .acceptIfNotNull(getReplyTopic(), replyTopic -> { @@ -203,6 +202,11 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte return new HandlerAdapter(invocableHandlerMethod); } + /** + * Create an empty {@link MessagingMessageListenerAdapter} instance. + * @param messageConverter the converter (may be null). + * @return the {@link MessagingMessageListenerAdapter} instance. + */ protected MessagingMessageListenerAdapter createMessageListenerInstance( @Nullable MessageConverter messageConverter) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index 01c5e1dec4..f09e55e7c8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -186,12 +186,7 @@ else if (this.autoStartup != null) { @Override public ShareKafkaMessageListenerContainer createContainer(TopicPartitionOffset... topicPartitions) { - return createContainerInstance(new KafkaListenerEndpointAdapter() { - @Override - public TopicPartitionOffset[] getTopicPartitionsToAssign() { - return Arrays.copyOf(topicPartitions, topicPartitions.length); - } - }); + throw new UnsupportedOperationException("ShareConsumer does not support explicit partition assignment"); } @Override @@ -206,12 +201,7 @@ public Collection getTopics() { @Override public ShareKafkaMessageListenerContainer createContainer(Pattern topicPattern) { - return createContainerInstance(new KafkaListenerEndpointAdapter() { - @Override - public Pattern getTopicPattern() { - return topicPattern; - } - }); + throw new UnsupportedOperationException("ShareConsumer does not support topic patterns"); } /** @@ -220,21 +210,9 @@ public Pattern getTopicPattern() { * @return the container instance */ protected ShareKafkaMessageListenerContainer createContainerInstance(KafkaListenerEndpoint endpoint) { - TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign(); - if (topicPartitions != null && topicPartitions.length > 0) { - return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), new ContainerProperties(topicPartitions)); - } - else { - Collection topics = endpoint.getTopics(); - Assert.state(topics != null, "'topics' must not be null"); - if (!topics.isEmpty()) { - return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), - new ContainerProperties(topics.toArray(new String[0]))); - } - else { - return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), - new ContainerProperties(endpoint.getTopicPattern())); - } - } + Collection topics = endpoint.getTopics(); + Assert.state(topics != null, "'topics' must not be null"); + return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), + new ContainerProperties(topics.toArray(new String[0]))); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index 6c83dfffcb..b96f033697 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -172,7 +172,6 @@ private class ShareListenerConsumer implements Runnable { this.genericListener = listener; this.clientId = ShareKafkaMessageListenerContainer.this.getClientId(); - // Subscribe to topics, just like in the test ContainerProperties containerProperties = getContainerProperties(); this.consumer.subscribe(Arrays.asList(containerProperties.getTopics())); } From e9ee474d34422422cbcf64de714f2a78ebf568d2 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 2 Jul 2025 17:28:31 -0400 Subject: [PATCH 4/6] Addressing PR review Signed-off-by: Soby Chacko --- .../config/MethodKafkaListenerEndpoint.java | 3 +- .../ShareKafkaListenerContainerFactory.java | 23 +++++++------- .../ShareKafkaMessageListenerContainer.java | 30 ++++++++++--------- 3 files changed, 30 insertions(+), 26 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index db6dbb97a1..3f08435831 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -175,8 +175,7 @@ protected MessagingMessageListenerAdapter createMessageListener(MessageLis Assert.state(this.messageHandlerMethodFactory != null, "Could not create message listener - MessageHandlerMethodFactory not set"); - - final MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(messageConverter); + MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(messageConverter); messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); JavaUtils.INSTANCE .acceptIfNotNull(getReplyTopic(), replyTopic -> { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index f09e55e7c8..bab4dfa69b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -83,14 +83,6 @@ public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } - /** - * Set the share consumer factory to use for creating containers. - * @param shareConsumerFactory the share consumer factory - */ - public void setShareConsumerFactory(ShareConsumerFactory shareConsumerFactory) { - this.shareConsumerFactory = shareConsumerFactory; - } - /** * Get the share consumer factory. * @return the share consumer factory @@ -99,6 +91,14 @@ public void setShareConsumerFactory(ShareConsumerFactory s return this.shareConsumerFactory; } + /** + * Set the share consumer factory to use for creating containers. + * @param shareConsumerFactory the share consumer factory + */ + public void setShareConsumerFactory(ShareConsumerFactory shareConsumerFactory) { + this.shareConsumerFactory = shareConsumerFactory; + } + /** * Set whether containers created by this factory should auto-start. * @param autoStartup true to auto-start @@ -136,7 +136,8 @@ public ShareKafkaMessageListenerContainer createListenerContainer(KafkaLis if (endpoint instanceof AbstractKafkaListenerEndpoint) { configureEndpoint((AbstractKafkaListenerEndpoint) endpoint); } - endpoint.setupListenerContainer(instance, null); // No message converter for MVP + // TODO: No message converter for queue at the moment + endpoint.setupListenerContainer(instance, null); initializeContainer(instance, endpoint); return instance; } @@ -192,6 +193,7 @@ public ShareKafkaMessageListenerContainer createContainer(TopicPartitionOf @Override public ShareKafkaMessageListenerContainer createContainer(String... topics) { return createContainerInstance(new KafkaListenerEndpointAdapter() { + @Override public Collection getTopics() { return Arrays.asList(topics); @@ -213,6 +215,7 @@ protected ShareKafkaMessageListenerContainer createContainerInstance(Kafka Collection topics = endpoint.getTopics(); Assert.state(topics != null, "'topics' must not be null"); return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), - new ContainerProperties(topics.toArray(new String[0]))); + new ContainerProperties(topics.toArray(new String[0]))); } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java index b96f033697..f94c78c099 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java @@ -78,14 +78,6 @@ public ShareKafkaMessageListenerContainer(ShareConsumerFactory listener) { this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer( - ShareKafkaMessageListenerContainer.this.getGroupId(), - ShareKafkaMessageListenerContainer.this.getClientId()); + ShareKafkaMessageListenerContainer.this.getGroupId(), + ShareKafkaMessageListenerContainer.this.getClientId()); this.genericListener = listener; this.clientId = ShareKafkaMessageListenerContainer.this.getClientId(); @@ -196,7 +196,7 @@ public void run() { } else { GenericMessageListener> listener = - (GenericMessageListener>) this.genericListener; + (GenericMessageListener>) this.genericListener; listener.onMessage(record); } // Temporarily auto-acknowledge and commit. @@ -239,9 +239,11 @@ private void wrapUp() { @Override public String toString() { return "ShareKafkaMessageListenerContainer.ShareListenerConsumer [" - + "consumerGroupId=" + this.consumerGroupId - + ", clientId=" + this.clientId - + "]"; + + "consumerGroupId=" + this.consumerGroupId + + ", clientId=" + this.clientId + + "]"; } + } + } From d0f69658ed0d3955b10ea73dab00252824f97bb7 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 3 Jul 2025 12:57:03 -0400 Subject: [PATCH 5/6] Addressing PR review Signed-off-by: Soby Chacko --- .../ShareKafkaListenerContainerFactory.java | 82 ++++--------------- 1 file changed, 18 insertions(+), 64 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index bab4dfa69b..f206260c36 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -20,14 +20,12 @@ import java.util.Collection; import java.util.regex.Pattern; -import org.apache.commons.logging.LogFactory; import org.jspecify.annotations.Nullable; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.core.log.LogAccessor; import org.springframework.kafka.core.ShareConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer; @@ -56,11 +54,7 @@ public class ShareKafkaListenerContainerFactory implements KafkaListenerContainerFactory>, ApplicationEventPublisherAware, ApplicationContextAware { - protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); - - private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); - - private ShareConsumerFactory shareConsumerFactory; + private final ShareConsumerFactory shareConsumerFactory; private @Nullable Boolean autoStartup; @@ -83,22 +77,6 @@ public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } - /** - * Get the share consumer factory. - * @return the share consumer factory - */ - public ShareConsumerFactory getShareConsumerFactory() { - return this.shareConsumerFactory; - } - - /** - * Set the share consumer factory to use for creating containers. - * @param shareConsumerFactory the share consumer factory - */ - public void setShareConsumerFactory(ShareConsumerFactory shareConsumerFactory) { - this.shareConsumerFactory = shareConsumerFactory; - } - /** * Set whether containers created by this factory should auto-start. * @param autoStartup true to auto-start @@ -120,21 +98,14 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv this.applicationEventPublisher = applicationEventPublisher; } - /** - * Get the container properties. - * @return the container properties - */ - public ContainerProperties getContainerProperties() { - return this.containerProperties; - } - @Override + @SuppressWarnings({"unchecked", "rawtypes"}) public ShareKafkaMessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint) { ShareKafkaMessageListenerContainer instance = createContainerInstance(endpoint); JavaUtils.INSTANCE .acceptIfNotNull(endpoint.getId(), instance::setBeanName); - if (endpoint instanceof AbstractKafkaListenerEndpoint) { - configureEndpoint((AbstractKafkaListenerEndpoint) endpoint); + if (endpoint instanceof AbstractKafkaListenerEndpoint abstractKafkaListenerEndpoint) { + configureEndpoint(abstractKafkaListenerEndpoint); } // TODO: No message converter for queue at the moment endpoint.setupListenerContainer(instance, null); @@ -153,36 +124,19 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint endpoint) { */ protected void initializeContainer(ShareKafkaMessageListenerContainer instance, KafkaListenerEndpoint endpoint) { ContainerProperties properties = instance.getContainerProperties(); - if (this.containerProperties.getAckCount() > 0) { - properties.setAckCount(this.containerProperties.getAckCount()); - } - if (this.containerProperties.getAckTime() > 0) { - properties.setAckTime(this.containerProperties.getAckTime()); - } - if (endpoint.getAutoStartup() != null) { - instance.setAutoStartup(endpoint.getAutoStartup()); - } - else if (this.autoStartup != null) { - instance.setAutoStartup(this.autoStartup); - } - if (this.phase != null) { - instance.setPhase(this.phase); - } - if (this.applicationContext != null) { - instance.setApplicationContext(this.applicationContext); - } - if (this.applicationEventPublisher != null) { - instance.setApplicationEventPublisher(this.applicationEventPublisher); - } - if (endpoint.getGroupId() != null) { - instance.getContainerProperties().setGroupId(endpoint.getGroupId()); - } - if (endpoint.getClientIdPrefix() != null) { - instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix()); - } - if (endpoint.getConsumerProperties() != null) { - instance.getContainerProperties().setKafkaConsumerProperties(endpoint.getConsumerProperties()); - } + JavaUtils.INSTANCE + .acceptIfNotNull(endpoint.getAutoStartup(), instance::setAutoStartup) + .acceptIfNotNull(this.autoStartup, autoStartup -> { + if (endpoint.getAutoStartup() == null) { + instance.setAutoStartup(autoStartup); + } + }) + .acceptIfNotNull(this.phase, instance::setPhase) + .acceptIfNotNull(this.applicationContext, instance::setApplicationContext) + .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher) + .acceptIfNotNull(endpoint.getGroupId(), properties::setGroupId) + .acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId) + .acceptIfNotNull(endpoint.getConsumerProperties(), properties::setKafkaConsumerProperties); } @Override @@ -214,7 +168,7 @@ public ShareKafkaMessageListenerContainer createContainer(Pattern topicPat protected ShareKafkaMessageListenerContainer createContainerInstance(KafkaListenerEndpoint endpoint) { Collection topics = endpoint.getTopics(); Assert.state(topics != null, "'topics' must not be null"); - return new ShareKafkaMessageListenerContainer<>(getShareConsumerFactory(), + return new ShareKafkaMessageListenerContainer<>(this.shareConsumerFactory, new ContainerProperties(topics.toArray(new String[0]))); } From 8829ac9d53dc12878588680b9ab5f330d472a19a Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 3 Jul 2025 13:07:52 -0400 Subject: [PATCH 6/6] Addressing PR review Signed-off-by: Soby Chacko --- .../kafka/config/ShareKafkaListenerContainerFactory.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java index f206260c36..fdfa68d6bb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java @@ -124,13 +124,9 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint endpoint) { */ protected void initializeContainer(ShareKafkaMessageListenerContainer instance, KafkaListenerEndpoint endpoint) { ContainerProperties properties = instance.getContainerProperties(); + Boolean effectiveAutoStartup = endpoint.getAutoStartup() != null ? endpoint.getAutoStartup() : this.autoStartup; JavaUtils.INSTANCE - .acceptIfNotNull(endpoint.getAutoStartup(), instance::setAutoStartup) - .acceptIfNotNull(this.autoStartup, autoStartup -> { - if (endpoint.getAutoStartup() == null) { - instance.setAutoStartup(autoStartup); - } - }) + .acceptIfNotNull(effectiveAutoStartup, instance::setAutoStartup) .acceptIfNotNull(this.phase, instance::setPhase) .acceptIfNotNull(this.applicationContext, instance::setApplicationContext) .acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)