Skip to content

GH-3685: Share MQTT connection across components #3857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
builder.addPropertyReference("outputChannel", channelName);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "qos");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "recovery-interval");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "manual-acks");

return builder.getBeanDefinition();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
import org.springframework.util.Assert;

/**
* Abstract class for MQTT client managers which can be a base for any common v3/v5 client manager implementation.
* Contains some basic utility and implementation-agnostic fields and methods.
*
* @param <T> MQTT client type
* @param <C> MQTT connection options type (v5 or v3)
* @param <P> MQTT client persistence type (for v5 or v3)
*
* @author Artem Vozhdayenko
*
* @since 6.0
*/
public abstract class AbstractMqttClientManager<T, C, P> implements ClientManager<T, C>, ApplicationEventPublisherAware {

protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR

private static final int DEFAULT_MANAGER_PHASE = 0;

private final Set<ConnectCallback> connectCallbacks = Collections.synchronizedSet(new HashSet<>());

private final String clientId;

private int phase = DEFAULT_MANAGER_PHASE;

private boolean manualAcks;

private ApplicationEventPublisher applicationEventPublisher;

private P persistence;

private String url;

private String beanName;

private volatile T client;

AbstractMqttClientManager(String clientId) {
Assert.notNull(clientId, "'clientId' is required");
this.clientId = clientId;
}

protected void setManualAcks(boolean manualAcks) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that this one has to be public

this.manualAcks = manualAcks;
}

protected String getUrl() {
return this.url;
}

protected void setUrl(String url) {
this.url = url;
}

protected String getClientId() {
return this.clientId;
}

protected ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}

protected synchronized void setClient(T client) {
this.client = client;
}

protected P getPersistence() {
return this.persistence;
}

/**
* Set client persistence if some specific impl is required for topics QoS.
* @param persistence persistence implementation to use for te client
*/
public void setPersistence(P persistence) {
this.persistence = persistence;
}

protected Set<ConnectCallback> getCallbacks() {
return this.connectCallbacks;
}

@Override
public boolean isManualAcks() {
return this.manualAcks;
}

@Override
public T getClient() {
return this.client;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
Assert.notNull(applicationEventPublisher, "'applicationEventPublisher' cannot be null");
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
public void setBeanName(String name) {
this.beanName = name;
}

@Override
public String getBeanName() {
return this.beanName;
}

/**
* The phase of component autostart in {@link SmartLifecycle}.
* If the custom one is required, note that for the correct behavior it should be less than phase of
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
* The default phase is {@link #DEFAULT_MANAGER_PHASE}.
* @return {@link SmartLifecycle} autostart phase
* @see #setPhase
*/
@Override
public int getPhase() {
return this.phase;
}

@Override
public void addCallback(ConnectCallback connectCallback) {
this.connectCallbacks.add(connectCallback);
}

@Override
public boolean removeCallback(ConnectCallback connectCallback) {
return this.connectCallbacks.remove(connectCallback);
}

public synchronized boolean isRunning() {
return this.client != null;
}

/**
* Set the phase of component autostart in {@link SmartLifecycle}.
* If the custom one is required, note that for the correct behavior it should be less than phase of
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
* @see #getPhase
*/
public void setPhase(int phase) {
this.phase = phase;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.mqtt.core;

import org.springframework.context.SmartLifecycle;

/**
* A utility abstraction over MQTT client which can be used in any MQTT-related component
* without need to handle generic client callbacks, reconnects etc.
* Using this manager in multiple MQTT integrations will preserve a single connection.
*
* @param <T> MQTT client type
* @param <C> MQTT connection options type (v5 or v3)
*
* @author Artem Vozhdayenko
*
* @since 6.0
*/
public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {

T getClient();

boolean isManualAcks();

void addCallback(ConnectCallback connectCallback);

boolean removeCallback(ConnectCallback connectCallback);

/**
* A contract for a custom callback if needed by a usage.
*
* @see org.eclipse.paho.mqttv5.client.MqttCallback#connectComplete
* @see org.eclipse.paho.client.mqttv3.MqttCallbackExtended#connectComplete
*/
interface ConnectCallback {

void connectComplete(boolean isReconnect);

}

}
Loading