Skip to content

GH-3685: Share MQTT connection across components #3857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.util.Assert;

/**
Expand All @@ -28,40 +30,62 @@
*
* @since 6.0
*/
public abstract class AbstractMqttClientManager<T> implements ClientManager<T> {
public abstract class AbstractMqttClientManager<T> implements ClientManager<T>, ApplicationEventPublisherAware {

protected final Log logger = LogFactory.getLog(this.getClass());
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR

private ApplicationEventPublisher applicationEventPublisher;

private boolean manualAcks;

private String url;

private final String clientId;

volatile T client;

AbstractMqttClientManager(String clientId) {
Assert.notNull(clientId, "'clientId' is required");
this.clientId = clientId;
}

@Override
public boolean isManualAcks() {
return this.manualAcks;
}

public void setManualAcks(boolean manualAcks) {
protected void setManualAcks(boolean manualAcks) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that this one has to be public

this.manualAcks = manualAcks;
}

public String getUrl() {
protected String getUrl() {
return this.url;
}

public void setUrl(String url) {
protected void setUrl(String url) {
this.url = url;
}

public String getClientId() {
protected String getClientId() {
return this.clientId;
}

protected ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}

@Override
public boolean isManualAcks() {
return this.manualAcks;
}

@Override
public T getClient() {
return this.client;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
Assert.notNull(applicationEventPublisher, "'applicationEventPublisher' cannot be null");
this.applicationEventPublisher = applicationEventPublisher;
}

public synchronized boolean isRunning() {
return this.client != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.util.Assert;

/**
Expand All @@ -33,7 +34,9 @@ public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncCli

private final MqttPahoClientFactory clientFactory;

private volatile IMqttAsyncClient client;
public Mqttv3ClientManager(String url, String clientId) {
this(buildDefaultClientFactory(url), clientId);
}

public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we still need to rely on this factory for this our new ClientManager abstraction.
How about just to make this v3 version similar to v5 - based on the MqttConnectionOptions.
And perhaps we need to add one more MqttClientPersistence option to these managers impls.
A setter should be OK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, initially if I remember correctly you asked to implement client manager using existing client factory abstraction 😄

super(clientId);
Expand All @@ -50,10 +53,6 @@ public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId)
}
}

public Mqttv3ClientManager(String url, String clientId) {
this(buildDefaultClientFactory(url), clientId);
}

private static MqttPahoClientFactory buildDefaultClientFactory(String url) {
Assert.notNull(url, "'url' is required");
MqttConnectOptions connectOptions = new MqttConnectOptions();
Expand All @@ -64,11 +63,6 @@ private static MqttPahoClientFactory buildDefaultClientFactory(String url) {
return defaultFactory;
}

@Override
public IMqttAsyncClient getClient() {
return this.client;
}

@Override
public synchronized void start() {
if (this.client == null) {
Expand Down Expand Up @@ -96,6 +90,9 @@ public synchronized void start() {
logger.error("MQTT client failed to re-connect.", ex);
}
}
else if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to extract to the local variable and avoid extra method call.
I don't think the compiler is smart enough to do such an optimization on our behalf...

}
}
}

Expand All @@ -121,11 +118,6 @@ public synchronized void stop() {
}
}

@Override
public synchronized boolean isRunning() {
return this.client != null;
}

@Override
public synchronized void connectionLost(Throwable cause) {
logger.error("connection lost, client_id=" + this.client.getClientId(), cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.util.Assert;

/**
Expand All @@ -36,7 +37,9 @@ public class Mqttv5ClientManager extends AbstractMqttClientManager<IMqttAsyncCli

private final MqttConnectionOptions connectionOptions;

private volatile IMqttAsyncClient client;
public Mqttv5ClientManager(String url, String clientId) {
this(buildDefaultConnectionOptions(url), clientId);
}

public Mqttv5ClientManager(MqttConnectionOptions connectionOptions, String clientId) {
super(clientId);
Expand All @@ -51,10 +54,6 @@ public Mqttv5ClientManager(MqttConnectionOptions connectionOptions, String clien
setUrl(connectionOptions.getServerURIs()[0]);
}

public Mqttv5ClientManager(String url, String clientId) {
this(buildDefaultConnectionOptions(url), clientId);
}

private static MqttConnectionOptions buildDefaultConnectionOptions(String url) {
Assert.notNull(url, "'url' is required");
var connectionOptions = new MqttConnectionOptions();
Expand All @@ -63,11 +62,6 @@ private static MqttConnectionOptions buildDefaultConnectionOptions(String url) {
return connectionOptions;
}

@Override
public IMqttAsyncClient getClient() {
return this.client;
}

@Override
public synchronized void start() {
if (this.client == null) {
Expand Down Expand Up @@ -95,6 +89,9 @@ public synchronized void start() {
logger.error("MQTT client failed to re-connect.", ex);
}
}
else if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DITTO

}
}
}

Expand All @@ -121,11 +118,6 @@ public synchronized void stop() {
}
}

@Override
public synchronized boolean isRunning() {
return this.client != null;
}

@Override
public void messageArrived(String topic, MqttMessage message) {
// not this manager concern
Expand All @@ -152,7 +144,7 @@ public void authPacketArrived(int reasonCode, MqttProperties properties) {
@Override
public void disconnected(MqttDisconnectResponse disconnectResponse) {
if (logger.isInfoEnabled()) {
logger.info("MQTT disconnected" + disconnectResponse);
logger.info("MQTT disconnected: " + disconnectResponse);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T> extends Message
*/
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L;

private String url;
private final String url;

private String clientId;
private final String clientId;

private final Set<Topic> topics;

Expand All @@ -74,7 +74,7 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T> extends Message

private MqttMessageConverter converter;

protected ClientManager<T> clientManager;
private final ClientManager<T> clientManager;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just nit-pick: final props must be before the others.


protected final Lock topicLock = new ReentrantLock(); // NOSONAR

Expand All @@ -83,12 +83,15 @@ public AbstractMqttMessageDrivenChannelAdapter(@Nullable String url, String clie
this.url = url;
this.clientId = clientId;
this.topics = initTopics(topic);
this.clientManager = null;
}

AbstractMqttMessageDrivenChannelAdapter(ClientManager<T> clientManager, String... topic) {
Assert.notNull(clientManager, "'clientManager' cannot be null");
this.clientManager = clientManager;
this.topics = initTopics(topic);
this.url = null;
this.clientId = null;
}

private Set<Topic> initTopics(String[] topic) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static ?

Expand All @@ -107,7 +110,8 @@ public void setConverter(MqttMessageConverter converter) {
this.converter = converter;
}

public ClientManager<T> getClientManager() {
@Nullable
protected ClientManager<T> getClientManager() {
return this.clientManager;
}

Expand Down Expand Up @@ -155,6 +159,7 @@ protected String getUrl() {
return this.url;
}

@Nullable
protected String getClientId() {
return this.clientId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv

private volatile ConsumerStopAction consumerStopAction;

/**
* Use this constructor when you don't need additional {@link MqttConnectOptions}.
* @param url The URL.
* @param clientId The client id.
* @param topic The topic(s).
*/
public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
}

/**
* Use this constructor for a single url (although it may be overridden if the server
* URI(s) are provided by the {@link MqttConnectOptions#getServerURIs()} provided by
Expand Down Expand Up @@ -121,15 +131,14 @@ public MqttPahoMessageDrivenChannelAdapter(String clientId, MqttPahoClientFactor
this.clientFactory = clientFactory;
}


/**
* Use this constructor when you don't need additional {@link MqttConnectOptions}.
* @param url The URL.
* @param clientId The client id.
* Use this constructor when you need to use a single {@link ClientManager}
* (for instance, to reuse an MQTT connection).
* @param clientManager The client manager.
* @param topic The topic(s).
*/
public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
this(url, clientId, new DefaultMqttPahoClientFactory(), topic);
public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) {
this(clientManager, new MqttConnectOptions(), topic);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that we have discussed: no contradicting options. Better to take them from the provided client.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still haven't pushed any changes yet 🙂
Sorry, maybe the fact I resolve our conversations confuses you. I do it for myself to check that locally I have resolved this. Probably better to leave them unless the code it on 'your' side..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see . Sorry, yes, makes sense to have it resolved that way. Really confused me: have a lot to review in parallel , so I lose context from time to time 😅

}

/**
Expand All @@ -148,16 +157,6 @@ public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clien
this.clientFactory = factory;
}

/**
* Use this constructor when you need to use a single {@link ClientManager}
* (for instance, to reuse an MQTT connection).
* @param clientManager The client manager.
* @param topic The topic(s).
*/
public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) {
this(clientManager, new MqttConnectOptions(), topic);
}

/**
* Set the completion timeout when disconnecting. Not settable using the namespace.
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ public Mqttv5PahoMessageDrivenChannelAdapter(MqttConnectionOptions connectionOpt
}
}

/**
* Use this constructor when you need to use a single {@link ClientManager}
* (for instance, to reuse an MQTT connection).
* @param clientManager The client manager.
* @param topic The topic(s).
*/
public Mqttv5PahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) {
this(buildDefaultConnectionOptions(null), clientManager, topic);
}

/**
* Use this constructor when you need to use a single {@link ClientManager}
* (for instance, to reuse an MQTT connection) and a specific {@link MqttConnectionOptions}.
Expand All @@ -128,16 +138,6 @@ public Mqttv5PahoMessageDrivenChannelAdapter(MqttConnectionOptions connectionOpt
}
}

/**
* Use this constructor when you need to use a single {@link ClientManager}
* (for instance, to reuse an MQTT connection).
* @param clientManager The client manager.
* @param topic The topic(s).
*/
public Mqttv5PahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) {
this(buildDefaultConnectionOptions(null), clientManager, topic);
}

private static MqttConnectionOptions buildDefaultConnectionOptions(@Nullable String url) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need this method any more.
Also I think we just need to add Assert.hasText(url, "'url' cannot be null or empty"); into that url-based ctor.
Just because we cannot an empty url without external options: or url, or options with server uris.

final MqttConnectionOptions connectionOptions;
connectionOptions = new MqttConnectionOptions();
Expand Down
Loading