Skip to content

Commit 9e8fe0d

Browse files
Lucas Bowlerartembilan
Lucas Bowler
authored andcommitted
GH-3822: Reconnection for MQTTv5 channel adapters
Fixes: #3822 * Apply spring-framework code style on modified class * Remove unwanted formatting * Take pull request comments into account * Code and JavaDocs clean up * Improve `Mqttv5BackToBackAutomaticReconnectTests` removing non-related code * Improve `mqtt.adoc` for this new manual reconnection feature **Cherry-pick to `5.5.x`**
1 parent 32960fa commit 9e8fe0d

File tree

5 files changed

+198
-45
lines changed

5 files changed

+198
-45
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
*
6767
* @author Artem Bilan
6868
* @author Mikhail Polivakha
69+
* @author Lucas Bowler
6970
*
7071
* @since 5.5.5
7172
*
@@ -162,28 +163,24 @@ protected void onInit() {
162163
@Override
163164
protected void doStart() {
164165
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
165-
this.topicLock.lock();
166-
String[] topics = getTopic();
167166
try {
168167
this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
169-
if (topics.length > 0) {
170-
int[] requestedQos = getQos();
171-
this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout());
172-
String message = "Connected and subscribed to " + Arrays.toString(topics);
173-
logger.debug(message);
174-
if (applicationEventPublisher != null) {
175-
applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
176-
}
177-
}
178168
}
179169
catch (MqttException ex) {
180-
if (applicationEventPublisher != null) {
181-
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
170+
if (this.connectionOptions.isAutomaticReconnect()) {
171+
try {
172+
this.mqttClient.reconnect();
173+
}
174+
catch (MqttException e) {
175+
logger.error(ex, "MQTT client failed to connect. Never happens.");
176+
}
177+
}
178+
else {
179+
if (applicationEventPublisher != null) {
180+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
181+
}
182+
logger.error(ex, "MQTT client failed to connect.");
182183
}
183-
logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics));
184-
}
185-
finally {
186-
this.topicLock.unlock();
187184
}
188185
}
189186

@@ -312,7 +309,31 @@ public void deliveryComplete(IMqttToken token) {
312309

313310
@Override
314311
public void connectComplete(boolean reconnect, String serverURI) {
315-
312+
if (!reconnect) {
313+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
314+
String[] topics = getTopic();
315+
this.topicLock.lock();
316+
try {
317+
if (topics.length > 0) {
318+
int[] requestedQos = getQos();
319+
this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout());
320+
String message = "Connected and subscribed to " + Arrays.toString(topics);
321+
logger.debug(message);
322+
if (applicationEventPublisher != null) {
323+
applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
324+
}
325+
}
326+
}
327+
catch (MqttException ex) {
328+
if (applicationEventPublisher != null) {
329+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
330+
}
331+
logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
332+
}
333+
finally {
334+
this.topicLock.unlock();
335+
}
336+
}
316337
}
317338

318339
@Override

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,13 +50,8 @@
5050
/**
5151
* The {@link AbstractMqttMessageHandler} implementation for MQTT v5.
5252
*
53-
* It is recommended to have the {@link MqttConnectionOptions#setAutomaticReconnect(boolean)}
54-
* set to true to let an internal {@link IMqttAsyncClient} instance to handle reconnects.
55-
* Otherwise, only the manual restart of this component can handle reconnects, e.g. via
56-
* {@link MqttConnectionFailedEvent} handling on disconnection.
57-
*
58-
*
5953
* @author Artem Bilan
54+
* @author Lucas Bowler
6055
*
6156
* @since 5.5.5
6257
*/
@@ -86,11 +81,6 @@ public Mqttv5PahoMessageHandler(String url, String clientId) {
8681
public Mqttv5PahoMessageHandler(MqttConnectionOptions connectionOptions, String clientId) {
8782
super(obtainServerUrlFromOptions(connectionOptions), clientId);
8883
this.connectionOptions = connectionOptions;
89-
if (!this.connectionOptions.isAutomaticReconnect()) {
90-
logger.warn("It is recommended to set 'automaticReconnect' MQTT client option. " +
91-
"Otherwise the current channel adapter restart should be used explicitly, " +
92-
"e.g. via handling 'MqttConnectionFailedEvent' on client disconnection.");
93-
}
9484
}
9585

9686

@@ -165,11 +155,7 @@ protected void doStart() {
165155
this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
166156
}
167157
catch (MqttException ex) {
168-
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
169-
if (applicationEventPublisher != null) {
170-
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
171-
}
172-
logger.error(ex, "MQTT client failed to connect. Will retry if 'ConnectionOptions.isAutomaticReconnect()'.");
158+
logger.error(ex, "MQTT client failed to connect.");
173159
}
174160
}
175161

