Skip to content

MQTTv5: Mqttv5PahoMessageDrivenChannelAdapter unsubscribes from topics even when cleanStart/cleanSession is false #3955

@mths1

Description

@mths1

In what version(s) of Spring Integration are you seeing this issue?

sprint-boot 2.7.5
spring-integration-mqtt 5.5.15
org.eclipse.paho.mqttv5.client 1.2.5

Describe the bug
Mqttv5PahoMessageDrivenChannelAdapter unsubscribes from all topics, even if cleanStart/cleanSession is set to false, thus not receiving offline messages after restart. Similar behaviour to https://jira.spring.io/browse/INT-3900 but for MQTT v5.

To Reproduce

  • Create a Mqttv5PahoMessageDrivenChannelAdapter with setCleanStart(false) and setSessionExpiryInterval with e.g. 5000secs
  • Subscribe to some topics
  • Stop application
  • On the broker you will see that it unsubscribes from all topics

Expected behavior

The Mqttv5PahoMessageDrivenChannelAdapter does not unsubscribe in inStop.
Mqttv5PahoMessageDrivenChannelAdapter.java:

protected void doStop() {
		this.topicLock.lock();
		String[] topics = getTopic();
		try {
			if (this.mqttClient != null && this.mqttClient.isConnected()) {
				this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());

				if (getClientManager() == null) {
					this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
				}
			}
		}
		catch (MqttException ex) {
			logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
		}
		finally {
			this.topicLock.unlock();
		}
	}

MqttPahoMessageDrivenChannelAdapter (v3) does a check:

	protected synchronized void doStop() {
		try {
			if (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
					|| (this.consumerStopAction.equals(ConsumerStopAction.UNSUBSCRIBE_CLEAN)
					&& this.cleanSession)) {

				this.client.unsubscribe(getTopic());
			}
		}

Proposed fix:

    @Override
    protected void doStop() {
        this.topicLock.lock();
        String[] topics = getTopic();
        try {
            if (this.mqttClient != null && this.mqttClient.isConnected()) {
                /** FIX */
                if (connectionOptions.isCleanStart()) {
                    this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
                }
                /** FIX */
                this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
            }
        }
        catch (MqttException ex) {
            logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
        }
        finally {
            this.topicLock.unlock();
        }
    }

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions