Skip to content

Commit 5f12729

Browse files
oxcafedeadartembilan
authored andcommitted
GH-3685: Share MQTT connection across components
Fixes #3685 Introduce some initial design. Add a new interface `ClientManager` which will manage clients and connections. Use this manager in v3 topic adapter and message handler. Add a new interface `ClientManager` which will manage clients and connections. Add different implementations for v3 and v5 MQTT clients. Use this manager in v3/v5 topic adapters and message handlers. Add a couple of unit/integration tests to cover client manager usage. Several small code improvements after the code review: * Improve client manager usage via providing several mutual exclusive constructors, whether the users provides `url` or `connectionOptions` or `clientFactory` for v3. * Move the logger to `AbstractMqttClientManager` * Do not inject TaskScheduler in constructor for v3 client manager but use lazy init via `BeanFactory` and `IntegrationContextUtils` * Other smaller code readability improvements Add new tests with reconnect cases. Other code improvements after the code review: * Adjust javadocs according to standards * Remove `setClientManager` and use exclusive ctors * Make automatic reconnects using the v3 client instead of manually using task scheduler Some fixes and improvements after another code review iteration: * Rearrange the code according to the code style guides * Move client instance to `AbstractClientManager` with `isRunning` method * Fix abstract adapter/handler fields visibility and `final`ize them where we can * Send application event if automatic reconnect is not enabled for the client manager Other fixes and improvements after code review: * Changes around fields, methods, ctors visibility * Removed contradictory ctors * Reduce amount of unnecessary `getClientManager() != null` checks in logic and make it as similar as possible for client manager and the old approach * Use auto-reconnect where possible * Remove manual reconnect trigger and rely on events instead to know where to subscribe * Do not close the connection in adapter to be able to use reconnect logic without lose of subscriptions * Make `ClientManager` extend `MqttComponent` so that it knows about connection options as part of its contract * Remove not relevant auto test cases (relying on connection close or manual reconnect) * Other code style smaller changes Other fixes and improvements after code review: * Get manual `reconnect` invocation back for v3/v5 adapters and client managers (see bug GH-3822 for a reasoning) * Remove unnecessary getters/setter for a listener and use adapter class as listener instead * Optimize MessageListener: remove redundant inner class and use a single method reference instead of N instances per each subscribe * Javadocs improvements * Add Javadocs to abstract client manager * Extract common callback add/rm logic to abstract adapter class * Small code cleanups/fixes related to code style & simplicity, ctor inits and unnecessary methods; eliminate unnecessary logs noise * Remove `@LongRunningTest` for `ClientManagerBackToBackTests` as test run time is ~6-7 secs * Remove client factory as dependency for v3 client manager and use plain connection properties and client persistence instead * Add missed javadocs * Other code style & cleanup improvements * More code cleanup * More Javadocs
1 parent 9996777 commit 5f12729

16 files changed

+1364
-416
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/config/xml/MqttMessageDrivenChannelAdapterParser.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,7 +48,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
4848
builder.addPropertyReference("outputChannel", channelName);
4949
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
5050
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "qos");
51-
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "recovery-interval");
5251
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "manual-acks");
5352

