Skip to content

Commit 452d413

Browse files
author
Lucas Bowler
committed
spring-projectsGH-3822 Allow MQTTv5 reconnection, even when first reconnection fails
1 parent cf6ce96 commit 452d413

File tree

3 files changed

+244
-22
lines changed

3 files changed

+244
-22
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
*
6767
* @author Artem Bilan
6868
* @author Mikhail Polivakha
69+
* @author Lucas Bowler
6970
*
7071
* @since 5.5.5
7172
*
@@ -162,29 +163,22 @@ protected void onInit() {
162163
@Override
163164
protected void doStart() {
164165
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
165-
this.topicLock.lock();
166-
String[] topics = getTopic();
167166
try {
168167
this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
169-
if (topics.length > 0) {
170-
int[] requestedQos = getQos();
171-
this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout());
172-
String message = "Connected and subscribed to " + Arrays.toString(topics);
173-
logger.debug(message);
168+
} catch (MqttException ex) {
169+
if (this.connectionOptions.isAutomaticReconnect()) {
170+
try {
171+
this.mqttClient.reconnect();
172+
} catch (MqttException e) {
173+
logger.error(ex, "MQTT client failed to connect. Will retry.");
174+
}
175+
} else {
174176
if (applicationEventPublisher != null) {
175-
applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
177+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
176178
}
179+
logger.error(ex, "MQTT client failed to connect.");
177180
}
178181
}
179-
catch (MqttException ex) {
180-
if (applicationEventPublisher != null) {
181-
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
182-
}
183-
logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics));
184-
}
185-
finally {
186-
this.topicLock.unlock();
187-
}
188182
}
189183

190184
@Override
@@ -312,7 +306,31 @@ public void deliveryComplete(IMqttToken token) {
312306

313307
@Override
314308
public void connectComplete(boolean reconnect, String serverURI) {
315-
309+
if (!reconnect) {
310+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
311+
String[] topics = getTopic();
312+
this.topicLock.lock();
313+
try {
314+
if (topics.length > 0) {
315+
int[] requestedQos = getQos();
316+
this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout());
317+
String message = "Connected and subscribed to " + Arrays.toString(topics);
318+
logger.debug(message);
319+
if (applicationEventPublisher != null) {
320+
applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
321+
}
322+
}
323+
}
324+
catch (MqttException ex) {
325+
if (applicationEventPublisher != null) {
326+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
327+
}
328+
logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
329+
}
330+
finally {
331+
this.topicLock.unlock();
332+
}
333+
}
316334
}
317335

