Skip to content

Commit d9a0474

Browse files
committed
GH-3685: Share MQTT connection across components
Fixes #3685 Add a new interface `ClientManager` which will manage clients and connections. Add different implementations for v3 and v5 MQTT clients. Use this manager in v3/v5 topic adapters and message handlers.
1 parent 3251fd2 commit d9a0474

File tree

11 files changed

+460
-161
lines changed

11 files changed

+460
-161
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2022-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.core;
18+
19+
import org.springframework.util.Assert;
20+
21+
public abstract class AbstractMqttClientManager<T> implements ClientManager<T> {
22+
23+
private boolean manualAcks;
24+
25+
private String url;
26+
27+
private String clientId;
28+
29+
AbstractMqttClientManager(String url, String clientId) {
30+
Assert.notNull(clientId, "'clientId' is required");
31+
this.clientId = clientId;
32+
this.url = url;
33+
}
34+
35+
@Override
36+
public boolean isManualAcks() {
37+
return this.manualAcks;
38+
}
39+
40+
public void setManualAcks(boolean manualAcks) {
41+
this.manualAcks = manualAcks;
42+
}
43+
44+
public String getUrl() {
45+
return this.url;
46+
}
47+
48+
public void setUrl(String url) {
49+
this.url = url;
50+
}
51+
52+
public String getClientId() {
53+
return this.clientId;
54+
}
55+
56+
public void setClientId(String clientId) {
57+
this.clientId = clientId;
58+
}
59+
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/ClientManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package org.springframework.integration.mqtt.core;
1818

19-
import org.springframework.integration.support.management.ManageableLifecycle;
19+
import org.springframework.context.SmartLifecycle;
2020

21-
public interface ClientManager<T> extends ManageableLifecycle {
21+
public interface ClientManager<T> extends SmartLifecycle {
2222

2323
T getClient();
2424

25+
boolean isManualAcks();
26+
2527
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

Lines changed: 77 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,128 +18,153 @@
1818

1919
import java.time.Instant;
2020
import java.util.concurrent.ScheduledFuture;
21-
import java.util.concurrent.atomic.AtomicReference;
2221

22+
import org.apache.commons.logging.Log;
23+
import org.apache.commons.logging.LogFactory;
2324
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
2425
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
25-
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
2626
import org.eclipse.paho.client.mqttv3.MqttCallback;
2727
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
2828
import org.eclipse.paho.client.mqttv3.MqttException;
2929
import org.eclipse.paho.client.mqttv3.MqttMessage;
3030

31-
import org.springframework.integration.context.IntegrationObjectSupport;
32-
import org.springframework.integration.support.management.ManageableLifecycle;
31+
import org.springframework.scheduling.TaskScheduler;
32+
import org.springframework.util.Assert;
3333

34-
public class Mqttv3ClientManager extends IntegrationObjectSupport implements ClientManager<IMqttAsyncClient>,
35-
ManageableLifecycle, MqttCallback {
34+
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient> implements MqttCallback {
3635

37-
private AtomicReference<ScheduledFuture<?>> scheduledReconnect;
36+
/**
37+
* The default reconnect timeout in millis.
38+
*/
39+
private static final long DEFAULT_RECOVERY_INTERVAL = 10_000;
3840

39-
private final MqttConnectOptions connectOptions;
41+
private final Log logger = LogFactory.getLog(this.getClass());
4042

41-
private final String clientId;
43+
private final MqttPahoClientFactory clientFactory;
4244

43-
private IMqttAsyncClient client;
45+
private final TaskScheduler taskScheduler;
4446

45-
public Mqttv3ClientManager(MqttConnectOptions connectOptions, String clientId) throws MqttException {
46-
this.connectOptions = connectOptions;
47-
this.client = new MqttAsyncClient(connectOptions.getServerURIs()[0], clientId);
48-
this.client.setCallback(this);
49-
this.clientId = clientId;
47+
private volatile ScheduledFuture<?> scheduledReconnect;
48+
49+
private volatile IMqttAsyncClient client;
50+
51+
private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
52+
53+
public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, TaskScheduler taskScheduler, String url,
54+
String clientId) {
55+
56+
super(url, clientId);
57+
Assert.notNull(clientId, "'clientFactory' is required");
58+
Assert.notNull(clientId, "'taskScheduler' is required");
59+
if (url == null) {
60+
Assert.notEmpty(clientFactory.getConnectionOptions().getServerURIs(), "'serverURIs' must be provided in the 'MqttConnectionOptions'");
61+
}
62+
this.clientFactory = clientFactory;
63+
this.taskScheduler = taskScheduler;
5064
}
5165

5266
@Override
5367
public IMqttAsyncClient getClient() {
54-
return client;
68+
return this.client;
5569
}
5670

5771
@Override
58-
public void start() {
72+
public synchronized void start() {
5973
if (this.client == null) {
6074
try {
61-
this.client = new MqttAsyncClient(this.connectOptions.getServerURIs()[0], this.clientId);
75+
this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
76+
this.client.setManualAcks(isManualAcks());
77+
this.client.setCallback(this);
6278
}
6379
catch (MqttException e) {
6480
throw new IllegalStateException("could not start client manager", e);
6581
}
66-
this.client.setCallback(this);
6782
}
6883
try {
6984
connect();
7085
}
7186
catch (MqttException e) {
72-
logger.error(e, "could not start client manager, scheduling reconnect, client_id=" +
73-
this.client.getClientId());
87+
this.logger.error("could not start client manager, scheduling reconnect, client_id=" +
88+
this.client.getClientId(), e);
7489
scheduleReconnect();
7590
}
7691
}
7792

7893
@Override
79-
public void stop() {
94+
public synchronized void stop() {
8095
if (this.client == null) {
8196
return;
8297
}
8398
try {
84-
this.client.disconnectForcibly(this.connectOptions.getConnectionTimeout());
99+
this.client.disconnectForcibly(this.clientFactory.getConnectionOptions().getConnectionTimeout());
85100
}
86101
catch (MqttException e) {
87-
logger.error(e, "could not disconnect from the client");
102+
this.logger.error("could not disconnect from the client", e);
88103
}
89104
finally {
90105
try {
91106
this.client.close();
92107
}
93108
catch (MqttException e) {
94-
logger.error(e, "could not close the client");
109+
this.logger.error("could not close the client", e);
95110
}
96111
this.client = null;
97112
}
98113
}
99114

100115
@Override
101-
public boolean isRunning() {
116+
public synchronized boolean isRunning() {
102117
return this.client != null;
103118
}
104119

105-
private synchronized void connect() throws MqttException {
106-
if (this.client == null) {
107-
logger.error("could not connect on a null client reference");
108-
return;
109-
}
110-
MqttConnectOptions options = Mqttv3ClientManager.this.connectOptions;
111-
this.client.connect(options).waitForCompletion(options.getConnectionTimeout());
120+
@Override
121+
public synchronized void connectionLost(Throwable cause) {
122+
this.logger.error("connection lost, scheduling reconnect, client_id=" + this.client.getClientId(),
123+
cause);
124+
scheduleReconnect(); // todo: do we need to resubscribe if con lost?
112125
}
113126

114127
@Override
115-
public synchronized void connectionLost(Throwable cause) {
116-
logger.error(cause, "connection lost, scheduling reconnect, client_id=" + this.client.getClientId());
117-
scheduleReconnect();
128+
public void messageArrived(String topic, MqttMessage message) {
129+
// not this manager concern
130+
}
131+
132+
@Override
133+
public void deliveryComplete(IMqttDeliveryToken token) {
134+
// nor this manager concern
135+
}
136+
137+
public long getRecoveryInterval() {
138+
return this.recoveryInterval;
139+
}
140+
141+
public void setRecoveryInterval(long recoveryInterval) {
142+
this.recoveryInterval = recoveryInterval;
118143
}
119144

120-
private void scheduleReconnect() {
121-
if (this.scheduledReconnect.get() != null) {
122-
this.scheduledReconnect.get().cancel(false);
145+
private synchronized void connect() throws MqttException {
146+
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
147+
this.client.connect(options).waitForCompletion(options.getConnectionTimeout());
148+
}
149+
150+
private synchronized void scheduleReconnect() {
151+
if (this.scheduledReconnect != null) {
152+
this.scheduledReconnect.cancel(false);
123153
}
124-
this.scheduledReconnect.set(getTaskScheduler().schedule(() -> {
154+
this.scheduledReconnect = this.taskScheduler.schedule(() -> {
125155
try {
156+
if (this.client.isConnected()) {
157+
return;
158+
}
159+
126160
connect();
127-
this.scheduledReconnect.set(null);
161+
this.scheduledReconnect = null;
128162
}
129163
catch (MqttException e) {
130-
logger.error(e, "could not reconnect");
164+
this.logger.error("could not reconnect", e);
131165
scheduleReconnect();
132166
}
133-
}, Instant.now().plusSeconds(10)));
167+
}, Instant.now().plusMillis(getRecoveryInterval()));
134168
}
135169

136-
@Override
137-
public void messageArrived(String topic, MqttMessage message) {
138-
// not this manager concern
139-
}
140-
141-
@Override
142-
public void deliveryComplete(IMqttDeliveryToken token) {
143-
// nor this manager concern
144-
}
145170
}

0 commit comments

Comments
 (0)