diff --git a/build.gradle b/build.gradle index a30b8ea8ce..61a5a9b258 100644 --- a/build.gradle +++ b/build.gradle @@ -71,6 +71,7 @@ subprojects { subproject -> assertkVersion = '0.12' googleJsr305Version = '3.0.2' hamcrestVersion = '1.3' + hibernateValidationVersion = '6.0.12.Final' jacksonVersion = '2.9.6' jaywayJsonPathVersion = '2.4.0' junit4Version = '4.12' @@ -205,6 +206,7 @@ project ('spring-kafka') { compile ("com.jayway.jsonpath:json-path:$jaywayJsonPathVersion", optional) testCompile project (":spring-kafka-test") + testCompile "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion" } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java index 01f70140bf..5305f6cde5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,8 +49,8 @@ * The {@code KafkaListenerContainerFactory} is responsible to create the listener * container for a particular endpoint. Typical implementations, as the * {@link org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory - * ConcurrentKafkaListenerContainerFactory} used in the sample above, provides the necessary - * configuration options that are supported by the underlying + * ConcurrentKafkaListenerContainerFactory} used in the sample above, provides the + * necessary configuration options that are supported by the underlying * {@link org.springframework.kafka.listener.MessageListenerContainer * MessageListenerContainer}. * @@ -113,8 +113,7 @@ * Annotated methods can use a flexible signature; in particular, it is possible to use * the {@link org.springframework.messaging.Message Message} abstraction and related * annotations, see {@link KafkaListener} Javadoc for more details. For instance, the - * following would inject the content of the message and the kafka partition - * header: + * following would inject the content of the message and the kafka partition header: * *
* @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
@@ -165,9 +164,10 @@
* {@link org.springframework.kafka.config.KafkaListenerEndpointRegistry
* KafkaListenerEndpointRegistry} in case you need more control on the way the containers
* are created and managed. The example below also demonstrates how to customize the
- * {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory} to use with a custom
- * {@link org.springframework.validation.Validator Validator} so that payloads annotated
- * with {@link org.springframework.validation.annotation.Validated Validated} are first
+ * {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory}
+ * as well as how to supply a custom {@link org.springframework.validation.Validator
+ * Validator} so that payloads annotated with
+ * {@link org.springframework.validation.annotation.Validated Validated} are first
* validated against a custom {@code Validator}.
*
*
@@ -180,6 +180,7 @@
* public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
* registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
* registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory);
+ * registrar.setValidator(new MyValidator());
* }
*
* @Bean
@@ -190,7 +191,7 @@
* @Bean
* public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
* DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
- * factory.setValidator(new MyValidator());
+ * // factory configuration
* return factory;
* }
*
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
index 9d6a45dbe4..44adf79edf 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
@@ -80,6 +80,7 @@
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
+import org.springframework.validation.Validator;
/**
* Bean post-processor that registers methods annotated with {@link KafkaListener}
@@ -750,6 +751,10 @@ private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
+ Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
+ if (validator != null) {
+ defaultFactory.setValidator(validator);
+ }
defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
ConfigurableBeanFactory cbf =
@@ -768,7 +773,7 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
// Type-based argument resolution
final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
- argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {
+ argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {
@Override
protected boolean isEmptyPayload(Object payload) {
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
index b2ff61451c..723d70b050 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014-2016 the original author or authors.
+ * Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,8 +22,10 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
+import org.springframework.lang.Nullable;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.util.Assert;
+import org.springframework.validation.Validator;
/**
* Helper bean for registering {@link KafkaListenerEndpoint} with
@@ -52,6 +54,8 @@ public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, Initial
private boolean startImmediately;
+ private Validator validator;
+
/**
* Set the {@link KafkaListenerEndpointRegistry} instance to use.
* @param endpointRegistry the {@link KafkaListenerEndpointRegistry} instance to use.
@@ -82,6 +86,8 @@ public KafkaListenerEndpointRegistry getEndpointRegistry() {
* @param kafkaHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
*/
public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory) {
+ Assert.isNull(this.validator,
+ "A validator cannot be provided with a custom message handler factory");
this.messageHandlerMethodFactory = kafkaHandlerMethodFactory;
}
@@ -126,6 +132,26 @@ public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
+ /**
+ * Get the validator, if supplied.
+ * @return the validator.
+ * @since 2.2
+ */
+ @Nullable
+ public Validator getValidator() {
+ return this.validator;
+ }
+
+ /**
+ * Set the validator to use if the default message handler factory is used.
+ * @param validator the validator.
+ * @since 2.2
+ */
+ public void setValidator(Validator validator) {
+ Assert.isNull(this.messageHandlerMethodFactory,
+ "A validator cannot be provided with a custom message handler factory");
+ this.validator = validator;
+ }
@Override
public void afterPropertiesSet() {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
index bee493a326..473f498177 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
@@ -36,6 +36,10 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import javax.validation.Valid;
+import javax.validation.ValidationException;
+import javax.validation.constraints.Max;
+
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -57,6 +61,7 @@
import org.springframework.core.convert.converter.Converter;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -110,6 +115,8 @@
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.validation.Errors;
+import org.springframework.validation.Validator;
/**
* @author Gary Russell
@@ -135,7 +142,7 @@ public class EnableKafkaIntegrationTests {
"annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply",
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
- "annotated34");
+ "annotated34", "annotated35");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
@@ -507,6 +514,13 @@ public void testListenerErrorHandler() throws Exception {
assertThat(this.listener.latch16.await(60, TimeUnit.SECONDS)).isTrue();
}
+ @Test
+ public void testValidation() throws Exception {
+ template.send("annotated35", 0, "{\"bar\":42}");
+ assertThat(this.listener.validationLatch.await(60, TimeUnit.SECONDS)).isTrue();
+ assertThat(this.listener.validationException).isInstanceOf(ValidationException.class);
+ }
+
@Test
public void testReplyingListener() throws Exception {
Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
@@ -686,7 +700,7 @@ public void testAutoConfigTm() {
@Configuration
@EnableKafka
@EnableTransactionManagement(proxyTargetClass = true)
- public static class Config {
+ public static class Config implements KafkaListenerConfigurer {
private final CountDownLatch spyLatch = new CountDownLatch(2);
@@ -1042,6 +1056,15 @@ public KafkaListenerErrorHandler consumeException(Listener listener) {
};
}
+ @Bean
+ public KafkaListenerErrorHandler validationErrorHandler(Listener listener) {
+ return (m, e) -> {
+ listener.validationException = (Exception) e.getCause();
+ listener.validationLatch.countDown();
+ return null;
+ };
+ }
+
@Bean
public KafkaListenerErrorHandler replyErrorHandler() {
return (m, e) -> ((String) m.getPayload()).toLowerCase();
@@ -1156,6 +1179,23 @@ public FooConverter fooConverter() {
return new FooConverter();
}
+ @Override
+ public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
+ registrar.setValidator(new Validator() {
+
+ @Override
+ public void validate(Object target, Errors errors) {
+ throw new ValidationException();
+ }
+
+ @Override
+ public boolean supports(Class> clazz) {
+ return ValidatedClass.class.isAssignableFrom(clazz);
+ }
+
+ });
+ }
+
}
@Component
@@ -1209,6 +1249,10 @@ static class Listener implements ConsumerSeekAware {
private final CountDownLatch latch21 = new CountDownLatch(1);
+ private final CountDownLatch validationLatch = new CountDownLatch(1);
+
+ private Exception validationException;
+
private final CountDownLatch eventLatch = new CountDownLatch(1);
private volatile Integer partition;
@@ -1491,6 +1535,12 @@ public void pollResults(ConsumerRecords, ?> records) {
this.latch21.countDown();
}
+ @KafkaListener(id = "validated", topics = "annotated35", errorHandler = "validationErrorHandler",
+ containerFactory = "kafkaJsonListenerContainerFactory")
+ public void validatedListener(@Payload @Valid ValidatedClass val) {
+ // NOSONAR
+ }
+
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.seekCallBack.set(callback);
@@ -1725,4 +1775,20 @@ public Foo convert(String source) {
return delegate.convert(source);
}
}
+
+ public static class ValidatedClass {
+
+ @Max(10)
+ private int bar;
+
+ public int getBar() {
+ return this.bar;
+ }
+
+ public void setBar(int bar) {
+ this.bar = bar;
+ }
+
+ }
+
}
diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc
index 9094c8f901..77efe80e48 100644
--- a/src/reference/asciidoc/kafka.adoc
+++ b/src/reference/asciidoc/kafka.adoc
@@ -1141,6 +1141,93 @@ public void listen(...) { ... }
registry.getListenerContainer("myContainer").start();
----
+[[kafka-validation]]
+===== @KafkaListener @Payload Validation
+
+Starting with version 2.2, it is now easier to add a `Validator` to validate `@KafkaListener` `@Payload` arguments.
+Previously, you had to configure a custom `DefaultMessageHandlerMethodFactory` and add it to the registrar.
+Now, you can simply add the validator to the registrar itself.
+
+====
+[source, java]
+----
+@Configuration
+@EnableKafka
+public class Config implements KafkaListenerConfigurer {
+
+ ...
+
+ @Override
+ public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
+ registrar.setValidator(new MyValidator());
+ }
+}
+----
+====
+
+NOTE: When using Spring Boot with the validation starter, a `LocalValidatorFactoryBean` is auto-configured:
+
+====
+[source, java]
+----
+@Configuration
+@EnableKafka
+public class Config implements KafkaListenerConfigurer {
+
+ @Autowired
+ private LocalValidatorFactoryBean validator;
+ ...
+
+ @Override
+ public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
+ registrar.setValidator(this.validator);
+ }
+}
+----
+====
+
+To validate:
+
+====
+[source, java]
+----
+public static class ValidatedClass {
+
+ @Max(10)
+ private int bar;
+
+ public int getBar() {
+ return this.bar;
+ }
+
+ public void setBar(int bar) {
+ this.bar = bar;
+ }
+
+}
+----
+====
+
+and
+
+====
+[source, java]
+----
+@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
+ containerFactory = "kafkaJsonListenerContainerFactory")
+public void validatedListener(@Payload @Valid ValidatedClass val) {
+ ...
+}
+
+@Bean
+public KafkaListenerErrorHandler validationErrorHandler() {
+ return (m, e) -> {
+ ...
+ };
+}
+----
+====
+
[[rebalance-listeners]]
===== Rebalance Listeners
diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc
index daa10b3d54..9473be6d6e 100644
--- a/src/reference/asciidoc/whats-new.adoc
+++ b/src/reference/asciidoc/whats-new.adoc
@@ -50,6 +50,9 @@ You can now use `@KafkaListener` as a meta-annotation on your own annotations.
See <> for more information.
+It is now easier to configure a `Validator` for `@Payload` validation.
+See <> for more information.
+
==== Header Mapping Changes
Headers of type `MimeType` and `MediaType` are now mapped as simple strings in the `RecordHeader` value.