diff --git a/build.gradle b/build.gradle index b4a621141f..08aa9bb992 100644 --- a/build.gradle +++ b/build.gradle @@ -60,7 +60,7 @@ ext { jaywayJsonPathVersion = '2.9.0' junit4Version = '4.13.2' junitJupiterVersion = '5.13.4' - kafkaVersion = '4.0.0' + kafkaVersion = '4.1.0' kotlinCoroutinesVersion = '1.10.2' log4jVersion = '2.25.1' micrometerDocsVersion = '1.0.4' diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java index 914e4f2fee..e259904a57 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/DefaultShareConsumerFactoryTests.java @@ -31,7 +31,6 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.clients.consumer.AcknowledgeType; import org.apache.kafka.clients.consumer.ShareConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -56,8 +55,6 @@ @EmbeddedKafka( topics = {"embedded-share-test", "embedded-share-distribution-test"}, partitions = 1, brokerProperties = { - "unstable.api.versions.enable=true", - "group.coordinator.rebalance.protocols=classic,share", "share.coordinator.state.topic.replication.factor=1", "share.coordinator.state.topic.min.isr=1" }) @@ -248,7 +245,9 @@ private static List runSharedConsumerTest(String topic, String groupId, var records = consumer.poll(Duration.ofMillis(200)); for (var r : records) { allReceived.add(r.value()); - consumer.acknowledge(r, AcknowledgeType.ACCEPT); + // Leverage implicit acknowledgment where records are automatically treated as ACCEPT + // Use commitSync() for explicit commit timing instead of relying on next poll() auto-commit + consumer.commitSync(Duration.ofMillis(10000)); latch.countDown(); } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java index 59e2af6a28..8ddadbcedb 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaListenerIntegrationTests.java @@ -63,8 +63,6 @@ @DirtiesContext @EmbeddedKafka(topics = "share-listener-integration-test", brokerProperties = { - "unstable.api.versions.enable=true", - "group.coordinator.rebalance.protocols=classic,share", "share.coordinator.state.topic.replication.factor=1", "share.coordinator.state.topic.min.isr=1" }) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java index 1e990a43d7..7aff9fa602 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainerIntegrationTests.java @@ -50,8 +50,6 @@ @EmbeddedKafka( topics = {"share-listener-integration-test"}, partitions = 1, brokerProperties = { - "unstable.api.versions.enable=true", - "group.coordinator.rebalance.protocols=classic,share", "share.coordinator.state.topic.replication.factor=1", "share.coordinator.state.topic.min.isr=1" } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java index ada5464a19..275c8251a7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/serializer/SerializationIntegrationTests.java @@ -20,6 +20,7 @@ import java.util.regex.Pattern; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.internals.Plugin; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.Test; @@ -37,6 +38,7 @@ /** * @author Gary Russell + * @author Soby Chacko * @since 2.8.1 * */ @@ -46,6 +48,7 @@ public class SerializationIntegrationTests { public static final String DBTD_TOPIC = "dbtd"; @Test + @SuppressWarnings("unchecked") void configurePreLoadedDelegates() { Map consumerProps = KafkaTestUtils.consumerProps(EmbeddedKafkaCondition.getBroker(), DBTD_TOPIC, false); @@ -63,8 +66,11 @@ void configurePreLoadedDelegates() { props.setMessageListener(mock(MessageListener.class)); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cFact, props); container.start(); - assertThat(KafkaTestUtils.getPropertyValue(container, - "listenerConsumer.consumer.delegate.deserializers.valueDeserializer")) + //The Deserializers class uses a plugin mechanism to retrieve the actual deserializer. + Plugin> valueDeserializerPlugin = (Plugin>) KafkaTestUtils.getPropertyValue(container, + "listenerConsumer.consumer.delegate.deserializers.valueDeserializerPlugin"); + assertThat(valueDeserializerPlugin).isNotNull(); + assertThat(valueDeserializerPlugin.get()) .isSameAs(delegating); Map delegates = KafkaTestUtils.getPropertyValue(delegating, "delegates", Map.class); assertThat(delegates).hasSize(1);