Skip to content

GH-3685: Share MQTT connection across components #3857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.util.Assert;

/**
* @param <T> MQTT client type
*
* @author Artem Vozhdayenko
*
* @since 6.0
*/
public abstract class AbstractMqttClientManager<T> implements ClientManager<T> {

protected final Log logger = LogFactory.getLog(this.getClass());

private boolean manualAcks;

private String url;

private final String clientId;

AbstractMqttClientManager(String clientId) {
Assert.notNull(clientId, "'clientId' is required");
this.clientId = clientId;
}

@Override
public boolean isManualAcks() {
return this.manualAcks;
}

public void setManualAcks(boolean manualAcks) {
this.manualAcks = manualAcks;
}

public String getUrl() {
return this.url;
}

public void setUrl(String url) {
this.url = url;
}

public String getClientId() {
return this.clientId;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import org.springframework.context.SmartLifecycle;

/**
* A utility abstraction over MQTT client which can be used in any MQTT-related component
* without need to handle generic client callbacks, reconnects etc.
* Using this manager in multiple MQTT integrations will preserve a single connection.
*
* @param <T> MQTT client type
*
* @author Artem Vozhdayenko
*
* @since 6.0
*/
public interface ClientManager<T> extends SmartLifecycle {

T getClient();

boolean isManualAcks();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.springframework.util.Assert;

/**
* @author Artem Vozhdayenko
* @since 6.0
*/
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient> implements MqttCallback {

private final MqttPahoClientFactory clientFactory;

private volatile IMqttAsyncClient client;

public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we still need to rely on this factory for this our new ClientManager abstraction.
How about just to make this v3 version similar to v5 - based on the MqttConnectionOptions.
And perhaps we need to add one more MqttClientPersistence option to these managers impls.
A setter should be OK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, initially if I remember correctly you asked to implement client manager using existing client factory abstraction 😄

super(clientId);
Assert.notNull(clientFactory, "'clientFactory' is required");
this.clientFactory = clientFactory;
MqttConnectOptions connectionOptions = clientFactory.getConnectionOptions();
String[] serverURIs = connectionOptions.getServerURIs();
Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'");
setUrl(serverURIs[0]);
if (!connectionOptions.isAutomaticReconnect()) {
logger.info("If this `ClientManager` is used from message-driven channel adapters, " +
"it is recommended to set 'automaticReconnect' MQTT connection option. " +
"Otherwise connection check and reconnect should be done manually.");
}
}

public Mqttv3ClientManager(String url, String clientId) {
this(buildDefaultClientFactory(url), clientId);
}

private static MqttPahoClientFactory buildDefaultClientFactory(String url) {
Assert.notNull(url, "'url' is required");
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setServerURIs(new String[]{ url });
connectOptions.setAutomaticReconnect(true);
DefaultMqttPahoClientFactory defaultFactory = new DefaultMqttPahoClientFactory();
defaultFactory.setConnectionOptions(connectOptions);
return defaultFactory;
}

@Override
public IMqttAsyncClient getClient() {
return this.client;
}

@Override
public synchronized void start() {
if (this.client == null) {
try {
this.client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
this.client.setManualAcks(isManualAcks());
this.client.setCallback(this);
}
catch (MqttException e) {
throw new IllegalStateException("could not start client manager", e);
}
}
try {
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
this.client.connect(options).waitForCompletion(options.getConnectionTimeout());
}
catch (MqttException e) {
logger.error("could not start client manager, client_id=" + this.client.getClientId(), e);

if (this.clientFactory.getConnectionOptions().isAutomaticReconnect()) {
try {
this.client.reconnect();
}
catch (MqttException ex) {
logger.error("MQTT client failed to re-connect.", ex);
}
}
}
}

@Override
public synchronized void stop() {
if (this.client == null) {
return;
}
try {
this.client.disconnectForcibly(this.clientFactory.getConnectionOptions().getConnectionTimeout());
}
catch (MqttException e) {
logger.error("could not disconnect from the client", e);
}
finally {
try {
this.client.close();
}
catch (MqttException e) {
logger.error("could not close the client", e);
}
this.client = null;
}
}

@Override
public synchronized boolean isRunning() {
return this.client != null;
}

@Override
public synchronized void connectionLost(Throwable cause) {
logger.error("connection lost, client_id=" + this.client.getClientId(), cause);
}

@Override
public void messageArrived(String topic, MqttMessage message) {
// not this manager concern
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// nor this manager concern
}

}
Loading