Skip to content

Commit 9f47cc8

Browse files
committed
GH-3685: Share MQTT connection across components
Fixes #3685 Add a couple of unit/integration tests to cover client manager usage. Several small code improvements after the code review: * Improve client manager usage via providing several mutual exclusive constructors, whether the users provides `url` or `connectionOptions` or `clientFactory` for v3. * Move the logger to `AbstractMqttClientManager` * Do not inject TaskScheduler in constructor for v3 client manager but use lazy init via `BeanFactory` and `IntegrationContextUtils` * Other smaller code readability improvements
1 parent d9a0474 commit 9f47cc8

File tree

6 files changed

+303
-50
lines changed

6 files changed

+303
-50
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/AbstractMqttClientManager.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,24 @@
1616

1717
package org.springframework.integration.mqtt.core;
1818

19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
1922
import org.springframework.util.Assert;
2023

2124
public abstract class AbstractMqttClientManager<T> implements ClientManager<T> {
2225

26+
protected final Log logger = LogFactory.getLog(this.getClass());
27+
2328
private boolean manualAcks;
2429

2530
private String url;
2631

27-
private String clientId;
32+
private final String clientId;
2833

29-
AbstractMqttClientManager(String url, String clientId) {
34+
AbstractMqttClientManager(String clientId) {
3035
Assert.notNull(clientId, "'clientId' is required");
3136
this.clientId = clientId;
32-
this.url = url;
3337
}
3438

3539
@Override
@@ -53,7 +57,4 @@ public String getClientId() {
5357
return this.clientId;
5458
}
5559

56-
public void setClientId(String clientId) {
57-
this.clientId = clientId;
58-
}
5960
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,55 +19,76 @@
1919
import java.time.Instant;
2020
import java.util.concurrent.ScheduledFuture;
2121

22-
import org.apache.commons.logging.Log;
23-
import org.apache.commons.logging.LogFactory;
2422
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
2523
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
2624
import org.eclipse.paho.client.mqttv3.MqttCallback;
2725
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
2826
import org.eclipse.paho.client.mqttv3.MqttException;
2927
import org.eclipse.paho.client.mqttv3.MqttMessage;
3028

29+
import org.springframework.beans.factory.BeanFactory;
30+
import org.springframework.beans.factory.BeanFactoryAware;
31+
import org.springframework.beans.factory.InitializingBean;
32+
import org.springframework.integration.context.IntegrationContextUtils;
3133
import org.springframework.scheduling.TaskScheduler;
3234
import org.springframework.util.Assert;
3335

34-
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient> implements MqttCallback {
36+
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient>
37+
implements MqttCallback, InitializingBean, BeanFactoryAware {
3538

3639
/**
3740
* The default reconnect timeout in millis.
3841
*/
3942
private static final long DEFAULT_RECOVERY_INTERVAL = 10_000;
4043

41-
private final Log logger = LogFactory.getLog(this.getClass());
42-
4344
private final MqttPahoClientFactory clientFactory;
4445

45-
private final TaskScheduler taskScheduler;
46+
private BeanFactory beanFactory;
47+
48+
private TaskScheduler taskScheduler;
4649

4750
private volatile ScheduledFuture<?> scheduledReconnect;
4851

4952
private volatile IMqttAsyncClient client;
5053

5154
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
5255

53-
public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, TaskScheduler taskScheduler, String url,
54-
String clientId) {
55-
56-
super(url, clientId);
57-
Assert.notNull(clientId, "'clientFactory' is required");
58-
Assert.notNull(clientId, "'taskScheduler' is required");
59-
if (url == null) {
60-
Assert.notEmpty(clientFactory.getConnectionOptions().getServerURIs(), "'serverURIs' must be provided in the 'MqttConnectionOptions'");
61-
}
56+
public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) {
57+
super(clientId);
58+
Assert.notNull(clientFactory, "'clientFactory' is required");
6259
this.clientFactory = clientFactory;
63-
this.taskScheduler = taskScheduler;
60+
String[] serverURIs = clientFactory.getConnectionOptions().getServerURIs();
61+
Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'");
62+
setUrl(serverURIs[0]);
63+
}
64+
65+
public Mqttv3ClientManager(String url, String clientId) {
66+
super(clientId);
67+
Assert.notNull(url, "'url' is required");
68+
setUrl(url);
69+
MqttConnectOptions connectOptions = new MqttConnectOptions();
70+
connectOptions.setServerURIs(new String[]{ url });
71+
DefaultMqttPahoClientFactory defaultFactory = new DefaultMqttPahoClientFactory();
72+
defaultFactory.setConnectionOptions(connectOptions);
73+
this.clientFactory = defaultFactory;
6474
}
6575

6676
@Override
6777
public IMqttAsyncClient getClient() {
6878
return this.client;
6979
}
7080

81+
@Override
82+
public void afterPropertiesSet() {
83+
this.taskScheduler = IntegrationContextUtils.getTaskScheduler(this.beanFactory);
84+
}
85+
86+
@Override
87+
public void setBeanFactory(BeanFactory beanFactory) {
88+
Assert.notNull(beanFactory, "'beanFactory' must not be null");
89+
this.beanFactory = beanFactory;
90+
}
91+
7192
@Override
7293
public synchronized void start() {
7394
if (this.client == null) {
@@ -84,7 +105,7 @@ public synchronized void start() {
84105
connect();
85106
}
86107
catch (MqttException e) {
87-
this.logger.error("could not start client manager, scheduling reconnect, client_id=" +
108+
logger.error("could not start client manager, scheduling reconnect, client_id=" +
88109
this.client.getClientId(), e);
89110
scheduleReconnect();
90111
}
@@ -99,14 +120,14 @@ public synchronized void stop() {
99120
this.client.disconnectForcibly(this.clientFactory.getConnectionOptions().getConnectionTimeout());
100121
}
101122
catch (MqttException e) {
102-
this.logger.error("could not disconnect from the client", e);
123+
logger.error("could not disconnect from the client", e);
103124
}
104125
finally {
105126
try {
106127
this.client.close();
107128
}
108129
catch (MqttException e) {
109-
this.logger.error("could not close the client", e);
130+
logger.error("could not close the client", e);
110131
}
111132
this.client = null;
112133
}
@@ -119,9 +140,9 @@ public synchronized boolean isRunning() {
119140

120141
@Override
121142
public synchronized void connectionLost(Throwable cause) {
122-
this.logger.error("connection lost, scheduling reconnect, client_id=" + this.client.getClientId(),
143+
logger.error("connection lost, scheduling reconnect, client_id=" + this.client.getClientId(),
123144
cause);
124-
scheduleReconnect(); // todo: do we need to resubscribe if con lost?
145+
scheduleReconnect();
125146
}
126147

127148
@Override
@@ -161,7 +182,7 @@ private synchronized void scheduleReconnect() {
161182
this.scheduledReconnect = null;
162183
}
163184
catch (MqttException e) {
164-
this.logger.error("could not reconnect", e);
185+
logger.error("could not reconnect", e);
165186
scheduleReconnect();
166187
}
167188
}, Instant.now().plusMillis(getRecoveryInterval()));

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.integration.mqtt.core;
1818

19-
import org.apache.commons.logging.Log;
20-
import org.apache.commons.logging.LogFactory;
2119
import org.eclipse.paho.mqttv5.client.IMqttAsyncClient;
2220
import org.eclipse.paho.mqttv5.client.IMqttToken;
2321
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
@@ -32,19 +30,29 @@
3230

3331
public class Mqttv5ClientManager extends AbstractMqttClientManager<IMqttAsyncClient> implements MqttCallback {
3432

35-
private final Log logger = LogFactory.getLog(this.getClass());
36-
3733
private final MqttConnectionOptions connectionOptions;
3834

3935
private volatile IMqttAsyncClient client;
4036

41-
public Mqttv5ClientManager(MqttConnectionOptions connectionOptions, String url, String clientId) {
42-
super(url, clientId);
37+
public Mqttv5ClientManager(String url, String clientId) {
38+
super(clientId);
39+
Assert.notNull(url, "'url' is required");
40+
setUrl(url);
41+
this.connectionOptions = new MqttConnectionOptions();
42+
this.connectionOptions.setServerURIs(new String[]{ url });
43+
this.connectionOptions.setAutomaticReconnect(true);
44+
}
45+
46+
public Mqttv5ClientManager(MqttConnectionOptions connectionOptions, String clientId) {
47+
super(clientId);
4348
Assert.notNull(connectionOptions, "'connectionOptions' is required");
44-
if (url == null) {
45-
Assert.notEmpty(connectionOptions.getServerURIs(), "'serverURIs' must be provided in the 'MqttConnectionOptions'");
46-
}
4749
this.connectionOptions = connectionOptions;
50+
if (!this.connectionOptions.isAutomaticReconnect()) {
51+
logger.warn("It is recommended to set 'automaticReconnect' MQTT connection option. " +
52+
"Otherwise connection check and reconnect should be done manually.");
53+
}
54+
Assert.notEmpty(connectionOptions.getServerURIs(), "'serverURIs' must be provided in the 'MqttConnectionOptions'");
55+
setUrl(connectionOptions.getServerURIs()[0]);
4856
}
4957

5058
@Override
@@ -69,7 +77,7 @@ public synchronized void start() {
6977
.waitForCompletion(this.connectionOptions.getConnectionTimeout());
7078
}
7179
catch (MqttException e) {
72-
this.logger.error("could not start client manager, client_id=" + this.client.getClientId(), e);
80+
logger.error("could not start client manager, client_id=" + this.client.getClientId(), e);
7381
}
7482
}
7583

@@ -83,14 +91,14 @@ public synchronized void stop() {
8391
this.client.disconnectForcibly(this.connectionOptions.getConnectionTimeout());
8492
}
8593
catch (MqttException e) {
86-
this.logger.error("could not disconnect from the client", e);
94+
logger.error("could not disconnect from the client", e);
8795
}
8896
finally {
8997
try {
9098
this.client.close();
9199
}
92100
catch (MqttException e) {
93-
this.logger.error("could not close the client", e);
101+
logger.error("could not close the client", e);
94102
}
95103
this.client = null;
96104
}
@@ -113,8 +121,8 @@ public void deliveryComplete(IMqttToken token) {
113121

114122
@Override
115123
public void connectComplete(boolean reconnect, String serverURI) {
116-
if (this.logger.isInfoEnabled()) {
117-
this.logger.info("MQTT connect complete to " + serverURI);
124+
if (logger.isInfoEnabled()) {
125+
logger.info("MQTT connect complete to " + serverURI);
118126
}
119127
// probably makes sense to use custom callbacks in the future
120128
}
@@ -126,14 +134,14 @@ public void authPacketArrived(int reasonCode, MqttProperties properties) {
126134

127135
@Override
128136
public void disconnected(MqttDisconnectResponse disconnectResponse) {
129-
if (this.logger.isInfoEnabled()) {
130-
this.logger.info("MQTT disconnected" + disconnectResponse);
137+
if (logger.isInfoEnabled()) {
138+
logger.info("MQTT disconnected" + disconnectResponse);
131139
}
132140
}
133141

134142
@Override
135143
public void mqttErrorOccurred(MqttException exception) {
136-
this.logger.error("MQTT error occurred", exception);
144+
logger.error("MQTT error occurred", exception);
137145
}
138146

139147
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,11 +331,9 @@ public void deliveryComplete(IMqttToken token) {
331331

332332
@Override
333333
public void connectComplete(boolean reconnect, String serverURI) {
334-
if (reconnect) {
335-
return;
334+
if (!reconnect) {
335+
subscribeToAll();
336336
}
337-
338-
subscribeToAll();
339337
}
340338

341339
@Override

0 commit comments

Comments
 (0)