diff --git a/build.gradle b/build.gradle index a30b8ea8ce..61a5a9b258 100644 --- a/build.gradle +++ b/build.gradle @@ -71,6 +71,7 @@ subprojects { subproject -> assertkVersion = '0.12' googleJsr305Version = '3.0.2' hamcrestVersion = '1.3' + hibernateValidationVersion = '6.0.12.Final' jacksonVersion = '2.9.6' jaywayJsonPathVersion = '2.4.0' junit4Version = '4.12' @@ -205,6 +206,7 @@ project ('spring-kafka') { compile ("com.jayway.jsonpath:json-path:$jaywayJsonPathVersion", optional) testCompile project (":spring-kafka-test") + testCompile "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion" } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java index 01f70140bf..5305f6cde5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,8 +49,8 @@ * The {@code KafkaListenerContainerFactory} is responsible to create the listener * container for a particular endpoint. Typical implementations, as the * {@link org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory - * ConcurrentKafkaListenerContainerFactory} used in the sample above, provides the necessary - * configuration options that are supported by the underlying + * ConcurrentKafkaListenerContainerFactory} used in the sample above, provides the + * necessary configuration options that are supported by the underlying * {@link org.springframework.kafka.listener.MessageListenerContainer * MessageListenerContainer}. * @@ -113,8 +113,7 @@ * Annotated methods can use a flexible signature; in particular, it is possible to use * the {@link org.springframework.messaging.Message Message} abstraction and related * annotations, see {@link KafkaListener} Javadoc for more details. For instance, the - * following would inject the content of the message and the kafka partition - * header: + * following would inject the content of the message and the kafka partition header: * *
  * @KafkaListener(containerFactory = "myKafkaListenerContainerFactory", topics = "myTopic")
@@ -165,9 +164,10 @@
  * {@link org.springframework.kafka.config.KafkaListenerEndpointRegistry
  * KafkaListenerEndpointRegistry} in case you need more control on the way the containers
  * are created and managed. The example below also demonstrates how to customize the
- * {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory} to use with a custom
- * {@link org.springframework.validation.Validator Validator} so that payloads annotated
- * with {@link org.springframework.validation.annotation.Validated Validated} are first
+ * {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory}
+ * as well as how to supply a custom {@link org.springframework.validation.Validator
+ * Validator} so that payloads annotated with
+ * {@link org.springframework.validation.annotation.Validated Validated} are first
  * validated against a custom {@code Validator}.
  *
  * 
@@ -180,6 +180,7 @@
  * 		public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
  * 			registrar.setEndpointRegistry(myKafkaListenerEndpointRegistry());
  * 			registrar.setMessageHandlerMethodFactory(myMessageHandlerMethodFactory);
+ * 			registrar.setValidator(new MyValidator());
  * 		}
  *
  * 		@Bean
@@ -190,7 +191,7 @@
  * 		@Bean
  * 		public MessageHandlerMethodFactory myMessageHandlerMethodFactory() {
  * 			DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
- * 			factory.setValidator(new MyValidator());
+ * 			// factory configuration
  * 			return factory;
  * 		}
  *
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
index 9d6a45dbe4..44adf79edf 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
@@ -80,6 +80,7 @@
 import org.springframework.util.Assert;
 import org.springframework.util.ReflectionUtils;
 import org.springframework.util.StringUtils;