318336
@Override

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
*
5858
*
5959
* @author Artem Bilan
60+
* @author Lucas Bowler
6061
*
6162
* @since 5.5.5
6263
*/
@@ -165,11 +166,20 @@ protected void doStart() {
165166
this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
166167
}
167168
catch (MqttException ex) {
168-
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
169-
if (applicationEventPublisher != null) {
170-
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
169+
if (this.connectionOptions.isAutomaticReconnect()) {
170+
try {
171+
this.mqttClient.reconnect();
172+
}
173+
catch (MqttException e) {
174+
logger.error(ex, "MQTT client failed to connect. Will retry.");
175+
}
176+
} else {
177+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
178+
if (applicationEventPublisher != null) {
179+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
180+
}
181+
logger.error(ex, "MQTT client failed to connect.");
171182
}
172-
logger.error(ex, "MQTT client failed to connect. Will retry if 'ConnectionOptions.isAutomaticReconnect()'.");
173183
}
174184
}
175185

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt;
18+
19+
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
20+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
21+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
22+
import org.jetbrains.annotations.NotNull;
23+
import org.junit.jupiter.api.Assertions;
24+
import org.junit.jupiter.api.Test;
25+
import org.springframework.beans.factory.annotation.Autowired;
26+
import org.springframework.beans.factory.annotation.Qualifier;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.context.event.EventListener;
30+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
31+
import org.springframework.integration.acks.SimpleAcknowledgment;
32+
import org.springframework.integration.config.EnableIntegration;
33+
import org.springframework.integration.dsl.IntegrationFlow;
34+
import org.springframework.integration.dsl.IntegrationFlows;
35+
import org.springframework.integration.mqtt.event.MqttIntegrationEvent;
36+
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
37+
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
38+
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
39+
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
40+
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
41+
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
42+
import org.springframework.integration.mqtt.support.MqttHeaders;
43+
import org.springframework.integration.support.MessageBuilder;
44+
import org.springframework.messaging.Message;
45+
import org.springframework.messaging.MessageChannel;
46+
import org.springframework.messaging.MessageHeaders;
47+
import org.springframework.messaging.PollableChannel;
48+
import org.springframework.messaging.converter.AbstractMessageConverter;
49+
import org.springframework.messaging.converter.SmartMessageConverter;
50+
import org.springframework.test.annotation.DirtiesContext;
51+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
52+
53+
import java.nio.charset.StandardCharsets;
54+
import java.util.ArrayList;
55+
import java.util.List;
56+
57+
import static org.assertj.core.api.Assertions.assertThat;
58+
59+
/**
60+
* @author Lucas Bowler
61+
* @since 5.5.x
62+
*/
63+
@SpringJUnitConfig
64+
@DirtiesContext
65+
public class Mqttv5BackToBackAutomaticReconnectTests implements MosquittoContainerTest {
66+
67+
@Autowired
68+
@Qualifier("mqttOutFlow.input")
69+
private MessageChannel mqttOutFlowInput;
70+
71+
@Autowired
72+
private PollableChannel fromMqttChannel;
73+
74+
@Autowired
75+
private Config config;
76+
77+
@Autowired
78+
private MqttConnectionOptions connectionOptions;
79+
80+
81+
@Test //GH-3822
82+
public void testReconnectionWhenFirstConnectionFails() throws InterruptedException {
83+
connectionOptions.setServerURIs(new String[]{MosquittoContainerTest.mqttUrl()});
84+
Thread.sleep(2_500);
85+
86+
String testPayload = "foo";
87+
88+
this.mqttOutFlowInput.send(
89+
MessageBuilder.withPayload(testPayload)
90+
.setHeader(MqttHeaders.TOPIC, "siTest")
91+
.setHeader("foo", "bar")
92+
.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
93+
.build());
94+
95+
Message<?> receive = this.fromMqttChannel.receive(10_000);
96+
97+
assertThat(receive).isNotNull();
98+
assertThat(receive.getPayload()).isEqualTo(testPayload);
99+
assertThat(receive.getHeaders())
100+
.containsEntry("foo", "bar")
101+
.containsEntry(MessageHeaders.CONTENT_TYPE, "text/plain")
102+
.containsKey(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK);
103+
104+
receive.getHeaders()
105+
.get(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, SimpleAcknowledgment.class)
106+
.acknowledge();
107+
108+
assertThat(this.config.events)
109+
.isNotEmpty()
110+
.hasAtLeastOneElementOfType(MqttMessageSentEvent.class)
111+
.hasAtLeastOneElementOfType(MqttMessageDeliveredEvent.class)
112+
.hasAtLeastOneElementOfType(MqttSubscribedEvent.class);
113+
}
114+
115+
116+
@Configuration
117+
@EnableIntegration
118+
public static class Config {
119+
120+
List<MqttIntegrationEvent> events = new ArrayList<>();
121+
122+
@EventListener
123+
void mqttEvents(MqttIntegrationEvent event) {
124+
this.events.add(event);
125+
}
126+
127+
@Bean
128+
public SmartMessageConverter mqttStringToBytesConverter() {
129+
return new AbstractMessageConverter() {
130+
131+
@Override
132+
protected boolean supports(Class<?> clazz) {
133+
return true;
134+
}
135+
136+
@Override
137+
protected Object convertFromInternal(Message<?> message, Class<?> targetClass,
138+
Object conversionHint) {
139+
140+
return message.getPayload().toString().getBytes(StandardCharsets.UTF_8);
141+
}
142+
143+
@Override
144+
protected Object convertToInternal(Object payload, MessageHeaders headers,
145+
Object conversionHint) {
146+
147+
return new String((byte[]) payload);
148+
}
149+
150+
};
151+
}
152+
153+
154+
@Bean
155+
public MqttConnectionOptions mqttConnectOptions() {
156+
MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
157+
builder.serverURI("wss://badMqttUrl");
158+
builder.automaticReconnect(true);
159+
return builder.build();
160+
}
161+
162+
@Bean
163+
public IntegrationFlow mqttOutFlow() {
164+
165+
Mqttv5PahoMessageHandler messageHandler =
166+
new Mqttv5PahoMessageHandler(mqttConnectOptions(), "mqttv5SIout");
167+
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
168+
mqttHeaderMapper.setOutboundHeaderNames("foo", MessageHeaders.CONTENT_TYPE);
169+
messageHandler.setHeaderMapper(mqttHeaderMapper);
170+
messageHandler.setAsync(true);
171+
messageHandler.setAsyncEvents(true);
172+
messageHandler.setConverter(mqttStringToBytesConverter());
173+
174+
return f -> f.handle(messageHandler);
175+
}
176+
177+
178+
@Bean
179+
public IntegrationFlow mqttInFlow() {
180+
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
181+
new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions(), "mqttv5SIin", "siTest");
182+
messageProducer.setPayloadType(String.class);
183+
messageProducer.setMessageConverter(mqttStringToBytesConverter());
184+
messageProducer.setManualAcks(true);
185+
186+
return IntegrationFlows.from(messageProducer)
187+
.channel(c -> c.queue("fromMqttChannel"))
188+
.get();
189+
}
190+
191+
192+
}
193+
194+
}

0 commit comments

Comments
 (0)