-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH-3685: Share MQTT connection across components #3857
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
3251fd2
d9a0474
9f47cc8
d338577
4cc094f
f3fba7d
f595998
b5da36b
318fd82
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,8 @@ | |
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
|
||
import org.springframework.context.ApplicationEventPublisher; | ||
import org.springframework.context.ApplicationEventPublisherAware; | ||
import org.springframework.util.Assert; | ||
|
||
/** | ||
|
@@ -28,40 +30,62 @@ | |
* | ||
* @since 6.0 | ||
*/ | ||
public abstract class AbstractMqttClientManager<T> implements ClientManager<T> { | ||
public abstract class AbstractMqttClientManager<T> implements ClientManager<T>, ApplicationEventPublisherAware { | ||
|
||
protected final Log logger = LogFactory.getLog(this.getClass()); | ||
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR | ||
|
||
private ApplicationEventPublisher applicationEventPublisher; | ||
|
||
private boolean manualAcks; | ||
|
||
private String url; | ||
|
||
private final String clientId; | ||
|
||
volatile T client; | ||
|
||
AbstractMqttClientManager(String clientId) { | ||
Assert.notNull(clientId, "'clientId' is required"); | ||
this.clientId = clientId; | ||
} | ||
|
||
@Override | ||
public boolean isManualAcks() { | ||
return this.manualAcks; | ||
} | ||
|
||
public void setManualAcks(boolean manualAcks) { | ||
protected void setManualAcks(boolean manualAcks) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe that this one has to be |
||
this.manualAcks = manualAcks; | ||
} | ||
|
||
public String getUrl() { | ||
protected String getUrl() { | ||
return this.url; | ||
} | ||
|
||
public void setUrl(String url) { | ||
protected void setUrl(String url) { | ||
this.url = url; | ||
oxcafedead marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
public String getClientId() { | ||
protected String getClientId() { | ||
return this.clientId; | ||
} | ||
|
||
protected ApplicationEventPublisher getApplicationEventPublisher() { | ||
return this.applicationEventPublisher; | ||
} | ||
|
||
@Override | ||
public boolean isManualAcks() { | ||
return this.manualAcks; | ||
} | ||
|
||
@Override | ||
public T getClient() { | ||
return this.client; | ||
} | ||
|
||
@Override | ||
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { | ||
Assert.notNull(applicationEventPublisher, "'applicationEventPublisher' cannot be null"); | ||
this.applicationEventPublisher = applicationEventPublisher; | ||
} | ||
|
||
public synchronized boolean isRunning() { | ||
return this.client != null; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import org.eclipse.paho.client.mqttv3.MqttException; | ||
import org.eclipse.paho.client.mqttv3.MqttMessage; | ||
|
||
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; | ||
import org.springframework.util.Assert; | ||
|
||
/** | ||
|
@@ -33,7 +34,9 @@ public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncCli | |
|
||
private final MqttPahoClientFactory clientFactory; | ||
|
||
private volatile IMqttAsyncClient client; | ||
public Mqttv3ClientManager(String url, String clientId) { | ||
this(buildDefaultClientFactory(url), clientId); | ||
} | ||
|
||
public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we still need to rely on this factory for this our new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, initially if I remember correctly you asked to implement client manager using existing client factory abstraction 😄 |
||
super(clientId); | ||
|
@@ -50,10 +53,6 @@ public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) | |
} | ||
} | ||
|
||
public Mqttv3ClientManager(String url, String clientId) { | ||
this(buildDefaultClientFactory(url), clientId); | ||
} | ||
|
||
private static MqttPahoClientFactory buildDefaultClientFactory(String url) { | ||
Assert.notNull(url, "'url' is required"); | ||
MqttConnectOptions connectOptions = new MqttConnectOptions(); | ||
|
@@ -64,11 +63,6 @@ private static MqttPahoClientFactory buildDefaultClientFactory(String url) { | |
return defaultFactory; | ||
} | ||
|
||
@Override | ||
public IMqttAsyncClient getClient() { | ||
return this.client; | ||
} | ||
|
||
@Override | ||
public synchronized void start() { | ||
if (this.client == null) { | ||
|
@@ -96,6 +90,9 @@ public synchronized void start() { | |
logger.error("MQTT client failed to re-connect.", ex); | ||
} | ||
} | ||
else if (getApplicationEventPublisher() != null) { | ||
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Better to extract to the local variable and avoid extra method call. |
||
} | ||
} | ||
} | ||
|
||
|
@@ -121,11 +118,6 @@ public synchronized void stop() { | |
} | ||
} | ||
|
||
@Override | ||
public synchronized boolean isRunning() { | ||
return this.client != null; | ||
} | ||
|
||
@Override | ||
public synchronized void connectionLost(Throwable cause) { | ||
logger.error("connection lost, client_id=" + this.client.getClientId(), cause); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ | |
import org.eclipse.paho.mqttv5.common.MqttMessage; | ||
import org.eclipse.paho.mqttv5.common.packet.MqttProperties; | ||
|
||
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; | ||
import org.springframework.util.Assert; | ||
|
||
/** | ||
|
@@ -36,7 +37,9 @@ public class Mqttv5ClientManager extends AbstractMqttClientManager<IMqttAsyncCli | |
|
||
private final MqttConnectionOptions connectionOptions; | ||
|
||
private volatile IMqttAsyncClient client; | ||
public Mqttv5ClientManager(String url, String clientId) { | ||
this(buildDefaultConnectionOptions(url), clientId); | ||
} | ||
|
||
public Mqttv5ClientManager(MqttConnectionOptions connectionOptions, String clientId) { | ||
super(clientId); | ||
|
@@ -51,10 +54,6 @@ public Mqttv5ClientManager(MqttConnectionOptions connectionOptions, String clien | |
setUrl(connectionOptions.getServerURIs()[0]); | ||
} | ||
|
||
public Mqttv5ClientManager(String url, String clientId) { | ||
this(buildDefaultConnectionOptions(url), clientId); | ||
} | ||
|
||
private static MqttConnectionOptions buildDefaultConnectionOptions(String url) { | ||
Assert.notNull(url, "'url' is required"); | ||
var connectionOptions = new MqttConnectionOptions(); | ||
|
@@ -63,11 +62,6 @@ private static MqttConnectionOptions buildDefaultConnectionOptions(String url) { | |
return connectionOptions; | ||
} | ||
|
||
@Override | ||
public IMqttAsyncClient getClient() { | ||
return this.client; | ||
} | ||
|
||
@Override | ||
public synchronized void start() { | ||
if (this.client == null) { | ||
|
@@ -95,6 +89,9 @@ public synchronized void start() { | |
logger.error("MQTT client failed to re-connect.", ex); | ||
} | ||
} | ||
else if (getApplicationEventPublisher() != null) { | ||
getApplicationEventPublisher().publishEvent(new MqttConnectionFailedEvent(this, e)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. DITTO |
||
} | ||
} | ||
} | ||
|
||
|
@@ -121,11 +118,6 @@ public synchronized void stop() { | |
} | ||
} | ||
|
||
@Override | ||
public synchronized boolean isRunning() { | ||
return this.client != null; | ||
} | ||
|
||
@Override | ||
public void messageArrived(String topic, MqttMessage message) { | ||
// not this manager concern | ||
|
@@ -152,7 +144,7 @@ public void authPacketArrived(int reasonCode, MqttProperties properties) { | |
@Override | ||
public void disconnected(MqttDisconnectResponse disconnectResponse) { | ||
if (logger.isInfoEnabled()) { | ||
logger.info("MQTT disconnected" + disconnectResponse); | ||
logger.info("MQTT disconnected: " + disconnectResponse); | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,9 +60,9 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T> extends Message | |
*/ | ||
public static final long DEFAULT_COMPLETION_TIMEOUT = 30_000L; | ||
|
||
private String url; | ||
private final String url; | ||
|
||
private String clientId; | ||
private final String clientId; | ||
|
||
private final Set<Topic> topics; | ||
|
||
|
@@ -74,7 +74,7 @@ public abstract class AbstractMqttMessageDrivenChannelAdapter<T> extends Message | |
|
||
private MqttMessageConverter converter; | ||
|
||
protected ClientManager<T> clientManager; | ||
private final ClientManager<T> clientManager; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just nit-pick: |
||
|
||
protected final Lock topicLock = new ReentrantLock(); // NOSONAR | ||
|
||
|
@@ -83,12 +83,15 @@ public AbstractMqttMessageDrivenChannelAdapter(@Nullable String url, String clie | |
this.url = url; | ||
this.clientId = clientId; | ||
this.topics = initTopics(topic); | ||
this.clientManager = null; | ||
} | ||
|
||
AbstractMqttMessageDrivenChannelAdapter(ClientManager<T> clientManager, String... topic) { | ||
Assert.notNull(clientManager, "'clientManager' cannot be null"); | ||
this.clientManager = clientManager; | ||
this.topics = initTopics(topic); | ||
this.url = null; | ||
this.clientId = null; | ||
} | ||
|
||
private Set<Topic> initTopics(String[] topic) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
@@ -107,7 +110,8 @@ public void setConverter(MqttMessageConverter converter) { | |
this.converter = converter; | ||
} | ||
|
||
public ClientManager<T> getClientManager() { | ||
@Nullable | ||
protected ClientManager<T> getClientManager() { | ||
return this.clientManager; | ||
} | ||
|
||
|
@@ -155,6 +159,7 @@ protected String getUrl() { | |
return this.url; | ||
} | ||
|
||
@Nullable | ||
protected String getClientId() { | ||
return this.clientId; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,6 +89,16 @@ public class MqttPahoMessageDrivenChannelAdapter extends AbstractMqttMessageDriv | |
|
||
private volatile ConsumerStopAction consumerStopAction; | ||
|
||
/** | ||
* Use this constructor when you don't need additional {@link MqttConnectOptions}. | ||
* @param url The URL. | ||
* @param clientId The client id. | ||
* @param topic The topic(s). | ||
*/ | ||
public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) { | ||
this(url, clientId, new DefaultMqttPahoClientFactory(), topic); | ||
} | ||
|
||
/** | ||
* Use this constructor for a single url (although it may be overridden if the server | ||
* URI(s) are provided by the {@link MqttConnectOptions#getServerURIs()} provided by | ||
|
@@ -121,15 +131,14 @@ public MqttPahoMessageDrivenChannelAdapter(String clientId, MqttPahoClientFactor | |
this.clientFactory = clientFactory; | ||
} | ||
|
||
|
||
/** | ||
* Use this constructor when you don't need additional {@link MqttConnectOptions}. | ||
* @param url The URL. | ||
* @param clientId The client id. | ||
* Use this constructor when you need to use a single {@link ClientManager} | ||
* (for instance, to reuse an MQTT connection). | ||
* @param clientManager The client manager. | ||
* @param topic The topic(s). | ||
*/ | ||
public MqttPahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) { | ||
this(url, clientId, new DefaultMqttPahoClientFactory(), topic); | ||
public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) { | ||
this(clientManager, new MqttConnectOptions(), topic); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought that we have discussed: no contradicting options. Better to take them from the provided client. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I still haven't pushed any changes yet 🙂 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see . Sorry, yes, makes sense to have it resolved that way. Really confused me: have a lot to review in parallel , so I lose context from time to time 😅 |
||
} | ||
|
||
/** | ||
|
@@ -148,16 +157,6 @@ public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clien | |
this.clientFactory = factory; | ||
} | ||
|
||
/** | ||
* Use this constructor when you need to use a single {@link ClientManager} | ||
* (for instance, to reuse an MQTT connection). | ||
* @param clientManager The client manager. | ||
* @param topic The topic(s). | ||
*/ | ||
public MqttPahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) { | ||
this(clientManager, new MqttConnectOptions(), topic); | ||
} | ||
|
||
/** | ||
* Set the completion timeout when disconnecting. Not settable using the namespace. | ||
* Default {@value #DISCONNECT_COMPLETION_TIMEOUT} milliseconds. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,6 +109,16 @@ public Mqttv5PahoMessageDrivenChannelAdapter(MqttConnectionOptions connectionOpt | |
} | ||
} | ||
|
||
/** | ||
* Use this constructor when you need to use a single {@link ClientManager} | ||
* (for instance, to reuse an MQTT connection). | ||
* @param clientManager The client manager. | ||
* @param topic The topic(s). | ||
oxcafedead marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
public Mqttv5PahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) { | ||
this(buildDefaultConnectionOptions(null), clientManager, topic); | ||
} | ||
|
||
/** | ||
* Use this constructor when you need to use a single {@link ClientManager} | ||
* (for instance, to reuse an MQTT connection) and a specific {@link MqttConnectionOptions}. | ||
|
@@ -128,16 +138,6 @@ public Mqttv5PahoMessageDrivenChannelAdapter(MqttConnectionOptions connectionOpt | |
} | ||
} | ||
|
||
/** | ||
* Use this constructor when you need to use a single {@link ClientManager} | ||
* (for instance, to reuse an MQTT connection). | ||
* @param clientManager The client manager. | ||
* @param topic The topic(s). | ||
*/ | ||
public Mqttv5PahoMessageDrivenChannelAdapter(ClientManager<IMqttAsyncClient> clientManager, String... topic) { | ||
this(buildDefaultConnectionOptions(null), clientManager, topic); | ||
} | ||
|
||
private static MqttConnectionOptions buildDefaultConnectionOptions(@Nullable String url) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we don't need this method any more. |
||
final MqttConnectionOptions connectionOptions; | ||
connectionOptions = new MqttConnectionOptions(); | ||
|
Uh oh!
There was an error while loading. Please reload this page.