-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH-3685: Share MQTT connection across components #3857
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
3251fd2
d9a0474
9f47cc8
d338577
4cc094f
f3fba7d
f595998
b5da36b
318fd82
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/* | ||
* Copyright 2022-2022 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.mqtt.core; | ||
|
||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
|
||
import org.apache.commons.logging.Log; | ||
import org.apache.commons.logging.LogFactory; | ||
|
||
import org.springframework.context.ApplicationEventPublisher; | ||
import org.springframework.context.ApplicationEventPublisherAware; | ||
import org.springframework.context.SmartLifecycle; | ||
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter; | ||
import org.springframework.util.Assert; | ||
|
||
/** | ||
* @param <T> MQTT client type | ||
* @param <C> MQTT connection options type (v5 or v3) | ||
* | ||
* @author Artem Vozhdayenko | ||
* | ||
* @since 6.0 | ||
*/ | ||
public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T, C>, ApplicationEventPublisherAware { | ||
|
||
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR | ||
|
||
private final Set<ConnectCallback> connectCallbacks; | ||
|
||
private final String clientId; | ||
|
||
private boolean manualAcks; | ||
|
||
private ApplicationEventPublisher applicationEventPublisher; | ||
|
||
private String url; | ||
|
||
private volatile T client; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just another nit-pick: |
||
|
||
private String beanName; | ||
|
||
AbstractMqttClientManager(String clientId) { | ||
Assert.notNull(clientId, "'clientId' is required"); | ||
this.clientId = clientId; | ||
this.connectCallbacks = Collections.synchronizedSet(new HashSet<>()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought that I have commented before that this initialization could be done directly on the property declaration. |
||
} | ||
|
||
protected void setManualAcks(boolean manualAcks) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe that this one has to be |
||
this.manualAcks = manualAcks; | ||
} | ||
|
||
protected String getUrl() { | ||
return this.url; | ||
} | ||
|
||
protected void setUrl(String url) { | ||
this.url = url; | ||
oxcafedead marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
protected String getClientId() { | ||
return this.clientId; | ||
} | ||
|
||
protected ApplicationEventPublisher getApplicationEventPublisher() { | ||
return this.applicationEventPublisher; | ||
} | ||
|
||
protected synchronized void setClient(T client) { | ||
this.client = client; | ||
} | ||
|
||
protected Set<ConnectCallback> getCallbacks() { | ||
return Collections.unmodifiableSet(this.connectCallbacks); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The method is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just the defense if someone decides to implement own client manager incorrectly, but I will remove it, no problem There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
@Override | ||
public boolean isManualAcks() { | ||
return this.manualAcks; | ||
} | ||
|
||
@Override | ||
public T getClient() { | ||
return this.client; | ||
} | ||
|
||
@Override | ||
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { | ||
Assert.notNull(applicationEventPublisher, "'applicationEventPublisher' cannot be null"); | ||
this.applicationEventPublisher = applicationEventPublisher; | ||
} | ||
|
||
@Override | ||
public void setBeanName(String name) { | ||
this.beanName = name; | ||
} | ||
|
||
@Override | ||
public String getBeanName() { | ||
return this.beanName; | ||
} | ||
|
||
/** | ||
* The phase of component autostart in {@link SmartLifecycle}. | ||
* If the custom one is required, note that for the correct behavior it should be less than phase of | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations. | ||
* @return {@link SmartLifecycle} autostart phase | ||
*/ | ||
@Override | ||
public int getPhase() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public void addCallback(ConnectCallback connectCallback) { | ||
this.connectCallbacks.add(connectCallback); | ||
} | ||
|
||
@Override | ||
public boolean removeCallback(ConnectCallback connectCallback) { | ||
return this.connectCallbacks.remove(connectCallback); | ||
} | ||
|
||
public synchronized boolean isRunning() { | ||
return this.client != null; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* | ||
* Copyright 2022-2022 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.mqtt.core; | ||
|
||
import org.springframework.context.SmartLifecycle; | ||
|
||
/** | ||
* A utility abstraction over MQTT client which can be used in any MQTT-related component | ||
* without need to handle generic client callbacks, reconnects etc. | ||
* Using this manager in multiple MQTT integrations will preserve a single connection. | ||
* | ||
* @param <T> MQTT client type | ||
* @param <C> MQTT connection options type (v5 or v3) | ||
* | ||
* @author Artem Vozhdayenko | ||
* | ||
* @since 6.0 | ||
*/ | ||
public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> { | ||
|
||
T getClient(); | ||
|
||
boolean isManualAcks(); | ||
|
||
void addCallback(ConnectCallback connectCallback); | ||
|
||
boolean removeCallback(ConnectCallback connectCallback); | ||
|
||
/** | ||
* A contract for a custom callback if needed by a usage. | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*/ | ||
interface ConnectCallback { | ||
|
||
void connectComplete(boolean isReconnect); | ||
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
/* | ||
* Copyright 2022-2022 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.integration.mqtt.core; | ||
|
||
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; | ||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; | ||
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; | ||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; | ||
import org.eclipse.paho.client.mqttv3.MqttException; | ||
import org.eclipse.paho.client.mqttv3.MqttMessage; | ||
|
||
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent; | ||
import org.springframework.util.Assert; | ||
|
||
/** | ||
* @author Artem Vozhdayenko | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* @since 6.0 | ||
*/ | ||
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient, MqttConnectOptions> | ||
implements MqttCallbackExtended { | ||
|
||
private final MqttPahoClientFactory clientFactory; | ||
|
||
public Mqttv3ClientManager(String url, String clientId) { | ||
this(buildDefaultClientFactory(url), clientId); | ||
} | ||
|
||
public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we still need to rely on this factory for this our new There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, initially if I remember correctly you asked to implement client manager using existing client factory abstraction 😄 |
||
super(clientId); | ||
Assert.notNull(clientFactory, "'clientFactory' is required"); | ||
this.clientFactory = clientFactory; | ||
MqttConnectOptions connectionOptions = clientFactory.getConnectionOptions(); | ||
String[] serverURIs = connectionOptions.getServerURIs(); | ||
Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'"); | ||
setUrl(serverURIs[0]); | ||
if (!connectionOptions.isAutomaticReconnect()) { | ||
logger.info("If this `ClientManager` is used from message-driven channel adapters, " + | ||
"it is recommended to set 'automaticReconnect' MQTT connection option. " + | ||
"Otherwise connection check and reconnect should be done manually."); | ||
} | ||
} | ||
|
||
private static MqttPahoClientFactory buildDefaultClientFactory(String url) { | ||
Assert.notNull(url, "'url' is required"); | ||
MqttConnectOptions connectOptions = new MqttConnectOptions(); | ||
connectOptions.setServerURIs(new String[]{ url }); | ||
connectOptions.setAutomaticReconnect(true); | ||
DefaultMqttPahoClientFactory defaultFactory = new DefaultMqttPahoClientFactory(); | ||
defaultFactory.setConnectionOptions(connectOptions); | ||
return defaultFactory; | ||
} | ||
|
||
@Override | ||
public synchronized void start() { | ||
if (getClient() == null) { | ||
try { | ||
var client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId()); | ||
client.setManualAcks(isManualAcks()); | ||
client.setCallback(this); | ||
setClient(client); | ||
} | ||
catch (MqttException e) { | ||
throw new IllegalStateException("could not start client manager", e); | ||
} | ||
} | ||
try { | ||
MqttConnectOptions options = this.clientFactory.getConnectionOptions(); | ||
getClient().connect(options).waitForCompletion(options.getConnectionTimeout()); | ||
} | ||
catch (MqttException e) { | ||
logger.error("could not start client manager, client_id=" + getClientId(), e); | ||
|
||
var applicationEventPublisher = getApplicationEventPublisher(); | ||
if (applicationEventPublisher != null) { | ||
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e)); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized void stop() { | ||
var client = getClient(); | ||
if (client == null) { | ||
return; | ||
} | ||
try { | ||
client.disconnectForcibly(this.clientFactory.getConnectionOptions().getConnectionTimeout()); | ||
} | ||
catch (MqttException e) { | ||
logger.error("could not disconnect from the client", e); | ||
} | ||
finally { | ||
try { | ||
client.close(); | ||
} | ||
catch (MqttException e) { | ||
logger.error("could not close the client", e); | ||
} | ||
setClient(null); | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized void connectionLost(Throwable cause) { | ||
logger.error("connection lost, client_id=" + getClientId(), cause); | ||
} | ||
|
||
@Override | ||
public void connectComplete(boolean reconnect, String serverURI) { | ||
getCallbacks().forEach(callback -> callback.connectComplete(reconnect)); | ||
} | ||
|
||
@Override | ||
public void messageArrived(String topic, MqttMessage message) { | ||
// not this manager concern | ||
} | ||
|
||
@Override | ||
public void deliveryComplete(IMqttDeliveryToken token) { | ||
// nor this manager concern | ||
} | ||
|
||
@Override | ||
public MqttConnectOptions getConnectionInfo() { | ||
return this.clientFactory.getConnectionOptions(); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.