@@ -249,11 +235,15 @@ else if (payload instanceof String) {
249235
@Override
250236
protected void publish(String topic, Object mqttMessage, Message<?> message) {
251237
Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");
238+
long completionTimeout = getCompletionTimeout();
252239
try {
240+
if (!this.mqttClient.isConnected()) {
241+
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
242+
}
253243
IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage);
254244
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
255245
if (!this.async) {
256-
token.waitForCompletion(getCompletionTimeout()); // NOSONAR (sync)
246+
token.waitForCompletion(completionTimeout); // NOSONAR (sync)
257247
}
258248
else if (this.asyncEvents && applicationEventPublisher != null) {
259249
applicationEventPublisher.publishEvent(

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/BackToBackAdapterTests.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ public void testSingleTopic() {
105105
adapter.setBeanFactory(mock(BeanFactory.class));
106106
adapter.afterPropertiesSet();
107107
adapter.start();
108-
MqttPahoMessageDrivenChannelAdapter inbound = new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(),
109-
"si-test-in", "mqtt-foo");
108+
MqttPahoMessageDrivenChannelAdapter inbound =
109+
new MqttPahoMessageDrivenChannelAdapter(MosquittoContainerTest.mqttUrl(), "si-test-in", "mqtt-foo");
110110
QueueChannel outputChannel = new QueueChannel();
111111
inbound.setOutputChannel(outputChannel);
112112
inbound.setTaskScheduler(taskScheduler);
@@ -467,14 +467,11 @@ public boolean equals(Object obj) {
467467
}
468468
Foo other = (Foo) obj;
469469
if (this.bar == null) {
470-
if (other.bar != null) {
471-
return false;
472-
}
470+
return other.bar == null;
473471
}
474-
else if (!this.bar.equals(other.bar)) {
475-
return false;
472+
else {
473+
return this.bar.equals(other.bar);
476474
}
477-
return true;
478475
}
479476

480477
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
22+
import java.net.UnknownHostException;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
26+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
27+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
28+
import org.eclipse.paho.mqttv5.common.MqttException;
29+
import org.junit.jupiter.api.Test;
30+
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.beans.factory.annotation.Qualifier;
33+
import org.springframework.context.annotation.Bean;
34+
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.context.event.EventListener;
36+
import org.springframework.integration.config.EnableIntegration;
37+
import org.springframework.integration.dsl.IntegrationFlow;
38+
import org.springframework.integration.dsl.IntegrationFlows;
39+
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
40+
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
41+
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
42+
import org.springframework.integration.mqtt.support.MqttHeaders;
43+
import org.springframework.integration.support.MessageBuilder;
44+
import org.springframework.messaging.Message;
45+
import org.springframework.messaging.MessageChannel;
46+
import org.springframework.messaging.MessageHandlingException;
47+
import org.springframework.messaging.PollableChannel;
48+
import org.springframework.test.annotation.DirtiesContext;
49+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
50+
51+
52+
/**
53+
* @author Lucas Bowler
54+
* @author Artem Bilan
55+
*
56+
* @since 5.5.13
57+
*/
58+
@SpringJUnitConfig
59+
@DirtiesContext
60+
public class Mqttv5BackToBackAutomaticReconnectTests implements MosquittoContainerTest {
61+
62+
@Autowired
63+
@Qualifier("mqttOutFlow.input")
64+
private MessageChannel mqttOutFlowInput;
65+
66+
@Autowired
67+
private PollableChannel fromMqttChannel;
68+
69+
@Autowired
70+
private MqttConnectionOptions connectionOptions;
71+
72+
@Autowired
73+
Config config;
74+
75+
@Test
76+
public void testReconnectionWhenFirstConnectionFails() throws InterruptedException {
77+
Message<String> testMessage =
78+
MessageBuilder.withPayload("testPayload")
79+
.setHeader(MqttHeaders.TOPIC, "siTest")
80+
.build();
81+
82+
assertThatExceptionOfType(MessageHandlingException.class)
83+
.isThrownBy(() -> this.mqttOutFlowInput.send(testMessage))
84+
.withCauseExactlyInstanceOf(MqttException.class)
85+
.withRootCauseExactlyInstanceOf(UnknownHostException.class);
86+
87+
connectionOptions.setServerURIs(new String[]{ MosquittoContainerTest.mqttUrl() });
88+
89+
assertThat(this.config.subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
90+
91+
this.mqttOutFlowInput.send(testMessage);
92+
93+
Message<?> receive = this.fromMqttChannel.receive(10_000);
94+
95+
assertThat(receive).isNotNull();
96+
}
97+
98+
99+
@Configuration
100+
@EnableIntegration
101+
public static class Config {
102+
103+
CountDownLatch subscribeLatch = new CountDownLatch(1);
104+
105+
@Bean
106+
public MqttConnectionOptions mqttConnectOptions() {
107+
return new MqttConnectionOptionsBuilder()
108+
.serverURI("wss://badMqttUrl")
109+
.automaticReconnect(true)
110+
.connectionTimeout(1)
111+
.build();
112+
}
113+
114+
@Bean
115+
public IntegrationFlow mqttOutFlow() {
116+
Mqttv5PahoMessageHandler messageHandler =
117+
new Mqttv5PahoMessageHandler(mqttConnectOptions(), "mqttv5SIout");
118+
119+
return f -> f.handle(messageHandler);
120+
}
121+
122+
123+
@Bean
124+
public IntegrationFlow mqttInFlow() {
125+
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
126+
new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions(), "mqttv5SIin", "siTest");
127+
messageProducer.setPayloadType(String.class);
128+
129+
return IntegrationFlows.from(messageProducer)
130+
.channel(c -> c.queue("fromMqttChannel"))
131+
.get();
132+
}
133+
134+
@EventListener(MqttSubscribedEvent.class)
135+
void mqttEvents() {
136+
this.subscribeLatch.countDown();
137+
}
138+
139+
}
140+
141+
}

src/reference/asciidoc/mqtt.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,10 @@ public IntegrationFlow mqttOutFlow() {
461461

462462
IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverter` cannot be used with the `Mqttv5PahoMessageHandler` since its contract is aimed only for the MQTT v3 protocol.
463463

464+
If connection fails on start up or at runtime, the `Mqttv5PahoMessageHandler` tries to reconnect on the next message produced to this handler.
465+
If this manual reconnection fails, the connection is exception is thrown back to the caller.
466+
In this case the standard Spring Integration error handling procedure is applied, including request handler advices, e.g. retry or circuit breaker.
467+
464468
See more information in the `Mqttv5PahoMessageHandler` javadocs and its superclass.
465469

466470
The inbound channel adapter for the MQTT v5 protocol is present as an `Mqttv5PahoMessageDrivenChannelAdapter`.
@@ -497,4 +501,4 @@ IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverte
497501
See more information in the `Mqttv5PahoMessageDrivenChannelAdapter` javadocs and its superclass.
498502

499503
IMPORTANT: It is recommended to have the `MqttConnectionOptions#setAutomaticReconnect(boolean)` set to true to let an internal `IMqttAsyncClient` instance to handle reconnects.
500-
Otherwise, only the manual restart of these channel adapters can handle reconnects, e.g. via `MqttConnectionFailedEvent` handling on disconnection.
504+
Otherwise, only the manual restart of `Mqttv5PahoMessageDrivenChannelAdapter` can handle reconnects, e.g. via `MqttConnectionFailedEvent` handling on disconnection.

0 commit comments

Comments
 (0)