-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH-3685: Share MQTT connection across components #3857
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
3251fd2
d9a0474
9f47cc8
d338577
4cc094f
f3fba7d
f595998
b5da36b
318fd82
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Copyright 2022-2022 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.mqtt.core; | ||
|
||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
|
||
import org.springframework.util.Assert; | ||
|
||
/** | ||
* @param <T> MQTT client type | ||
* | ||
* @author Artem Vozhdayenko | ||
* | ||
* @since 6.0 | ||
*/ | ||
public abstract class AbstractMqttClientManager<T> implements ClientManager<T> { | ||
|
||
protected final Log logger = LogFactory.getLog(this.getClass()); | ||
oxcafedead marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private boolean manualAcks; | ||
|
||
private String url; | ||
|
||
private final String clientId; | ||
|
||
AbstractMqttClientManager(String clientId) { | ||
Assert.notNull(clientId, "'clientId' is required"); | ||
this.clientId = clientId; | ||
} | ||
|
||
@Override | ||
public boolean isManualAcks() { | ||
return this.manualAcks; | ||
} | ||
|
||
public void setManualAcks(boolean manualAcks) { | ||
this.manualAcks = manualAcks; | ||
} | ||
|
||
public String getUrl() { | ||
return this.url; | ||
} | ||
|
||
public void setUrl(String url) { | ||
this.url = url; | ||
} | ||
|
||
public String getClientId() { | ||
return this.clientId; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright 2022-2022 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.mqtt.core; | ||
|
||
import org.springframework.context.SmartLifecycle; | ||
|
||
/** | ||
* A utility abstraction over MQTT client which can be used in any MQTT-related component | ||
* without need to handle generic client callbacks, reconnects etc. | ||
* Using this manager in multiple MQTT integrations will preserve a single connection. | ||
* | ||
* @param <T> MQTT client type | ||
* | ||
* @author Artem Vozhdayenko | ||
* | ||
* @since 6.0 | ||
*/ | ||
public interface ClientManager<T> extends SmartLifecycle { | ||
|
||
T getClient(); | ||
|
||
boolean isManualAcks(); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
* Copyright 2022-2022 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.mqtt.core; | ||
|
||
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; | ||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; | ||
import org.eclipse.paho.client.mqttv3.MqttCallback; | ||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; | ||
import org.eclipse.paho.client.mqttv3.MqttException; | ||
import org.eclipse.paho.client.mqttv3.MqttMessage; | ||
|
||
import org.springframework.util.Assert; | ||
|
||
/** | ||
* @author Artem Vozhdayenko | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* @since 6.0 | ||
*/ | ||
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient> implements MqttCallback { | ||
|
||
private final MqttPahoClientFactory clientFactory; | ||
|
||
private volatile IMqttAsyncClient client; | ||
|
||
public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we still need to rely on this factory for this our new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, initially if I remember correctly you asked to implement client manager using existing client factory abstraction 😄 |
||
super(clientId); | ||
Assert.notNull(clientFactory, "'clientFactory' is required"); | ||
this.clientFactory = clientFactory; | ||
MqttConnectOptions connectionOptions = clientFactory.getConnectionOptions(); | ||
String[] serverURIs = connectionOptions.getServerURIs(); | ||
Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'"); | ||
setUrl(serverURIs[0]); | ||
if (!connectionOptions.isAutomaticReconnect()) { | ||
logger.info("If this `ClientManager` is used from message-driven channel adapters, " + | ||
"it is recommended to set 'automaticReconnect' MQTT connection option. " + | ||
"Otherwise connection check and reconnect should be done manually."); | ||
} | ||
} | ||
|
||
public Mqttv3ClientManager(String url, String clientId) { | ||
this(buildDefaultClientFactory(url), clientId); | ||
} | ||
|
||
private static MqttPahoClientFactory buildDefaultClientFactory(String url) { | ||
Assert.notNull(url, "'url' is required"); | ||
MqttConnectOptions connectOptions = new MqttConnectOptions(); | ||
connectOptions.setServerURIs(new String[]{ url }); | ||
connectOptions.setAutomaticReconnect(true); | ||
DefaultMqttPahoClientFactory defaultFactory = new DefaultMqttPahoClientFactory(); | ||
defaultFactory.setConnectionOptions(connectOptions); | ||
return defaultFactory; | ||
} | ||
|
||
@Override | ||
public IMqttAsyncClient getClient() { | ||
return this.client; | ||
} | ||
|
||
@Override | ||
public synchronized void start() { | ||
if (this.client == null) { | ||
try { | ||
this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId()); | ||
this.client.setManualAcks(isManualAcks()); | ||
this.client.setCallback(this); | ||
} | ||
catch (MqttException e) { | ||
throw new IllegalStateException("could not start client manager", e); | ||
} | ||
} | ||
try { | ||
MqttConnectOptions options = this.clientFactory.getConnectionOptions(); | ||
this.client.connect(options).waitForCompletion(options.getConnectionTimeout()); | ||
} | ||
catch (MqttException e) { | ||
logger.error("could not start client manager, client_id=" + this.client.getClientId(), e); | ||
|
||
if (this.clientFactory.getConnectionOptions().isAutomaticReconnect()) { | ||
try { | ||
this.client.reconnect(); | ||
} | ||
catch (MqttException ex) { | ||
logger.error("MQTT client failed to re-connect.", ex); | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized void stop() { | ||
if (this.client == null) { | ||
return; | ||
} | ||
try { | ||
this.client.disconnectForcibly(this.clientFactory.getConnectionOptions().getConnectionTimeout()); | ||
} | ||
catch (MqttException e) { | ||
logger.error("could not disconnect from the client", e); | ||
} | ||
finally { | ||
try { | ||
this.client.close(); | ||
} | ||
catch (MqttException e) { | ||
logger.error("could not close the client", e); | ||
} | ||
this.client = null; | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized boolean isRunning() { | ||
return this.client != null; | ||
} | ||
|
||
@Override | ||
public synchronized void connectionLost(Throwable cause) { | ||
logger.error("connection lost, client_id=" + this.client.getClientId(), cause); | ||
} | ||
|
||
@Override | ||
public void messageArrived(String topic, MqttMessage message) { | ||
// not this manager concern | ||
} | ||
|
||
@Override | ||
public void deliveryComplete(IMqttDeliveryToken token) { | ||
// nor this manager concern | ||
} | ||
|
||
} |
Uh oh!
There was an error while loading. Please reload this page.