Skip to content

Commit 5fc8220

Browse files
committed
Add factory-level container properties support to ShareKafkaListenerContainerFactory
Enables configuration of container properties at the factory level, following the established pattern in other Spring Kafka container factories. This provides a more flexible and Spring-friendly way to configure share consumer containers, particularly for settings like explicit acknowledgment mode, without requiring configuration through Kafka client properties. - Add `getContainerProperties()` method to allow configuration at factory level - Copy factory-level properties to each listener container instance during creation - Update acknowledgment mode determination to respect factory-level settings with proper precedence - Add integration test for factory-level explicit acknowledgment configuration - Update documentation with factory-level configuration example Signed-off-by: Soby Chacko <[email protected]>
1 parent c102814 commit 5fc8220

File tree

3 files changed

+91
-17
lines changed

3 files changed

+91
-17
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -441,21 +441,18 @@ public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
441441

442442
[source,java]
443443
----
444-
@Bean
445-
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
446-
Map<String, Object> props = new HashMap<>();
447-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
448-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
449-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
450-
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit");
451-
return new DefaultShareConsumerFactory<>(props);
452-
}
453-
454444
@Bean
455445
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
456-
ShareConsumerFactory<String, String> explicitShareConsumerFactory) {
457-
// The factory will detect the explicit acknowledgment mode from the consumer factory configuration
458-
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
446+
ShareConsumerFactory<String, String> shareConsumerFactory) {
447+
448+
ShareKafkaListenerContainerFactory<String, String> factory =
449+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
450+
451+
// Configure acknowledgment mode at container factory level
452+
// true means explicit acknowledgment is required
453+
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
454+
455+
return factory;
459456
}
460457
----
461458

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.clients.consumer.ConsumerConfig;
2424
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
2525

26+
import org.springframework.beans.BeanUtils;
2627
import org.springframework.context.ApplicationContext;
2728
import org.springframework.context.ApplicationContextAware;
2829
import org.springframework.context.ApplicationEventPublisher;
@@ -59,6 +60,8 @@ public class ShareKafkaListenerContainerFactory<K, V>
5960

6061
private final ShareConsumerFactory<? super K, ? super V> shareConsumerFactory;
6162

63+
private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);
64+
6265
private boolean autoStartup = true;
6366

6467
private int phase = 0;
@@ -116,6 +119,15 @@ public void setConcurrency(int concurrency) {
116119
this.concurrency = concurrency;
117120
}
118121

122+
/**
123+
* Obtain the factory-level container properties - set properties as needed
124+
* and they will be copied to each listener container instance created by this factory.
125+
* @return the properties.
126+
*/
127+
public ContainerProperties getContainerProperties() {
128+
return this.containerProperties;
129+
}
130+
119131
@Override
120132
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
121133
this.applicationEventPublisher = applicationEventPublisher;
@@ -152,7 +164,12 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
152164
// Validate share group configuration
153165
validateShareConfiguration(endpoint);
154166

167+
// Copy factory-level properties to container
168+
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
169+
"messageListener", "ackCount", "ackTime", "subBatchPerPartition", "kafkaConsumerProperties");
170+
155171
// Determine acknowledgment mode following Spring Kafka's configuration precedence patterns
172+
// Check factory-level properties first, then consumer factory config
156173
boolean explicitAck = determineExplicitAcknowledgment(properties);
157174
properties.setExplicitShareAcknowledgment(explicitAck);
158175

@@ -180,7 +197,7 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
180197
* <p>
181198
* Configuration precedence (highest to lowest):
182199
* <ol>
183-
* <li>Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set)</li>
200+
* <li>Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set via factory-level properties)</li>
184201
* <li>Consumer Config: {@code ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG}</li>
185202
* <li>Default: {@code false} (implicit acknowledgment)</li>
186203
* </ol>
@@ -189,16 +206,23 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
189206
* @throws IllegalArgumentException if an invalid acknowledgment mode is configured
190207
*/
191208
private boolean determineExplicitAcknowledgment(ContainerProperties containerProperties) {
192-
// Check Kafka client configuration
209+
// Check factory-level properties first
210+
// If explicitly set to true (non-default), use it with highest precedence
211+
if (this.containerProperties.isExplicitShareAcknowledgment()) {
212+
return true;
213+
}
214+
215+
// Check Kafka client configuration as fallback
193216
Object clientAckMode = this.shareConsumerFactory.getConfigurationProperties()
194217
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
195218

196219
if (clientAckMode != null) {
197220
ShareAcknowledgementMode mode = ShareAcknowledgementMode.fromString(clientAckMode.toString());
198221
return mode == ShareAcknowledgementMode.EXPLICIT;
199222
}
223+
200224
// Default to implicit acknowledgment (false)
201-
return containerProperties.isExplicitShareAcknowledgment();
225+
return false;
202226
}
203227

