-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Closed
Description
In what version(s) of Spring Integration are you seeing this issue?
sprint-boot 2.7.5
spring-integration-mqtt 5.5.15
org.eclipse.paho.mqttv5.client 1.2.5
Describe the bug
Mqttv5PahoMessageDrivenChannelAdapter unsubscribes from all topics, even if cleanStart/cleanSession is set to false, thus not receiving offline messages after restart. Similar behaviour to https://jira.spring.io/browse/INT-3900 but for MQTT v5.
To Reproduce
- Create a Mqttv5PahoMessageDrivenChannelAdapter with setCleanStart(false) and setSessionExpiryInterval with e.g. 5000secs
- Subscribe to some topics
- Stop application
- On the broker you will see that it unsubscribes from all topics
Expected behavior
The Mqttv5PahoMessageDrivenChannelAdapter does not unsubscribe in inStop.
Mqttv5PahoMessageDrivenChannelAdapter.java:
protected void doStop() {
this.topicLock.lock();
String[] topics = getTopic();
try {
if (this.mqttClient != null && this.mqttClient.isConnected()) {
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
if (getClientManager() == null) {
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
}
}
}
catch (MqttException ex) {
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
}
finally {
this.topicLock.unlock();
}
}
MqttPahoMessageDrivenChannelAdapter (v3) does a check:
protected synchronized void doStop() {
try {
if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
&& this.cleanSession)) {
this.client.unsubscribe(getTopic());
}
}
Proposed fix:
@Override
protected void doStop() {
this.topicLock.lock();
String[] topics = getTopic();
try {
if (this.mqttClient != null && this.mqttClient.isConnected()) {
/** FIX */
if (connectionOptions.isCleanStart()) {
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
}
/** FIX */
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
}
}
catch (MqttException ex) {
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
}
finally {
this.topicLock.unlock();
}
}