+import org.springframework.validation.Validator;
 
 /**
  * Bean post-processor that registers methods annotated with {@link KafkaListener}
@@ -750,6 +751,10 @@ private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
 
 		private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
 			DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
+			Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
+			if (validator != null) {
+				defaultFactory.setValidator(validator);
+			}
 			defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
 
 			ConfigurableBeanFactory cbf =
@@ -768,7 +773,7 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
 			// Type-based argument resolution
 			final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
 			argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
-			argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {
+			argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {
 
 				@Override
 				protected boolean isEmptyPayload(Object payload) {
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
index b2ff61451c..723d70b050 100644
--- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
+++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014-2016 the original author or authors.
+ * Copyright 2014-2018 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,8 +22,10 @@
 import org.springframework.beans.factory.BeanFactory;
 import org.springframework.beans.factory.BeanFactoryAware;
 import org.springframework.beans.factory.InitializingBean;
+import org.springframework.lang.Nullable;
 import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
 import org.springframework.util.Assert;
+import org.springframework.validation.Validator;
 
 /**
  * Helper bean for registering {@link KafkaListenerEndpoint} with
@@ -52,6 +54,8 @@ public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, Initial
 
 	private boolean startImmediately;
 
+	private Validator validator;
+
 	/**
 	 * Set the {@link KafkaListenerEndpointRegistry} instance to use.
 	 * @param endpointRegistry the {@link KafkaListenerEndpointRegistry} instance to use.
@@ -82,6 +86,8 @@ public KafkaListenerEndpointRegistry getEndpointRegistry() {
 	 * @param kafkaHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.
 	 */
 	public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory) {
+		Assert.isNull(this.validator,
+				"A validator cannot be provided with a custom message handler factory");
 		this.messageHandlerMethodFactory = kafkaHandlerMethodFactory;
 	}
 
@@ -126,6 +132,26 @@ public void setBeanFactory(BeanFactory beanFactory) {
 		this.beanFactory = beanFactory;
 	}
 
+	/**
+	 * Get the validator, if supplied.
+	 * @return the validator.
+	 * @since 2.2
+	 */
+	@Nullable
+	public Validator getValidator() {
+		return this.validator;
+	}
+
+	/**
+	 * Set the validator to use if the default message handler factory is used.
+	 * @param validator the validator.
+	 * @since 2.2
+	 */
+	public void setValidator(Validator validator) {
+		Assert.isNull(this.messageHandlerMethodFactory,
+				"A validator cannot be provided with a custom message handler factory");
+		this.validator = validator;
+	}
 
 	@Override
 	public void afterPropertiesSet() {
diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
index bee493a326..473f498177 100644
--- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
+++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java
@@ -36,6 +36,10 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import javax.validation.Valid;
+import javax.validation.ValidationException;
+import javax.validation.constraints.Max;
+
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -57,6 +61,7 @@
 import org.springframework.core.convert.converter.Converter;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -110,6 +115,8 @@
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.validation.Errors;
+import org.springframework.validation.Validator;
 
 /**
  * @author Gary Russell
@@ -135,7 +142,7 @@ public class EnableKafkaIntegrationTests {
 			"annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply",
 			"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
 			"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
-			"annotated34");
+			"annotated34", "annotated35");
 
 	private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
 
@@ -507,6 +514,13 @@ public void testListenerErrorHandler() throws Exception {
 		assertThat(this.listener.latch16.await(60, TimeUnit.SECONDS)).isTrue();
 	}
 
+	@Test
+	public void testValidation() throws Exception {
+		template.send("annotated35", 0, "{\"bar\":42}");
+		assertThat(this.listener.validationLatch.await(60, TimeUnit.SECONDS)).isTrue();
+		assertThat(this.listener.validationException).isInstanceOf(ValidationException.class);
+	}
+
 	@Test
 	public void testReplyingListener() throws Exception {
 		Map consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
@@ -686,7 +700,7 @@ public void testAutoConfigTm() {
 	@Configuration
 	@EnableKafka
 	@EnableTransactionManagement(proxyTargetClass = true)
-	public static class Config {
+	public static class Config implements KafkaListenerConfigurer {
 
 		private final CountDownLatch spyLatch = new CountDownLatch(2);
 
@@ -1042,6 +1056,15 @@ public KafkaListenerErrorHandler consumeException(Listener listener) {
 			};
 		}
 
+		@Bean
+		public KafkaListenerErrorHandler validationErrorHandler(Listener listener) {
+			return (m, e) -> {
+				listener.validationException = (Exception) e.getCause();
+				listener.validationLatch.countDown();
+				return null;
+			};
+		}
+
 		@Bean
 		public KafkaListenerErrorHandler replyErrorHandler() {
 			return (m, e) -> ((String) m.getPayload()).toLowerCase();
@@ -1156,6 +1179,23 @@ public FooConverter fooConverter() {
 			return new FooConverter();
 		}
 
+		@Override
+		public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
+			registrar.setValidator(new Validator() {
+
+				@Override
+				public void validate(Object target, Errors errors) {
+					throw new ValidationException();
+				}
+
+				@Override
+				public boolean supports(Class clazz) {
+					return ValidatedClass.class.isAssignableFrom(clazz);
+				}
+
+			});
+		}
+
 	}
 
 	@Component
@@ -1209,6 +1249,10 @@ static class Listener implements ConsumerSeekAware {
 
 		private final CountDownLatch latch21 = new CountDownLatch(1);
 
+		private final CountDownLatch validationLatch = new CountDownLatch(1);
+
+		private Exception validationException;
+
 		private final CountDownLatch eventLatch = new CountDownLatch(1);
 
 		private volatile Integer partition;
@@ -1491,6 +1535,12 @@ public void pollResults(ConsumerRecords records) {
 			this.latch21.countDown();
 		}
 
+		@KafkaListener(id = "validated", topics = "annotated35", errorHandler = "validationErrorHandler",
+				containerFactory = "kafkaJsonListenerContainerFactory")
+		public void validatedListener(@Payload @Valid ValidatedClass val) {
+			// NOSONAR
+		}
+
 		@Override
 		public void registerSeekCallback(ConsumerSeekCallback callback) {
 			this.seekCallBack.set(callback);
@@ -1725,4 +1775,20 @@ public Foo convert(String source) {
 			return delegate.convert(source);
 		}
 	}
+
+	public static class ValidatedClass {
+
+		@Max(10)
+		private int bar;
+
+		public int getBar() {
+			return this.bar;
+		}
+
+		public void setBar(int bar) {
+			this.bar = bar;
+		}
+
+	}
+
 }
diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc
index 9094c8f901..77efe80e48 100644
--- a/src/reference/asciidoc/kafka.adoc
+++ b/src/reference/asciidoc/kafka.adoc
@@ -1141,6 +1141,93 @@ public void listen(...) { ... }
     registry.getListenerContainer("myContainer").start();
 ----
 
+[[kafka-validation]]
+===== @KafkaListener @Payload Validation
+
+Starting with version 2.2, it is now easier to add a `Validator` to validate `@KafkaListener` `@Payload` arguments.
+Previously, you had to configure a custom `DefaultMessageHandlerMethodFactory` and add it to the registrar.
+Now, you can simply add the validator to the registrar itself.
+
+====
+[source, java]
+----
+@Configuration
+@EnableKafka
+public class Config implements KafkaListenerConfigurer {
+
+    ...
+
+    @Override
+    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
+      registrar.setValidator(new MyValidator());
+    }
+}
+----
+====
+
+NOTE: When using Spring Boot with the validation starter, a `LocalValidatorFactoryBean` is auto-configured:
+
+====
+[source, java]
+----
+@Configuration
+@EnableKafka
+public class Config implements KafkaListenerConfigurer {
+
+    @Autowired
+    private LocalValidatorFactoryBean validator;
+    ...
+
+    @Override
+    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
+      registrar.setValidator(this.validator);
+    }
+}
+----
+====
+
+To validate:
+
+====
+[source, java]
+----
+public static class ValidatedClass {
+
+  @Max(10)
+  private int bar;
+
+  public int getBar() {
+    return this.bar;
+  }
+
+  public void setBar(int bar) {
+    this.bar = bar;
+  }
+
+}
+----
+====
+
+and
+
+====
+[source, java]
+----
+@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
+      containerFactory = "kafkaJsonListenerContainerFactory")
+public void validatedListener(@Payload @Valid ValidatedClass val) {
+    ...
+}
+
+@Bean
+public KafkaListenerErrorHandler validationErrorHandler() {
+    return (m, e) -> {
+        ...
+    };
+}
+----
+====
+
 [[rebalance-listeners]]
 ===== Rebalance Listeners
 
diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc
index daa10b3d54..9473be6d6e 100644
--- a/src/reference/asciidoc/whats-new.adoc
+++ b/src/reference/asciidoc/whats-new.adoc
@@ -50,6 +50,9 @@ You can now use `@KafkaListener` as a meta-annotation on your own annotations.
 
 See <> for more information.
 
+It is now easier to configure a `Validator` for `@Payload` validation.
+See <> for more information.
+
 ==== Header Mapping Changes
 
 Headers of type `MimeType` and `MediaType` are now mapped as simple strings in the `RecordHeader` value.