From 7be026cf4faedfbc6a9507d31f69e654707b9671 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 6 Nov 2018 17:20:35 -0500 Subject: [PATCH 1/3] GH-842: Fix NPEs in KafkaListenerAnnotationBPP Fixes spring-projects/spring-kafka#842 **Cherry-pick to 2.1.x, 2.0.x and 1.3.x** --- ...kaListenerAnnotationBeanPostProcessor.java | 39 +++++++++++++------ .../config/AbstractKafkaListenerEndpoint.java | 3 +- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index ced5bd2e8b..b281f16529 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -130,8 +130,7 @@ public class KafkaListenerAnnotationBeanPostProcessor */ public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory"; - private final Set> nonAnnotatedClasses = - Collections.newSetFromMap(new ConcurrentHashMap, Boolean>(64)); + private final Set> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64)); private final Log logger = LogFactory.getLog(getClass()); @@ -271,7 +270,7 @@ public Object postProcessAfterInitialization(final Object bean, final String bea Class targetClass = AopUtils.getTargetClass(bean); Collection classLevelListeners = findListenerAnnotations(targetClass); final boolean hasClassLevelListeners = classLevelListeners.size() > 0; - final List multiMethods = new ArrayList(); + final List multiMethods = new ArrayList<>(); Map> annotatedMethods = MethodIntrospector.selectMethods(targetClass, new MethodIntrospector.MetadataLookup>() { @@ -318,7 +317,7 @@ public Set inspect(Method method) { * AnnotationUtils.getRepeatableAnnotations does not look at interfaces */ private Collection findListenerAnnotations(Class clazz) { - Set listeners = new HashSet(); + Set listeners = new HashSet<>(); KafkaListener ann = AnnotationUtils.findAnnotation(clazz, KafkaListener.class); if (ann != null) { listeners.add(ann); @@ -334,7 +333,7 @@ private Collection findListenerAnnotations(Class clazz) { * AnnotationUtils.getRepeatableAnnotations does not look at interfaces */ private Set findListenerAnnotations(Method method) { - Set listeners = new HashSet(); + Set listeners = new HashSet<>(); KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class); if (ann != null) { listeners.add(ann); @@ -348,7 +347,8 @@ private Set findListenerAnnotations(Method method) { private void processMultiMethodListeners(Collection classLevelListeners, List multiMethods, Object bean, String beanName) { - List checkedMethods = new ArrayList(); + + List checkedMethods = new ArrayList<>(); Method defaultMethod = null; for (Method method : multiMethods) { Method checked = checkProxy(method, bean); @@ -362,7 +362,7 @@ private void processMultiMethodListeners(Collection classLevelLis } for (KafkaListener classLevelListener : classLevelListeners) { MultiMethodKafkaListenerEndpoint endpoint = - new MultiMethodKafkaListenerEndpoint(checkedMethods, defaultMethod, bean); + new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean); endpoint.setBeanFactory(this.beanFactory); processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName); } @@ -370,7 +370,7 @@ private void processMultiMethodListeners(Collection classLevelLis protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) { Method methodToUse = checkProxy(method, bean); - MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint(); + MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>(); endpoint.setMethod(methodToUse); endpoint.setBeanFactory(this.beanFactory); String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler"); @@ -517,6 +517,9 @@ private Pattern resolvePattern(KafkaListener kafkaListener) { else if (resolved instanceof String) { pattern = Pattern.compile((String) resolved); } + else if (resolved == null) { + return null; + } else { throw new IllegalStateException( "topicPattern must resolve to a Pattern or String, not " + resolved.getClass()); @@ -658,13 +661,16 @@ private String resolveExpressionAsString(String value, String attribute) { if (resolved instanceof String) { return (String) resolved; } + else if (resolved == null) { + return null; + } else { throw new IllegalStateException("The [" + attribute + "] must resolve to a String. " + "Resolved to [" + resolved.getClass() + "] for [" + value + "]"); } } - private int resolveExpressionAsInteger(String value, String attribute) { + private Integer resolveExpressionAsInteger(String value, String attribute) { Object resolved = resolveExpression(value); if (resolved instanceof String) { return Integer.parseInt((String) resolved); @@ -672,6 +678,9 @@ private int resolveExpressionAsInteger(String value, String attribute) { else if (resolved instanceof Number) { return ((Number) resolved).intValue(); } + else if (resolved == null) { + return null; + } else { throw new IllegalStateException( "The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. " @@ -679,7 +688,7 @@ else if (resolved instanceof Number) { } } - private boolean resolveExpressionAsBoolean(String value, String attribute) { + private Boolean resolveExpressionAsBoolean(String value, String attribute) { Object resolved = resolveExpression(value); if (resolved instanceof Boolean) { return (Boolean) resolved; @@ -688,6 +697,9 @@ else if (resolved instanceof String) { final String s = (String) resolved; return Boolean.parseBoolean(s); } + else if (resolved == null) { + return null; + } else { throw new IllegalStateException( "The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. " @@ -728,7 +740,9 @@ private void addFormatters(FormatterRegistry registry) { private Collection getBeansOfType(Class type) { if (KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) { - return ((ListableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory).getBeansOfType(type).values(); + return ((ListableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory) + .getBeansOfType(type) + .values(); } else { return Collections.emptySet(); @@ -789,7 +803,8 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { argumentResolvers.add(new HeadersMethodArgumentResolver()); // Type-based argument resolution - final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService); + final GenericMessageConverter messageConverter = + new GenericMessageConverter(this.defaultFormattingConversionService); argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter)); argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index efbbf0f424..58da5dd31d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -58,6 +58,7 @@ * * @author Stephane Nicoll * @author Gary Russell + * @author Artem Bilan * * @see MethodKafkaListenerEndpoint */ @@ -359,7 +360,7 @@ public Integer getConcurrency() { * @param concurrency the concurrency. * @since 2.2 */ - public void setConcurrency(int concurrency) { + public void setConcurrency(Integer concurrency) { this.concurrency = concurrency; } From e4fc19b245bb62954acb46c7d30a3466f3028470 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 7 Nov 2018 10:22:37 -0500 Subject: [PATCH 2/3] * Move null check to the `else` branch --- ...kaListenerAnnotationBeanPostProcessor.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index b281f16529..2820a75035 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -517,10 +517,10 @@ private Pattern resolvePattern(KafkaListener kafkaListener) { else if (resolved instanceof String) { pattern = Pattern.compile((String) resolved); } - else if (resolved == null) { - return null; - } else { + if (resolved == null) { + return null; + } throw new IllegalStateException( "topicPattern must resolve to a Pattern or String, not " + resolved.getClass()); } @@ -661,10 +661,10 @@ private String resolveExpressionAsString(String value, String attribute) { if (resolved instanceof String) { return (String) resolved; } - else if (resolved == null) { - return null; - } else { + if (resolved == null) { + return null; + } throw new IllegalStateException("The [" + attribute + "] must resolve to a String. " + "Resolved to [" + resolved.getClass() + "] for [" + value + "]"); } @@ -678,10 +678,10 @@ private Integer resolveExpressionAsInteger(String value, String attribute) { else if (resolved instanceof Number) { return ((Number) resolved).intValue(); } - else if (resolved == null) { - return null; - } else { + if (resolved == null) { + return null; + } throw new IllegalStateException( "The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. " + "Resolved to [" + resolved.getClass() + "] for [" + value + "]"); @@ -697,10 +697,10 @@ else if (resolved instanceof String) { final String s = (String) resolved; return Boolean.parseBoolean(s); } - else if (resolved == null) { - return null; - } else { + if (resolved == null) { + return null; + } throw new IllegalStateException( "The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. " + "Resolved to [" + resolved.getClass() + "] for [" + value + "]"); From eb37bc67085c2e1863278607041b755561379d7d Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 7 Nov 2018 10:46:43 -0500 Subject: [PATCH 3/3] * Do not use `if (resolved == null)` at all --- ...kaListenerAnnotationBeanPostProcessor.java | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 2820a75035..66710f9037 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -517,10 +517,7 @@ private Pattern resolvePattern(KafkaListener kafkaListener) { else if (resolved instanceof String) { pattern = Pattern.compile((String) resolved); } - else { - if (resolved == null) { - return null; - } + else if (resolved != null) { throw new IllegalStateException( "topicPattern must resolve to a Pattern or String, not " + resolved.getClass()); } @@ -661,56 +658,49 @@ private String resolveExpressionAsString(String value, String attribute) { if (resolved instanceof String) { return (String) resolved; } - else { - if (resolved == null) { - return null; - } + else if (resolved != null) { throw new IllegalStateException("The [" + attribute + "] must resolve to a String. " + "Resolved to [" + resolved.getClass() + "] for [" + value + "]"); } + return null; } private Integer resolveExpressionAsInteger(String value, String attribute) { Object resolved = resolveExpression(value); + Integer result = null; if (resolved instanceof String) { - return Integer.parseInt((String) resolved); + result = Integer.parseInt((String) resolved); } else if (resolved instanceof Number) { - return ((Number) resolved).intValue(); + result =((Number) resolved).intValue(); } - else { - if (resolved == null) { - return null; - } + else if (resolved != null) { throw new IllegalStateException( "The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. " + "Resolved to [" + resolved.getClass() + "] for [" + value + "]"); } + return result; } private Boolean resolveExpressionAsBoolean(String value, String attribute) { Object resolved = resolveExpression(value); + Boolean result = null; if (resolved instanceof Boolean) { - return (Boolean) resolved; + result = (Boolean) resolved; } else if (resolved instanceof String) { - final String s = (String) resolved; - return Boolean.parseBoolean(s); + result = Boolean.parseBoolean((String) resolved); } - else { - if (resolved == null) { - return null; - } + else if (resolved != null) { throw new IllegalStateException( "The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. " + "Resolved to [" + resolved.getClass() + "] for [" + value + "]"); } + return result; } private Object resolveExpression(String value) { - String resolvedValue = resolve(value); - - return this.resolver.evaluate(resolvedValue, this.expressionContext); + return this.resolver.evaluate(resolve(value), this.expressionContext); } /**