Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
*/
public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

private final Set<Class<?>> nonAnnotatedClasses =
Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));

private final Log logger = LogFactory.getLog(getClass());

Expand Down Expand Up @@ -271,7 +270,7 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<Method>();
final List<Method> multiMethods = new ArrayList<>();
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

Expand Down Expand Up @@ -318,7 +317,7 @@ public Set<KafkaListener> inspect(Method method) {
* AnnotationUtils.getRepeatableAnnotations does not look at interfaces
*/
private Collection<KafkaListener> findListenerAnnotations(Class<?> clazz) {
Set<KafkaListener> listeners = new HashSet<KafkaListener>();
Set<KafkaListener> listeners = new HashSet<>();
KafkaListener ann = AnnotationUtils.findAnnotation(clazz, KafkaListener.class);
if (ann != null) {
listeners.add(ann);
Expand All @@ -334,7 +333,7 @@ private Collection<KafkaListener> findListenerAnnotations(Class<?> clazz) {
* AnnotationUtils.getRepeatableAnnotations does not look at interfaces
*/
private Set<KafkaListener> findListenerAnnotations(Method method) {
Set<KafkaListener> listeners = new HashSet<KafkaListener>();
Set<KafkaListener> listeners = new HashSet<>();
KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class);
if (ann != null) {
listeners.add(ann);
Expand All @@ -348,7 +347,8 @@ private Set<KafkaListener> findListenerAnnotations(Method method) {

private void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners, List<Method> multiMethods,
Object bean, String beanName) {
List<Method> checkedMethods = new ArrayList<Method>();

List<Method> checkedMethods = new ArrayList<>();
Method defaultMethod = null;
for (Method method : multiMethods) {
Method checked = checkProxy(method, bean);
Expand All @@ -362,15 +362,15 @@ private void processMultiMethodListeners(Collection<KafkaListener> classLevelLis
}
for (KafkaListener classLevelListener : classLevelListeners) {
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
new MultiMethodKafkaListenerEndpoint<K, V>(checkedMethods, defaultMethod, bean);
new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);
endpoint.setBeanFactory(this.beanFactory);
processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
}
}

protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
Expand Down Expand Up @@ -517,7 +517,7 @@ private Pattern resolvePattern(KafkaListener kafkaListener) {
else if (resolved instanceof String) {
pattern = Pattern.compile((String) resolved);
}
else {
else if (resolved != null) {
throw new IllegalStateException(
"topicPattern must resolve to a Pattern or String, not " + resolved.getClass());
}
Expand Down Expand Up @@ -658,47 +658,49 @@ private String resolveExpressionAsString(String value, String attribute) {
if (resolved instanceof String) {
return (String) resolved;
}
else {
else if (resolved != null) {
throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
}
return null;
}

private int resolveExpressionAsInteger(String value, String attribute) {
private Integer resolveExpressionAsInteger(String value, String attribute) {
Object resolved = resolveExpression(value);
Integer result = null;
if (resolved instanceof String) {
return Integer.parseInt((String) resolved);
result = Integer.parseInt((String) resolved);
}
else if (resolved instanceof Number) {
return ((Number) resolved).intValue();
result =((Number) resolved).intValue();
}
else {
else if (resolved != null) {
throw new IllegalStateException(
"The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
}
return result;
}

private boolean resolveExpressionAsBoolean(String value, String attribute) {
private Boolean resolveExpressionAsBoolean(String value, String attribute) {
Object resolved = resolveExpression(value);
Boolean result = null;
if (resolved instanceof Boolean) {
return (Boolean) resolved;
result = (Boolean) resolved;
}
else if (resolved instanceof String) {
final String s = (String) resolved;
return Boolean.parseBoolean(s);
result = Boolean.parseBoolean((String) resolved);
}
else {
else if (resolved != null) {
throw new IllegalStateException(
"The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
+ "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
}
return result;
}

private Object resolveExpression(String value) {
String resolvedValue = resolve(value);

return this.resolver.evaluate(resolvedValue, this.expressionContext);
return this.resolver.evaluate(resolve(value), this.expressionContext);
}

/**
Expand Down Expand Up @@ -728,7 +730,9 @@ private void addFormatters(FormatterRegistry registry) {

private <T> Collection<T> getBeansOfType(Class<T> type) {
if (KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {
return ((ListableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory).getBeansOfType(type).values();
return ((ListableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory)
.getBeansOfType(type)
.values();
}
else {
return Collections.emptySet();
Expand Down Expand Up @@ -789,7 +793,8 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
argumentResolvers.add(new HeadersMethodArgumentResolver());

// Type-based argument resolution
final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
final GenericMessageConverter messageConverter =
new GenericMessageConverter(this.defaultFormattingConversionService);
argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
*
* @author Stephane Nicoll
* @author Gary Russell
* @author Artem Bilan
*
* @see MethodKafkaListenerEndpoint
*/
Expand Down Expand Up @@ -359,7 +360,7 @@ public Integer getConcurrency() {
* @param concurrency the concurrency.
* @since 2.2
*/
public void setConcurrency(int concurrency) {
public void setConcurrency(Integer concurrency) {
this.concurrency = concurrency;
}

Expand Down