Skip to content

Commit 7d02658

Browse files
committed
GH-4007: Remove MQTT ConsumerStopAction
Fixes #4007 It was deprecated in the previous version and fully covered with existing `cleanSession` connection option.
1 parent 53d1ecd commit 7d02658

File tree

4 files changed

+1
-102
lines changed

4 files changed

+1
-102
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ConsumerStopAction.java

Lines changed: 0 additions & 48 deletions
This file was deleted.

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/DefaultMqttPahoClientFactory.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ public class DefaultMqttPahoClientFactory implements MqttPahoClientFactory {
4141

4242
private MqttClientPersistence persistence;
4343

44-
@SuppressWarnings("deprecation")
45-
private ConsumerStopAction consumerStopAction = ConsumerStopAction.UNSUBSCRIBE_CLEAN;
46-
4744
/**
4845
* Set the persistence to pass into the client constructor.
4946
* @param persistence the persistence to set.
@@ -52,32 +49,6 @@ public void setPersistence(MqttClientPersistence persistence) {
5249
this.persistence = persistence;
5350
}
5451

55-
/**
56-
* Get the consumer stop action.
57-
* @return the consumer stop action.
58-
* @since 4.2.3
59-
* @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}.
60-
* Will be removed in 6.1.0.
61-
*/
62-
@Deprecated
63-
@Override
64-
public ConsumerStopAction getConsumerStopAction() {
65-
return this.consumerStopAction;
66-
}
67-
68-
/**
69-
* Set the consumer stop action. Determines whether we unsubscribe when the consumer stops.
70-
* Default: {@link ConsumerStopAction#UNSUBSCRIBE_CLEAN}.
71-
* @param consumerStopAction the consumer stop action.
72-
* @since 4.2.3.
73-
* @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}.
74-
* Will be removed in 6.1.0.
75-
*/
76-
@Deprecated
77-
public void setConsumerStopAction(ConsumerStopAction consumerStopAction) {
78-
this.consumerStopAction = consumerStopAction;
79-
}
80-
8152
@Override
8253
public IMqttClient getClientInstance(String uri, String clientId) throws MqttException {
8354
// Client validates URI even if overridden by options

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/MqttPahoClientFactory.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,4 @@ public interface MqttPahoClientFactory {
5656
*/
5757
MqttConnectOptions getConnectionOptions();
5858

59-
/**
60-
* Get the consumer stop action.
61-
* @return the consumer stop action.
62-
* @since 4.3
63-
* @deprecated since 5.5.17 in favor of standard {@link MqttConnectOptions#setCleanSession(boolean)}.
64-
* Will be removed in 6.1.0.
65-
*/
66-
@Deprecated
67-
ConsumerStopAction getConsumerStopAction();
68-
6959
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@ public class MqttPahoMessageDrivenChannelAdapter
6868

6969
private volatile IMqttAsyncClient client;
7070

71-
@SuppressWarnings("deprecation")
72-
private volatile org.springframework.integration.mqtt.core.ConsumerStopAction consumerStopAction;
73-
7471
private volatile boolean readyToSubscribeOnStart;
7572

7673
/**
@@ -184,11 +181,6 @@ protected void doStart() {
184181
@SuppressWarnings("deprecation")
185182
private synchronized void connect() throws MqttException {
186183
MqttConnectOptions connectionOptions = this.clientFactory.getConnectionOptions();
187-
this.consumerStopAction = this.clientFactory.getConsumerStopAction();
188-
if (this.consumerStopAction == null) {
189-
this.consumerStopAction = org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN;
190-
}
191-
192184
var clientManager = getClientManager();
193185
if (clientManager == null) {
194186
Assert.state(getUrl() != null || connectionOptions.getServerURIs() != null,
@@ -203,17 +195,11 @@ private synchronized void connect() throws MqttException {
203195
}
204196
}
205197

206-
@SuppressWarnings("deprecation")
207198
@Override
208199
protected synchronized void doStop() {
209200
this.readyToSubscribeOnStart = false;
210201
try {
211-
if (this.consumerStopAction
212-
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_ALWAYS)
213-
|| (this.consumerStopAction
214-
.equals(org.springframework.integration.mqtt.core.ConsumerStopAction.UNSUBSCRIBE_CLEAN)
215-
&& this.clientFactory.getConnectionOptions().isCleanSession())) {
216-
202+
if (this.clientFactory.getConnectionOptions().isCleanSession()) {
217203
this.client.unsubscribe(getTopic());
218204
// Have to re-subscribe on next start if connection is not lost.
219205
this.readyToSubscribeOnStart = true;

0 commit comments

Comments
 (0)