Skip to content

Commit 8e27b1c

Browse files
authored
GH-8550: MQTT: Always re-subscribe on re-connect (#8553)
Fixes #8550 Turns out the Paho MQTT client does not re-subscribe when connection re-established on automatic reconnection * Fix `Mqttv5PahoMessageDrivenChannelAdapter` to always subscribe to its topics in the `connectComplete()` independently of the `reconnect` status * Verify behavior with `MOSQUITTO_CONTAINER` image restart in Docker
1 parent b088091 commit 8e27b1c

File tree

2 files changed

+146
-22
lines changed

2 files changed

+146
-22
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -91,7 +91,7 @@ public class Mqttv5PahoMessageDrivenChannelAdapter extends AbstractMqttMessageDr
9191
public Mqttv5PahoMessageDrivenChannelAdapter(String url, String clientId, String... topic) {
9292
super(url, clientId, topic);
9393
this.connectionOptions = new MqttConnectionOptions();
94-
this.connectionOptions.setServerURIs(new String[]{ url });
94+
this.connectionOptions.setServerURIs(new String[] {url});
9595
this.connectionOptions.setAutomaticReconnect(true);
9696
}
9797

@@ -312,30 +312,28 @@ public void deliveryComplete(IMqttToken token) {
312312

313313
@Override
314314
public void connectComplete(boolean reconnect, String serverURI) {
315-
if (!reconnect) {
316-
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
317-
String[] topics = getTopic();
318-
this.topicLock.lock();
319-
try {
320-
if (topics.length > 0) {
321-
int[] requestedQos = getQos();
322-
this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout());
323-
String message = "Connected and subscribed to " + Arrays.toString(topics);
324-
logger.debug(message);
325-
if (applicationEventPublisher != null) {
326-
applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
327-
}
328-
}
329-
}
330-
catch (MqttException ex) {
315+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
316+
String[] topics = getTopic();
317+
this.topicLock.lock();
318+
try {
319+
if (topics.length > 0) {
320+
int[] requestedQos = getQos();
321+
this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout());
322+
String message = "Connected and subscribed to " + Arrays.toString(topics);
323+
logger.debug(message);
331324
if (applicationEventPublisher != null) {
332-
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
325+
applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message));
333326
}
334-
logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
335327
}
336-
finally {
337-
this.topicLock.unlock();
328+
}
329+
catch (MqttException ex) {
330+
if (applicationEventPublisher != null) {
331+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
338332
}
333+
logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics));
334+
}
335+
finally {
336+
this.topicLock.unlock();
339337
}
340338
}
341339

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt;
18+
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
23+
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.context.event.EventListener;
30+
import org.springframework.integration.annotation.ServiceActivator;
31+
import org.springframework.integration.channel.QueueChannel;
32+
import org.springframework.integration.config.EnableIntegration;
33+
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
34+
import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter;
35+
import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler;
36+
import org.springframework.messaging.MessageChannel;
37+
import org.springframework.messaging.PollableChannel;
38+
import org.springframework.messaging.support.GenericMessage;
39+
import org.springframework.test.annotation.DirtiesContext;
40+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
41+
42+
import static org.assertj.core.api.Assertions.assertThat;
43+
44+
/**
45+
* @author Artem Bilan
46+
*
47+
* @since 5.5.17
48+
*/
49+
@SpringJUnitConfig
50+
@DirtiesContext
51+
public class ResubscribeAfterAutomaticReconnectTests implements MosquittoContainerTest {
52+
53+
@Autowired
54+
private MessageChannel mqttOutInput;
55+
56+
@Autowired
57+
private PollableChannel fromMqttChannel;
58+
59+
@Autowired
60+
private MqttConnectionOptions connectionOptions;
61+
62+
@Autowired
63+
Config config;
64+
65+
@Test
66+
void messageReceivedAfterResubscriptionOnLostConnection() throws InterruptedException {
67+
GenericMessage<String> testMessage = new GenericMessage<>("test");
68+
this.mqttOutInput.send(testMessage);
69+
assertThat(this.fromMqttChannel.receive(10_000)).isNotNull();
70+
71+
MOSQUITTO_CONTAINER.stop();
72+
MOSQUITTO_CONTAINER.start();
73+
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
74+
75+
assertThat(this.config.subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
76+
77+
this.mqttOutInput.send(testMessage);
78+
assertThat(this.fromMqttChannel.receive(10_000)).isNotNull();
79+
}
80+
81+
@Configuration
82+
@EnableIntegration
83+
public static class Config {
84+
85+
CountDownLatch subscribeLatch = new CountDownLatch(2);
86+
87+
@Bean
88+
public MqttConnectionOptions mqttConnectOptions() {
89+
return new MqttConnectionOptionsBuilder()
90+
.serverURI(MosquittoContainerTest.mqttUrl())
91+
.automaticReconnect(true)
92+
.build();
93+
}
94+
95+
@Bean
96+
@ServiceActivator(inputChannel = "mqttOutInput")
97+
public Mqttv5PahoMessageHandler mqttOutHandler(MqttConnectionOptions mqttConnectOptions) {
98+
Mqttv5PahoMessageHandler messageHandler =
99+
new Mqttv5PahoMessageHandler(mqttConnectOptions, "mqttv5SIout");
100+
messageHandler.setDefaultTopic("siTest");
101+
return messageHandler;
102+
}
103+
104+
@Bean
105+
QueueChannel fromMqttChannel() {
106+
return new QueueChannel();
107+
}
108+
109+
@Bean
110+
public Mqttv5PahoMessageDrivenChannelAdapter mqttInChannelAdapter(
111+
MqttConnectionOptions mqttConnectOptions, QueueChannel fromMqttChannel) {
112+
113+
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
114+
new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions, "mqttInClient", "siTest");
115+
messageProducer.setOutputChannel(fromMqttChannel);
116+
return messageProducer;
117+
}
118+
119+
@EventListener(MqttSubscribedEvent.class)
120+
public void mqttEvents() {
121+
this.subscribeLatch.countDown();
122+
}
123+
124+
}
125+
126+
}

0 commit comments

Comments
 (0)