Skip to content

Commit bedf408

Browse files
authored
Add container properties to ShareKafkaListenerContainerFactory
Enables configuration of container properties at the factory level, following the established pattern in other Spring Kafka container factories. This provides a more flexible and Spring-friendly way to configure share consumer containers, particularly for settings like explicit acknowledgment mode, without requiring configuration through Kafka client properties. - Add `getContainerProperties()` method to allow configuration at factory level - Copy factory-level properties to each listener container instance during creation - Update acknowledgment mode determination to respect factory-level settings with proper precedence - Add integration test for factory-level explicit acknowledgment configuration - Update documentation with factory-level configuration example Signed-off-by: Soby Chacko <[email protected]>
1 parent aa0d91d commit bedf408

File tree

3 files changed

+91
-17
lines changed

3 files changed

+91
-17
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -441,21 +441,18 @@ public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
441441

442442
[source,java]
443443
----
444-
@Bean
445-
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
446-
Map<String, Object> props = new HashMap<>();
447-
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
448-
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
449-
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
450-
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit");
451-
return new DefaultShareConsumerFactory<>(props);
452-
}
453-
454444
@Bean
455445
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
456-
ShareConsumerFactory<String, String> explicitShareConsumerFactory) {
457-
// The factory will detect the explicit acknowledgment mode from the consumer factory configuration
458-
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
446+
ShareConsumerFactory<String, String> shareConsumerFactory) {
447+
448+
ShareKafkaListenerContainerFactory<String, String> factory =
449+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
450+
451+
// Configure acknowledgment mode at container factory level
452+
// true means explicit acknowledgment is required
453+
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
454+
455+
return factory;
459456
}
460457
----
461458

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.clients.consumer.ConsumerConfig;
2424
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
2525

26+
import org.springframework.beans.BeanUtils;
2627
import org.springframework.context.ApplicationContext;
2728
import org.springframework.context.ApplicationContextAware;
2829
import org.springframework.context.ApplicationEventPublisher;
@@ -59,6 +60,8 @@ public class ShareKafkaListenerContainerFactory<K, V>
5960

6061
private final ShareConsumerFactory<? super K, ? super V> shareConsumerFactory;
6162

63+
private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);
64+
6265
private boolean autoStartup = true;
6366

6467
private int phase = 0;
@@ -116,6 +119,15 @@ public void setConcurrency(int concurrency) {
116119
this.concurrency = concurrency;
117120
}
118121

122+
/**
123+
* Obtain the factory-level container properties - set properties as needed
124+
* and they will be copied to each listener container instance created by this factory.
125+
* @return the properties.
126+
*/
127+
public ContainerProperties getContainerProperties() {
128+
return this.containerProperties;
129+
}
130+
119131
@Override
120132
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
121133
this.applicationEventPublisher = applicationEventPublisher;
@@ -152,7 +164,12 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
152164
// Validate share group configuration
153165
validateShareConfiguration(endpoint);
154166

167+
// Copy factory-level properties to container
168+
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
169+
"messageListener", "ackCount", "ackTime", "subBatchPerPartition", "kafkaConsumerProperties");
170+
155171
// Determine acknowledgment mode following Spring Kafka's configuration precedence patterns
172+
// Check factory-level properties first, then consumer factory config
156173
boolean explicitAck = determineExplicitAcknowledgment(properties);
157174
properties.setExplicitShareAcknowledgment(explicitAck);
158175

@@ -180,7 +197,7 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
180197
* <p>
181198
* Configuration precedence (highest to lowest):
182199
* <ol>
183-
* <li>Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set)</li>
200+
* <li>Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set via factory-level properties)</li>
184201
* <li>Consumer Config: {@code ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG}</li>
185202
* <li>Default: {@code false} (implicit acknowledgment)</li>
186203
* </ol>
@@ -189,16 +206,23 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
189206
* @throws IllegalArgumentException if an invalid acknowledgment mode is configured
190207
*/
191208
private boolean determineExplicitAcknowledgment(ContainerProperties containerProperties) {
192-
// Check Kafka client configuration
209+
// Check factory-level properties first
210+
// If explicitly set to true (non-default), use it with highest precedence
211+
if (this.containerProperties.isExplicitShareAcknowledgment()) {
212+
return true;
213+
}
214+
215+
// Check Kafka client configuration as fallback
193216
Object clientAckMode = this.shareConsumerFactory.getConfigurationProperties()
194217
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
195218

196219
if (clientAckMode != null) {
197220
ShareAcknowledgementMode mode = ShareAcknowledgementMode.fromString(clientAckMode.toString());
198221
return mode == ShareAcknowledgementMode.EXPLICIT;
199222
}
223+
200224
// Default to implicit acknowledgment (false)
201-
return containerProperties.isExplicitShareAcknowledgment();
225+
return false;
202226
}
203227

