Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ configure(javaProjects) { subproject ->
testImplementation 'org.jetbrains.kotlin:kotlin-reflect'
testImplementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
testImplementation 'io.projectreactor:reactor-test'
testImplementation 'org.testcontainers:junit-jupiter:1.16.0'

testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
Expand Down Expand Up @@ -680,6 +681,7 @@ project('spring-integration-mqtt') {
dependencies {
api project(':spring-integration-core')
api "org.eclipse.paho:org.eclipse.paho.client.mqttv3:$pahoMqttClientVersion"
optionalApi "org.eclipse.paho:org.eclipse.paho.mqttv5.client:$pahoMqttClientVersion"

testImplementation project(':spring-integration-jmx')
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.event;

import org.eclipse.paho.mqttv5.common.MqttException;

/**
* The even representing an MQTT error occured during client interaction.
*
* @author Artem Bilan
*
* @since 5.5.5
*
* @see org.eclipse.paho.mqttv5.client.MqttCallback#mqttErrorOccurred(MqttException)
*/
@SuppressWarnings("serial")
public class MqttProtocolErrorEvent extends MqttIntegrationEvent {

public MqttProtocolErrorEvent(Object source, MqttException exception) {
super(source, exception);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.log.LogMessage;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttMessageConverter;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.jmx.export.annotation.ManagedAttribute;
Expand All @@ -45,15 +46,27 @@
*/
@ManagedResource
@IntegrationManagedResource
public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport {
public abstract class AbstractMqttMessageDrivenChannelAdapter extends MessageProducerSupport
implements ApplicationEventPublisherAware {

/**
* The default completion timeout in milliseconds.
*/
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;

private final String url;

private final String clientId;

private final Set<Topic> topics;

private volatile MqttMessageConverter converter;
private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;

private boolean manualAcks;

private ApplicationEventPublisher applicationEventPublisher;

private MqttMessageConverter converter;

protected final Lock topicLock = new ReentrantLock(); // NOSONAR

Expand Down Expand Up @@ -147,6 +160,42 @@ public String getComponentType() {
return "mqtt:inbound-channel-adapter";
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher; // NOSONAR (inconsistent synchronization)
}

protected ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}

/**
* Set the acknowledgment mode to manual.
* @param manualAcks true for manual acks.
* @since 5.3
*/
public void setManualAcks(boolean manualAcks) {
this.manualAcks = manualAcks;
}

protected boolean isManualAcks() {
return this.manualAcks;
}

/**
* Set the completion timeout for operations. Not settable using the namespace.
* Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 4.1
*/
public void setCompletionTimeout(long completionTimeout) {
this.completionTimeout = completionTimeout;
}

protected long getCompletionTimeout() {
return this.completionTimeout;
}

/**
* Add a topic to the subscribed list.
* @param topic The topic.
Expand Down Expand Up @@ -239,18 +288,6 @@ public void removeTopic(String... topic) {
}
}

@Override
protected void onInit() {
super.onInit();
if (this.converter == null) {
DefaultPahoMessageConverter pahoMessageConverter = new DefaultPahoMessageConverter();
pahoMessageConverter.setBeanFactory(getBeanFactory());
this.converter = pahoMessageConverter;

}
}


/**
* @since 4.1
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.acks.SimpleAcknowledgment;
import org.springframework.integration.mqtt.core.ConsumerStopAction;
Expand All @@ -38,6 +37,7 @@
import org.springframework.integration.mqtt.core.MqttPahoComponent;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttUtils;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.messaging.Message;
Expand All @@ -60,12 +60,7 @@
*
*/
public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDrivenChannelAdapter
implements MqttCallback, MqttPahoComponent, ApplicationEventPublisherAware {

/**
* The default completion timeout in milliseconds.
*/
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;
implements MqttCallback, MqttPahoComponent {

/**
* The default disconnect completion timeout in milliseconds.
Expand All @@ -78,14 +73,8 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv

private int recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

private long completionTimeout = DEFAULT_COMPLETION_TIMEOUT;

private long disconnectCompletionTimeout = DISCONNECT_COMPLETION_TIMEOUT;

private boolean manualAcks;

private ApplicationEventPublisher applicationEventPublisher;

private volatile IMqttClient client;

private volatile ScheduledFuture<?> reconnectFuture;
Expand Down Expand Up @@ -139,16 +128,6 @@ public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String..
this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
}

/**
* Set the completion timeout for operations. Not settable using the namespace.
* Default {@value #DEFAULT_COMPLETION_TIMEOUT} milliseconds.
* @param completionTimeout The timeout.
* @since 4.1
*/
public synchronized void setCompletionTimeout(long completionTimeout) {
this.completionTimeout = completionTimeout;
}

/**
* Set the completion timeout when disconnecting. Not settable using the namespace.
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
Expand All @@ -169,23 +148,6 @@ public synchronized void setRecoveryInterval(int recoveryInterval) {
this.recoveryInterval = recoveryInterval;
}

/**
* Set the acknowledgment mode to manual.
* @param manualAcks true for manual acks.
* @since 5.3
*/
public void setManualAcks(boolean manualAcks) {
this.manualAcks = manualAcks;
}

/**
* @since 4.2.2
*/
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher; // NOSONAR (inconsistent synchronization)
}

@Override
public MqttConnectOptions getConnectionInfo() {
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
Expand All @@ -199,6 +161,17 @@ public MqttConnectOptions getConnectionInfo() {
return options;
}

@Override
protected void onInit() {
super.onInit();
if (getConverter() == null) {
DefaultPahoMessageConverter pahoMessageConverter = new DefaultPahoMessageConverter();
pahoMessageConverter.setBeanFactory(getBeanFactory());
setConverter(pahoMessageConverter);

}
}

@Override
protected void doStart() {
Assert.state(getTaskScheduler() != null, "A 'taskScheduler' is required");
Expand Down Expand Up @@ -293,22 +266,26 @@ private synchronized void connectAndSubscribe() throws MqttException {
this.client = this.clientFactory.getClientInstance(getUrl(), getClientId());
this.client.setCallback(this);
if (this.client instanceof MqttClient) {
((MqttClient) this.client).setTimeToWait(this.completionTimeout);
((MqttClient) this.client).setTimeToWait(getCompletionTimeout());
}

this.topicLock.lock();
String[] topics = getTopic();
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
try {
this.client.connect(connectionOptions);
this.client.setManualAcks(this.manualAcks);
int[] requestedQos = getQos();
int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
this.client.subscribe(topics, grantedQos);
warnInvalidQosForSubscription(topics, requestedQos, grantedQos);
this.client.setManualAcks(isManualAcks());
if (topics.length > 0) {
int[] requestedQos = getQos();
int[] grantedQos = Arrays.copyOf(requestedQos, requestedQos.length);
this.client.subscribe(topics, grantedQos);
warnInvalidQosForSubscription(topics, requestedQos, grantedQos);
}
}
catch (MqttException ex) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));

if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
}
logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics));
if (this.client != null) { // Could be reset during event handling before
Expand All @@ -331,8 +308,8 @@ private synchronized void connectAndSubscribe() throws MqttException {
this.connected = true;
String message = "Connected and subscribed to " + Arrays.toString(topics);
logger.debug(message);
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
}
}
}
Expand Down Expand Up @@ -397,8 +374,9 @@ public synchronized void connectionLost(Throwable cause) {
}
this.client = null;
scheduleReconnect();
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, cause));
}
}
}
Expand All @@ -407,7 +385,7 @@ public synchronized void connectionLost(Throwable cause) {
public void messageArrived(String topic, MqttMessage mqttMessage) {
AbstractIntegrationMessageBuilder<?> builder = toMessageBuilder(topic, mqttMessage);
if (builder != null) {
if (this.manualAcks) {
if (isManualAcks()) {
builder.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK,
new AcknowledgmentImpl(mqttMessage.getId(), mqttMessage.getQos(), this.client));
}
Expand Down Expand Up @@ -458,7 +436,7 @@ public void deliveryComplete(IMqttDeliveryToken token) {
}

/**
* Used to complete message arrival when {@link #manualAcks} is true.
* Used to complete message arrival when {@link #isManualAcks()} is true.
*
* @since 5.3
*/
Expand Down
Loading