Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,4 +48,12 @@
@Documented
public @interface KafkaHandler {

/**
* When true, designate that this is the default fallback method if the payload type
* matches no other {@link KafkaHandler} method. Only one method can be so designated.
* @return true if this is the default method.
* @since 2.1.3
*/
boolean isDefault() default false;

}
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,20 @@ private Set<KafkaListener> findListenerAnnotations(Method method) {
private void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners, List<Method> multiMethods,
Object bean, String beanName) {
List<Method> checkedMethods = new ArrayList<Method>();
Method defaultMethod = null;
for (Method method : multiMethods) {
checkedMethods.add(checkProxy(method, bean));
Method checked = checkProxy(method, bean);
if (AnnotationUtils.findAnnotation(method, KafkaHandler.class).isDefault()) {
final Method toAssert = defaultMethod;
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
+ toAssert.toString() + " and " + method.toString());
defaultMethod = checked;
}
checkedMethods.add(checked);
}
for (KafkaListener classLevelListener : classLevelListeners) {
MultiMethodKafkaListenerEndpoint<K, V> endpoint = new MultiMethodKafkaListenerEndpoint<K, V>(checkedMethods,
bean);
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
new MultiMethodKafkaListenerEndpoint<K, V>(checkedMethods, defaultMethod, bean);
endpoint.setBeanFactory(this.beanFactory);
processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
}
Expand Down Expand Up @@ -680,7 +688,7 @@ private <T> Collection<T> getBeansOfType(Class<T> type) {
*/
private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {

private DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();
private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();

private MessageHandlerMethodFactory messageHandlerMethodFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import org.springframework.kafka.listener.adapter.DelegatingInvocableHandler;
import org.springframework.kafka.listener.adapter.HandlerAdapter;
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.lang.Nullable;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;

/**
Expand All @@ -41,20 +42,44 @@ public class MultiMethodKafkaListenerEndpoint<K, V> extends MethodKafkaListenerE

private final List<Method> methods;

private final Method defaultMethod;

/**
* Construct an instance for the provided methods and bean with no default method.
* @param methods the methods.
* @param bean the bean.
*/
public MultiMethodKafkaListenerEndpoint(List<Method> methods, Object bean) {
this(methods, null, bean);
}

/**
* Construct an instance for the provided methods, default method and bean.
* @param methods the methods.
* @param defaultMethod the default method.
* @param bean the bean.
* @since 2.1.3
*/
public MultiMethodKafkaListenerEndpoint(List<Method> methods, @Nullable Method defaultMethod, Object bean) {
this.methods = methods;
this.defaultMethod = defaultMethod;
setBean(bean);
}

@Override
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<InvocableHandlerMethod>();
InvocableHandlerMethod defaultHandler = null;
for (Method method : this.methods) {
invocableHandlerMethods.add(getMessageHandlerMethodFactory()
.createInvocableHandlerMethod(getBean(), method));
InvocableHandlerMethod handler = getMessageHandlerMethodFactory()
.createInvocableHandlerMethod(getBean(), method);
invocableHandlerMethods.add(handler);
if (method.equals(this.defaultMethod)) {
defaultHandler = handler;
}
}
DelegatingInvocableHandler delegatingHandler = new DelegatingInvocableHandler(invocableHandlerMethods,
getBean(), getResolver(), getBeanExpressionContext());
defaultHandler, getBean(), getResolver(), getBeanExpressionContext());
return new HandlerAdapter(delegatingHandler);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,7 @@
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.kafka.KafkaException;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
Expand All @@ -61,6 +62,8 @@ public class DelegatingInvocableHandler {

private final ConcurrentMap<Class<?>, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>();

private final InvocableHandlerMethod defaultHandler;

private final Map<InvocableHandlerMethod, Expression> handlerSendTo = new HashMap<>();

private final Object bean;
Expand All @@ -78,7 +81,23 @@ public class DelegatingInvocableHandler {
*/
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers, Object bean,
BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {
this(handlers, null, bean, beanExpressionResolver, beanExpressionContext);
}

/**
* Construct an instance with the supplied handlers for the bean.
* @param handlers the handlers.
* @param defaultHandler the default handler.
* @param bean the bean.
* @param beanExpressionResolver the resolver.
* @param beanExpressionContext the context.
* @since 2.1.3
*/
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
@Nullable InvocableHandlerMethod defaultHandler,
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {
this.handlers = new ArrayList<>(handlers);
this.defaultHandler = defaultHandler;
this.bean = bean;
this.resolver = beanExpressionResolver;
this.beanExpressionContext = beanExpressionContext;
Expand Down Expand Up @@ -174,13 +193,19 @@ protected InvocableHandlerMethod findHandlerForPayload(Class<? extends Object> p
for (InvocableHandlerMethod handler : this.handlers) {
if (matchHandlerMethod(payloadClass, handler)) {
if (result != null) {
throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " +
result.getMethod().getName() + " and " + handler.getMethod().getName());
boolean resultIsDefault = result.equals(this.defaultHandler);
if (!handler.equals(this.defaultHandler) && !resultIsDefault) {
throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " +
result.getMethod().getName() + " and " + handler.getMethod().getName());
}
if (!resultIsDefault) {
continue; // otherwise replace the result with the actual match
}
}
result = handler;
}
}
return result;
return result != null ? result : this.defaultHandler;
}

protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, InvocableHandlerMethod handler) {
Expand Down Expand Up @@ -221,4 +246,8 @@ public String getMethodNameFor(Object payload) {
return handlerForPayload == null ? "no match" : handlerForPayload.getMethod().toGenericString(); //NOSONAR
}

public boolean hasDefaultHandler() {
return this.defaultHandler != null;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -47,6 +47,13 @@ public Object invoke(Message<?> message, Object... providedArgs) throws Exceptio
if (this.invokerHandlerMethod != null) {
return this.invokerHandlerMethod.invoke(message, providedArgs);
}
else if (this.delegatingHandler.hasDefaultHandler()) {
// Needed to avoid returning raw Message which matches Object
Object[] args = new Object[providedArgs.length + 1];
args[0] = message.getPayload();
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
return this.delegatingHandler.invoke(message, args);
}
else {
return this.delegatingHandler.invoke(message, providedArgs);
}
Expand Down
Loading