204228
private static void validateShareConfiguration(KafkaListenerEndpoint endpoint) {

spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@
7272
"share-listener-consumer-aware-test",
7373
"share-listener-ack-consumer-aware-test",
7474
"share-listener-mixed-ack-test",
75-
"share-listener-error-handling-test"
75+
"share-listener-error-handling-test",
76+
"share-listener-factory-props-test"
7677
},
7778
brokerProperties = {
7879
"share.coordinator.state.topic.replication.factor=1",
@@ -197,6 +198,22 @@ void shouldHandleProcessingErrorsCorrectly() throws Exception {
197198
assertThat(ErrorHandlingTestListener.errorCount.get()).isEqualTo(1);
198199
}
199200

201+
@Test
202+
void shouldSupportExplicitAcknowledgmentViaFactoryContainerProperties() throws Exception {
203+
final String topic = "share-listener-factory-props-test";
204+
final String groupId = "share-factory-props-group";
205+
setShareAutoOffsetResetEarliest(this.broker.getBrokersAsString(), groupId);
206+
207+
// Send test message
208+
kafkaTemplate.send(topic, "factory-test", "factory-props-message");
209+
210+
// Wait for processing
211+
assertThat(FactoryPropsTestListener.latch.await(10, TimeUnit.SECONDS)).isTrue();
212+
assertThat(FactoryPropsTestListener.received.get()).isEqualTo("factory-props-message");
213+
assertThat(FactoryPropsTestListener.acknowledgmentReceived.get()).isNotNull();
214+
assertThat(isAcknowledgedInternal(FactoryPropsTestListener.acknowledgmentReceived.get())).isTrue();
215+
}
216+
200217
/**
201218
* Sets the share.auto.offset.reset group config to earliest for the given groupId.
202219
*/
@@ -261,6 +278,16 @@ public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaList
261278
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
262279
}
263280

281+
@Bean
282+
public ShareKafkaListenerContainerFactory<String, String> factoryPropsShareKafkaListenerContainerFactory(
283+
ShareConsumerFactory<String, String> shareConsumerFactory) {
284+
ShareKafkaListenerContainerFactory<String, String> factory =
285+
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
286+
// Configure explicit acknowledgment via factory's container properties
287+
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
288+
return factory;
289+
}
290+
264291
@Bean
265292
public ProducerFactory<String, String> producerFactory(EmbeddedKafkaBroker broker) {
266293
Map<String, Object> props = new HashMap<>();
@@ -305,6 +332,11 @@ public MixedAckTestListener mixedAckTestListener() {
305332
public ErrorHandlingTestListener errorHandlingTestListener() {
306333
return new ErrorHandlingTestListener();
307334
}
335+
336+
@Bean
337+
public FactoryPropsTestListener factoryPropsTestListener() {
338+
return new FactoryPropsTestListener();
339+
}
308340
}
309341

310342
// Test listener classes
@@ -480,4 +512,25 @@ public void listen(ConsumerRecord<String, String> record, ShareAcknowledgment ac
480512
}
481513
}
482514

515+
static class FactoryPropsTestListener {
516+
517+
static final CountDownLatch latch = new CountDownLatch(1);
518+
519+
static final AtomicReference<String> received = new AtomicReference<>();
520+
521+
static final AtomicReference<ShareAcknowledgment> acknowledgmentReceived = new AtomicReference<>();
522+
523+
@KafkaListener(topics = "share-listener-factory-props-test",
524+
groupId = "share-factory-props-group",
525+
containerFactory = "factoryPropsShareKafkaListenerContainerFactory")
526+
public void listen(ConsumerRecord<String, String> record, @Nullable ShareAcknowledgment acknowledgment) {
527+
received.set(record.value());
528+
acknowledgmentReceived.set(acknowledgment);
529+
if (acknowledgment != null) {
530+
acknowledgment.acknowledge(); // ACCEPT
531+
}
532+
latch.countDown();
533+
}
534+
}
535+
483536
}

0 commit comments

Comments
 (0)