Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.springframework.integration.json.ObjectToJsonTransformer;
import org.springframework.integration.mapping.support.JsonHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.context.TestApplicationContextAware;
import org.springframework.integration.transformer.MessageTransformingHandler;
import org.springframework.integration.transformer.Transformer;
import org.springframework.messaging.Message;
Expand Down Expand Up @@ -86,7 +87,7 @@
*
* @since 3.0
*/
public class InboundEndpointTests {
public class InboundEndpointTests implements TestApplicationContextAware {

@Test
public void testInt2809JavaTypePropertiesToAmqp() throws Exception {
Expand Down Expand Up @@ -254,6 +255,7 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
}

});
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
org.springframework.amqp.core.Message message = mock(org.springframework.amqp.core.Message.class);
MessageProperties props = new MessageProperties();
Expand Down Expand Up @@ -310,6 +312,7 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
}

});
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
org.springframework.amqp.core.Message message = mock(org.springframework.amqp.core.Message.class);
MessageProperties props = new MessageProperties();
Expand Down Expand Up @@ -348,6 +351,7 @@ public void testRetryWithinOnMessageAdapter() throws Exception {
ErrorMessageSendingRecoverer recoveryCallback = new ErrorMessageSendingRecoverer(errors);
recoveryCallback.setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
adapter.setRecoveryCallback(recoveryCallback);
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
listener.onMessage(org.springframework.amqp.core.MessageBuilder.withBody("foo".getBytes())
Expand Down Expand Up @@ -380,6 +384,7 @@ public void testRetryWithMessageRecovererOnMessageAdapter() throws Exception {
recoveredError.set(cause);
recoveredLatch.countDown();
});
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
org.springframework.amqp.core.Message amqpMessage =
Expand Down Expand Up @@ -409,6 +414,7 @@ public void testRetryWithinOnMessageGateway() throws Exception {
ErrorMessageSendingRecoverer recoveryCallback = new ErrorMessageSendingRecoverer(errors);
recoveryCallback.setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
adapter.setRecoveryCallback(recoveryCallback);
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
listener.onMessage(org.springframework.amqp.core.MessageBuilder.withBody("foo".getBytes())
Expand Down Expand Up @@ -441,6 +447,7 @@ public void testRetryWithMessageRecovererOnMessageGateway() throws Exception {
recoveredError.set(cause);
recoveredLatch.countDown();
});
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
org.springframework.amqp.core.Message amqpMessage =
Expand All @@ -467,6 +474,7 @@ public void testBatchAdapter() throws Exception {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
QueueChannel out = new QueueChannel();
adapter.setOutputChannel(out);
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L);
Expand All @@ -493,6 +501,7 @@ public void testBatchGateway() throws Exception {
gateway.setRequestChannel(out);
gateway.setBindSourceMessage(true);
gateway.setReplyTimeout(0);
gateway.setBeanFactory(TEST_INTEGRATION_CONTEXT);
gateway.afterPropertiesSet();
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L);
Expand Down Expand Up @@ -521,6 +530,7 @@ public void testConsumerBatchExtract() {
adapter.setOutputChannel(out);
adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS);
adapter.setHeaderNameForBatchedHeaders("some_batch_headers");
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
ChannelAwareBatchMessageListener listener = (ChannelAwareBatchMessageListener) container.getMessageListener();
MessageProperties messageProperties = new MessageProperties();
Expand All @@ -544,6 +554,7 @@ public void testConsumerBatch() {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
QueueChannel out = new QueueChannel();
adapter.setOutputChannel(out);
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
ChannelAwareBatchMessageListener listener = (ChannelAwareBatchMessageListener) container.getMessageListener();
MessageProperties messageProperties = new MessageProperties();
Expand All @@ -564,6 +575,7 @@ public void testConsumerBatchAndWrongMessageRecoverer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock());
container.setConsumerBatchEnabled(true);
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.setRetryTemplate(new RetryTemplate());
adapter.setMessageRecoverer((message, cause) -> {
});
Expand All @@ -580,6 +592,7 @@ public void testExclusiveRecover() {
adapter.setMessageRecoverer((message, cause) -> {
});
adapter.setRecoveryCallback(context -> null);
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
assertThatIllegalStateException()
.isThrownBy(adapter::afterPropertiesSet)
.withMessageStartingWith("Only one of 'recoveryCallback' or 'messageRecoverer' may be provided, " +
Expand Down Expand Up @@ -609,6 +622,7 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws

});
adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS);
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Expand Down Expand Up @@ -665,6 +679,7 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
}

});
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Expand Down Expand Up @@ -709,6 +724,7 @@ public void testRetryWithinOnMessageAdapterConsumerBatch() {
adapter.setRetryTemplate(new RetryTemplate());
QueueChannel errors = new QueueChannel();
ErrorMessageSendingRecoverer recoveryCallback = new ErrorMessageSendingRecoverer(errors);
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
recoveryCallback.setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
adapter.setRecoveryCallback(recoveryCallback);
adapter.afterPropertiesSet();
Expand Down Expand Up @@ -761,6 +777,7 @@ public void testRetryWithMessageRecovererOnMessageAdapterConsumerBatch() throws
recoveredError.set(cause);
recoveredLatch.countDown();
});
adapter.setBeanFactory(TEST_INTEGRATION_CONTEXT);
adapter.afterPropertiesSet();
ChannelAwareBatchMessageListener listener = (ChannelAwareBatchMessageListener) container.getMessageListener();
MessageProperties messageProperties = new MessageProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.mapping.support.JsonHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.context.TestApplicationContextAware;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -59,7 +60,7 @@
*
* @since 3.0
*/
public class OutboundEndpointTests {
public class OutboundEndpointTests implements TestApplicationContextAware {

@Test
public void testDelayExpression() {
Expand All @@ -75,6 +76,7 @@ public void testDelayExpression() {
endpoint.setRoutingKey("bar");
endpoint.setDelayExpressionString("42");
endpoint.setBeanFactory(mock(BeanFactory.class));
endpoint.setBeanFactory(TEST_INTEGRATION_CONTEXT);
endpoint.afterPropertiesSet();
endpoint.handleMessage(new GenericMessage<>("foo"));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
Expand Down Expand Up @@ -109,6 +111,7 @@ public void testAsyncDelayExpression() {
gateway.setDelayExpressionString("42");
gateway.setBeanFactory(mock(BeanFactory.class));
gateway.setOutputChannel(new NullChannel());
gateway.setBeanFactory(TEST_INTEGRATION_CONTEXT);
gateway.afterPropertiesSet();
gateway.start();
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

/**
* @author Artem Bilan
Expand All @@ -51,7 +52,9 @@ public class CamelDslTests {

@Test
void sendAndReceiveCamelRoute() {
String result = new MessagingTemplate().convertSendAndReceive(this.input, "apache camel", String.class);
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setBeanFactory(mock());
String result = messagingTemplate.convertSendAndReceive(this.input, "apache camel", String.class);
assertThat(result).isEqualTo("___APACHE CAMEL___");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,25 @@
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.camel.support.CamelHeaderMapper;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.context.TestApplicationContextAware;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

/**
* @author Artem Bilan
*
* @since 6.0
*/
public class CamelMessageHandlerTests extends CamelTestSupport {
public class CamelMessageHandlerTests extends CamelTestSupport implements TestApplicationContextAware {

@Test
void inOnlyPatternSyncMessageHandler() throws InterruptedException {
Expand All @@ -60,7 +59,7 @@ void inOnlyPatternSyncMessageHandler() throws InterruptedException {

CamelMessageHandler camelMessageHandler = new CamelMessageHandler(template());
camelMessageHandler.setEndpointUri("direct:simple");
camelMessageHandler.setBeanFactory(mock(BeanFactory.class));
camelMessageHandler.setBeanFactory(TEST_INTEGRATION_CONTEXT);
camelMessageHandler.afterPropertiesSet();

camelMessageHandler.handleMessage(messageUnderTest);
Expand Down Expand Up @@ -97,7 +96,7 @@ void inOutPatternSyncMessageHandlerWithNoRequestHeadersButReplyHeaders() throws
camelMessageHandler.setEndpointUriExpression(new FunctionExpression<>(m -> "direct:simple"));
camelMessageHandler.setExchangePatternExpression(spelExpressionParser.parseExpression("headers.exchangePattern"));
camelMessageHandler.setHeaderMapper(headerMapper);
camelMessageHandler.setBeanFactory(mock(BeanFactory.class));
camelMessageHandler.setBeanFactory(TEST_INTEGRATION_CONTEXT);
camelMessageHandler.afterPropertiesSet();

camelMessageHandler.handleMessage(messageUnderTest);
Expand Down Expand Up @@ -128,7 +127,7 @@ void inOnlyPatternAsyncMessageHandlerWithException() throws InterruptedException

CamelMessageHandler camelMessageHandler = new CamelMessageHandler(template());
camelMessageHandler.setEndpointUri("direct:simple");
camelMessageHandler.setBeanFactory(mock(BeanFactory.class));
camelMessageHandler.setBeanFactory(TEST_INTEGRATION_CONTEXT);
camelMessageHandler.setAsync(true);
camelMessageHandler.afterPropertiesSet();

Expand Down Expand Up @@ -161,7 +160,7 @@ void inOutPatternAsyncMessageHandler() throws InterruptedException {
producerTemplate.setDefaultEndpointUri("direct:simple");
CamelMessageHandler camelMessageHandler = new CamelMessageHandler(producerTemplate);
camelMessageHandler.setExchangePattern(ExchangePattern.InOut);
camelMessageHandler.setBeanFactory(mock(BeanFactory.class));
camelMessageHandler.setBeanFactory(TEST_INTEGRATION_CONTEXT);
camelMessageHandler.setAsync(true);
camelMessageHandler.afterPropertiesSet();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Provides classes related to message acknowledgment.
*/
@org.springframework.lang.NonNullApi
@org.jspecify.annotations.NullMarked
package org.springframework.integration.acks;
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public abstract class AbstractAggregatingMessageGroupProcessor implements Messag

private boolean messageBuilderFactorySet;

@SuppressWarnings("NullAway.Init")
private BeanFactory beanFactory;

@Override
Expand Down
Loading
Loading