Skip to content

GH-3685: Share MQTT connection across components #3857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
builder.addPropertyReference("outputChannel", channelName);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "qos");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "recovery-interval");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "manual-acks");

return builder.getBeanDefinition();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.util.Assert;

/**
* @param <T> MQTT client type
* @param <C> MQTT connection options type (v5 or v3)
*
* @author Artem Vozhdayenko
*
* @since 6.0
*/
public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T, C>, ApplicationEventPublisherAware {

protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR

private final Set<ConnectCallback> connectCallbacks;

private final String clientId;

private boolean manualAcks;

private ApplicationEventPublisher applicationEventPublisher;

private String url;

private volatile T client;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just another nit-pick: volatile props go after all others 😄


private String beanName;

AbstractMqttClientManager(String clientId) {
Assert.notNull(clientId, "'clientId' is required");
this.clientId = clientId;
this.connectCallbacks = Collections.synchronizedSet(new HashSet<>());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that I have commented before that this initialization could be done directly on the property declaration.

}

protected void setManualAcks(boolean manualAcks) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that this one has to be public

this.manualAcks = manualAcks;
}

protected String getUrl() {
return this.url;
}

protected void setUrl(String url) {
this.url = url;
}

protected String getClientId() {
return this.clientId;
}

protected ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}

protected synchronized void setClient(T client) {
this.client = client;
}

protected Set<ConnectCallback> getCallbacks() {
return Collections.unmodifiableSet(this.connectCallbacks);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is protected, so unlikely going to be used for some outside harm.
Therefore I don't see a reason in extra wrapping to prevent modification.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the defense if someone decides to implement own client manager incorrectly, but I will remove it, no problem

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to implement and to use is not the same and there is likely less harm when it is an internal logic than some end-user API.

}

@Override
public boolean isManualAcks() {
return this.manualAcks;
}

@Override
public T getClient() {
return this.client;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
Assert.notNull(applicationEventPublisher, "'applicationEventPublisher' cannot be null");
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
public void setBeanName(String name) {
this.beanName = name;
}

@Override
public String getBeanName() {
return this.beanName;
}

/**
* The phase of component autostart in {@link SmartLifecycle}.
* If the custom one is required, note that for the correct behavior it should be less than phase of
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
* @return {@link SmartLifecycle} autostart phase
*/
@Override
public int getPhase() {
return 0;
}

@Override
public void addCallback(ConnectCallback connectCallback) {
this.connectCallbacks.add(connectCallback);
}

@Override
public boolean removeCallback(ConnectCallback connectCallback) {
return this.connectCallbacks.remove(connectCallback);
}

public synchronized boolean isRunning() {
return this.client != null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import org.springframework.context.SmartLifecycle;

/**
* A utility abstraction over MQTT client which can be used in any MQTT-related component
* without need to handle generic client callbacks, reconnects etc.
* Using this manager in multiple MQTT integrations will preserve a single connection.
*
* @param <T> MQTT client type
* @param <C> MQTT connection options type (v5 or v3)
*
* @author Artem Vozhdayenko
*
* @since 6.0
*/
public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {

T getClient();

boolean isManualAcks();

void addCallback(ConnectCallback connectCallback);

boolean removeCallback(ConnectCallback connectCallback);

/**
* A contract for a custom callback if needed by a usage.
*/
interface ConnectCallback {

void connectComplete(boolean isReconnect);

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.util.Assert;

/**
* @author Artem Vozhdayenko
* @since 6.0
*/
public class Mqttv3ClientManager extends AbstractMqttClientManager<IMqttAsyncClient, MqttConnectOptions>
implements MqttCallbackExtended {

private final MqttPahoClientFactory clientFactory;

public Mqttv3ClientManager(String url, String clientId) {
this(buildDefaultClientFactory(url), clientId);
}

public Mqttv3ClientManager(MqttPahoClientFactory clientFactory, String clientId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we still need to rely on this factory for this our new ClientManager abstraction.
How about just to make this v3 version similar to v5 - based on the MqttConnectionOptions.
And perhaps we need to add one more MqttClientPersistence option to these managers impls.
A setter should be OK.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, initially if I remember correctly you asked to implement client manager using existing client factory abstraction 😄

super(clientId);
Assert.notNull(clientFactory, "'clientFactory' is required");
this.clientFactory = clientFactory;
MqttConnectOptions connectionOptions = clientFactory.getConnectionOptions();
String[] serverURIs = connectionOptions.getServerURIs();
Assert.notEmpty(serverURIs, "'serverURIs' must be provided in the 'MqttConnectionOptions'");
setUrl(serverURIs[0]);
if (!connectionOptions.isAutomaticReconnect()) {
logger.info("If this `ClientManager` is used from message-driven channel adapters, " +
"it is recommended to set 'automaticReconnect' MQTT connection option. " +
"Otherwise connection check and reconnect should be done manually.");
}
}

private static MqttPahoClientFactory buildDefaultClientFactory(String url) {
Assert.notNull(url, "'url' is required");
MqttConnectOptions connectOptions = new MqttConnectOptions();
connectOptions.setServerURIs(new String[]{ url });
connectOptions.setAutomaticReconnect(true);
DefaultMqttPahoClientFactory defaultFactory = new DefaultMqttPahoClientFactory();
defaultFactory.setConnectionOptions(connectOptions);
return defaultFactory;
}

@Override
public synchronized void start() {
if (getClient() == null) {
try {
var client = this.clientFactory.getAsyncClientInstance(getUrl(), getClientId());
client.setManualAcks(isManualAcks());
client.setCallback(this);
setClient(client);
}
catch (MqttException e) {
throw new IllegalStateException("could not start client manager", e);
}
}
try {
MqttConnectOptions options = this.clientFactory.getConnectionOptions();
getClient().connect(options).waitForCompletion(options.getConnectionTimeout());
}
catch (MqttException e) {
logger.error("could not start client manager, client_id=" + getClientId(), e);

var applicationEventPublisher = getApplicationEventPublisher();
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, e));
}
}
}

@Override
public synchronized void stop() {
var client = getClient();
if (client == null) {
return;
}
try {
client.disconnectForcibly(this.clientFactory.getConnectionOptions().getConnectionTimeout());
}
catch (MqttException e) {
logger.error("could not disconnect from the client", e);
}
finally {
try {
client.close();
}
catch (MqttException e) {
logger.error("could not close the client", e);
}
setClient(null);
}
}

@Override
public synchronized void connectionLost(Throwable cause) {
logger.error("connection lost, client_id=" + getClientId(), cause);
}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
getCallbacks().forEach(callback -> callback.connectComplete(reconnect));
}

@Override
public void messageArrived(String topic, MqttMessage message) {
// not this manager concern
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// nor this manager concern
}

@Override
public MqttConnectOptions getConnectionInfo() {
return this.clientFactory.getConnectionOptions();
}
}
Loading