5453
return builder.getBeanDefinition();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright 2022-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.core;
18+
19+
import java.util.Collections;
20+
import java.util.HashSet;
21+
import java.util.Set;
22+
23+
import org.apache.commons.logging.Log;
24+
import org.apache.commons.logging.LogFactory;
25+
26+
import org.springframework.context.ApplicationEventPublisher;
27+
import org.springframework.context.ApplicationEventPublisherAware;
28+
import org.springframework.context.SmartLifecycle;
29+
import org.springframework.integration.mqtt.inbound.AbstractMqttMessageDrivenChannelAdapter;
30+
import org.springframework.util.Assert;
31+
32+
/**
33+
* Abstract class for MQTT client managers which can be a base for any common v3/v5 client manager implementation.
34+
* Contains some basic utility and implementation-agnostic fields and methods.
35+
*
36+
* @param <T> MQTT client type
37+
* @param <C> MQTT connection options type (v5 or v3)
38+
*
39+
* @author Artem Vozhdayenko
40+
* @author Artem Bilan
41+
*
42+
* @since 6.0
43+
*/
44+
public abstract class AbstractMqttClientManager<T, C> implements ClientManager<T, C>, ApplicationEventPublisherAware {
45+
46+
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR
47+
48+
private static final int DEFAULT_MANAGER_PHASE = 0;
49+
50+
private final Set<ConnectCallback> connectCallbacks = Collections.synchronizedSet(new HashSet<>());
51+
52+
private final String clientId;
53+
54+
private int phase = DEFAULT_MANAGER_PHASE;
55+
56+
private boolean manualAcks;
57+
58+
private ApplicationEventPublisher applicationEventPublisher;
59+
60+
private String url;
61+
62+
private String beanName;
63+
64+
private volatile T client;
65+
66+
protected AbstractMqttClientManager(String clientId) {
67+
Assert.notNull(clientId, "'clientId' is required");
68+
this.clientId = clientId;
69+
}
70+
71+
public void setManualAcks(boolean manualAcks) {
72+
this.manualAcks = manualAcks;
73+
}
74+
75+
protected String getUrl() {
76+
return this.url;
77+
}
78+
79+
protected void setUrl(String url) {
80+
this.url = url;
81+
}
82+
83+
protected String getClientId() {
84+
return this.clientId;
85+
}
86+
87+
protected ApplicationEventPublisher getApplicationEventPublisher() {
88+
return this.applicationEventPublisher;
89+
}
90+
91+
protected synchronized void setClient(T client) {
92+
this.client = client;
93+
}
94+
95+
protected Set<ConnectCallback> getCallbacks() {
96+
return this.connectCallbacks;
97+
}
98+
99+
@Override
100+
public boolean isManualAcks() {
101+
return this.manualAcks;
102+
}
103+
104+
@Override
105+
public T getClient() {
106+
return this.client;
107+
}
108+
109+
@Override
110+
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
111+
Assert.notNull(applicationEventPublisher, "'applicationEventPublisher' cannot be null");
112+
this.applicationEventPublisher = applicationEventPublisher;
113+
}
114+
115+
@Override
116+
public void setBeanName(String name) {
117+
this.beanName = name;
118+
}
119+
120+
@Override
121+
public String getBeanName() {
122+
return this.beanName;
123+
}
124+
125+
/**
126+
* The phase of component autostart in {@link SmartLifecycle}.
127+
* If the custom one is required, note that for the correct behavior it should be less than phase of
128+
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
129+
* The default phase is {@link #DEFAULT_MANAGER_PHASE}.
130+
* @return {@link SmartLifecycle} autostart phase
131+
* @see #setPhase
132+
*/
133+
@Override
134+
public int getPhase() {
135+
return this.phase;
136+
}
137+
138+
@Override
139+
public void addCallback(ConnectCallback connectCallback) {
140+
this.connectCallbacks.add(connectCallback);
141+
}
142+
143+
@Override
144+
public boolean removeCallback(ConnectCallback connectCallback) {
145+
return this.connectCallbacks.remove(connectCallback);
146+
}
147+
148+
public synchronized boolean isRunning() {
149+
return this.client != null;
150+
}
151+
152+
/**
153+
* Set the phase of component autostart in {@link SmartLifecycle}.
154+
* If the custom one is required, note that for the correct behavior it should be less than phase of
155+
* {@link AbstractMqttMessageDrivenChannelAdapter} implementations.
156+
* @see #getPhase
157+
*/
158+
public void setPhase(int phase) {
159+
this.phase = phase;
160+
}
161+
162+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2022-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.core;
18+
19+
import org.springframework.context.SmartLifecycle;
20+
21+
/**
22+
* A utility abstraction over MQTT client which can be used in any MQTT-related component
23+
* without need to handle generic client callbacks, reconnects etc.
24+
* Using this manager in multiple MQTT integrations will preserve a single connection.
25+
*
26+
* @param <T> MQTT client type
27+
* @param <C> MQTT connection options type (v5 or v3)
28+
*
29+
* @author Artem Vozhdayenko
30+
* @author Artem Bilan
31+
*
32+
* @since 6.0
33+
*/
34+
public interface ClientManager<T, C> extends SmartLifecycle, MqttComponent<C> {
35+
36+
/**
37+
* Return the managed client.
38+
* @return the managed client.
39+
*/
40+
T getClient();
41+
42+
/**
43+
* If manual acknowledge has to be used; false by default.
44+
* @return true if manual acknowledge has to be used.
45+
*/
46+
boolean isManualAcks();
47+
48+
/**
49+
* Register a callback for the {@code connectComplete} event from the client.
50+
* @param connectCallback a {@link ConnectCallback} to register.
51+
*/
52+
void addCallback(ConnectCallback connectCallback);
53+
54+
/**
55+
* Remove the callback from registration.
56+
* @param connectCallback a {@link ConnectCallback} to unregister.
57+
* @return true if callback was removed.
58+
*/
59+
boolean removeCallback(ConnectCallback connectCallback);
60+
61+
/**
62+
* A contract for a custom callback on {@code connectComplete} event from the client.
63+
*
64+
* @see org.eclipse.paho.mqttv5.client.MqttCallback#connectComplete
65+
* @see org.eclipse.paho.client.mqttv3.MqttCallbackExtended#connectComplete
66+
*/
67+
@FunctionalInterface
68+
interface ConnectCallback {
69+
70+
/**
71+
* Called when the connection to the server is completed successfully.
72+
* @param isReconnect if true, the connection was the result of automatic reconnect.
73+
*/
74+
void connectComplete(boolean isReconnect);
75+
76+
}
77+
78+
}

0 commit comments

Comments
 (0)