204228
private static void validateShareConfiguration(KafkaListenerEndpoint endpoint) {

spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@
7272
"share-listener-consumer-aware-test",
7373
"share-listener-ack-consumer-aware-test",
7474
"share-listener-mixed-ack-test",
75-
"share-listener-error-handling-test"
75+
"share-listener-error-handling-test",
76+
"share-listener-factory-props-test"
7677
},
7778
brokerProperties = {
7879
"share.coordinator.state.topic.replication.factor=1",
@@ -197,6 +198,22 @@ void shouldHandleProcessingErrorsCorrectly() throws Exception {
197198
assertThat(ErrorHandlingTestListener.errorCount.get()).isEqualTo(1);
198199
}
199200

201+
@Test
202+
void shouldSupportExplicitAcknowledgmentViaFactoryContainerProperties() throws Exception {
203+
final String topic = "share-listener-factory-props-test";
204+
final String groupId = "share-factory-props-group";
205+
setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId);
206+
207+
// Send test message
208+
kafkaTemplate.send(topic, "factory-test", "factory-props-message");
209+
210+
// Wait for processing
211+
assertThat(FactoryPropsTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue();
212+
assertThat(FactoryPropsTestListener.received.get()).isEqualTo("factory-props-message");
213+
assertThat(FactoryPropsTestListener.acknowledgmentReceived.get()).isNotNull();
214+
assertThat(isAcknowledgedInternal(FactoryPropsTestListener.acknowledgmentReceived.get())).isTrue();
215+
}
216+
200217
/**
201218
* Sets the share.auto.offset.reset group config to earliest for the given groupId.
202219
*/
@@ -261,6 +278,16 @@ public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaList
261278
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
262279
}
263280

281+
@Bean
282+
public ShareKafkaListenerContainerFactory<String, String> factoryPropsShareKafkaListenerContainerFactory(
283+
ShareConsumerFactory<String, String> shareConsumerFactory) {
284+
ShareKafkaListenerContainerFactory<String, String> factory =
285+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
286+
// Configure explicit acknowledgment via factory's container properties
287+
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
288+
return factory;
289+
}
290+
264291
@Bean
265292
public ProducerFactory<String, String> producerFactory(EmbeddedKafkaBroker broker) {
266293
Map<String, Object> props = new HashMap<>();
@@ -305,6 +332,11 @@ public MixedAckTestListener mixedAckTestListener() {
305332
public ErrorHandlingTestListener errorHandlingTestListener() {
306333
return new ErrorHandlingTestListener();
307334
}
335+
336+
@Bean
337+
public FactoryPropsTestListener factoryPropsTestListener() {
338+
return new FactoryPropsTestListener();
339+
}
308340
}
309341

310342
// Test listener classes
@@ -480,4 +512,25 @@ public void listen(ConsumerRecord<String, String> record, ShareAcknowledgment ac
480512
}
481513
}
482514

515+
static class FactoryPropsTestListener {
516+
517+
static final CountDownLatch latch = new CountDownLatch(1);
518+
519+
static final AtomicReference<String> received = new AtomicReference<>();
520+
521+
static final AtomicReference<ShareAcknowledgment> acknowledgmentReceived = new AtomicReference<>();
522+
523+
@KafkaListener(topics = "share-listener-factory-props-test",
524+
groupId = "share-factory-props-group",
525+
containerFactory = "factoryPropsShareKafkaListenerContainerFactory")
526+
public void listen(ConsumerRecord<String, String> record, @Nullable ShareAcknowledgment acknowledgment) {
527+
received.set(record.value());
528+
acknowledgmentReceived.set(acknowledgment);
529+
if (acknowledgment != null) {
530+
acknowledgment.acknowledge(); // ACCEPT
531+
}
532+
latch.countDown();
533+
}
534+
}
535+
483536
}

0 commit comments

Comments
 (0)