Skip to content

Commit 8c57cb7

Browse files
artembilangaryrussell
authored andcommitted
GH-3697: Various lifecycle fixed for MQTT v5 CAs
Fixes #3697 SO: https://stackoverflow.com/questions/70374046/spring-integration-mqtt-failed-to-start-app-when-the-network-is-disconnected * Add `mqttClient.disconnect()` to `Mqttv5PahoMessageDrivenChannelAdapter.doStop()` - the `doStart()` does `connect()` * Add `Mqttv5PahoMessageDrivenChannelAdapter.destroy()` impl to close `mqttClient` * Fix `Mqttv5PahoMessageHandler.doStart()` to not re-throw an exception on connection. Emit an `MqttConnectionFailedEvent` and log error instead * Fix `Mqttv5PahoMessageHandler.destroy()` to call `mqttClient.close(true)` for better resources clean up * Improve MQTT v5 components Javadocs and add a reconnect note into `mqtt.adoc`
1 parent 30bb846 commit 8c57cb7

File tree

4 files changed

+31
-7
lines changed

4 files changed

+31
-7
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
*
6060
* It is recommended to have the {@link MqttConnectionOptions#setAutomaticReconnect(boolean)}
6161
* set to true to let an internal {@link IMqttAsyncClient} instance to handle reconnects.
62-
* Otherwise, the manual restart of this component can only handle reconnects, e.g. via
62+
* Otherwise, only the manual restart of this component can handle reconnects, e.g. via
6363
* {@link MqttConnectionFailedEvent} handling on disconnection.
6464
*
6565
* See {@link #setPayloadType} for more information about type conversion.
@@ -190,6 +190,7 @@ protected void doStop() {
190190
String[] topics = getTopic();
191191
try {
192192
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
193+
this.mqttClient.disconnect().waitForCompletion(getCompletionTimeout());
193194
}
194195
catch (MqttException ex) {
195196
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
@@ -199,6 +200,17 @@ protected void doStop() {
199200
}
200201
}
201202

203+
@Override
204+
public void destroy() {
205+
super.destroy();
206+
try {
207+
this.mqttClient.close(true);
208+
}
209+
catch (MqttException ex) {
210+
logger.error(ex, "Failed to close 'MqttAsyncClient'");
211+
}
212+
}
213+
202214
@Override
203215
public void addTopic(String topic, int qos) {
204216
this.topicLock.lock();

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@
5050
/**
5151
* The {@link AbstractMqttMessageHandler} implementation for MQTT v5.
5252
*
53+
* It is recommended to have the {@link MqttConnectionOptions#setAutomaticReconnect(boolean)}
54+
* set to true to let an internal {@link IMqttAsyncClient} instance to handle reconnects.
55+
* Otherwise, only the manual restart of this component can handle reconnects, e.g. via
56+
* {@link MqttConnectionFailedEvent} handling on disconnection.
57+
*
58+
*
5359
* @author Artem Bilan
5460
*
5561
* @since 5.5.5
@@ -159,7 +165,11 @@ protected void doStart() {
159165
this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
160166
}
161167
catch (MqttException ex) {
162-
throw new IllegalStateException("Cannot connect 'MqttAsyncClient' for: " + getComponentName(), ex);
168+
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
169+
if (applicationEventPublisher != null) {
170+
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
171+
}
172+
logger.error(ex, "MQTT client failed to connect. Will retry if 'ConnectionOptions.isAutomaticReconnect()'.");
163173
}
164174
}
165175

@@ -177,7 +187,7 @@ protected void doStop() {
177187
public void destroy() {
178188
super.destroy();
179189
try {
180-
this.mqttClient.close();
190+
this.mqttClient.close(true);
181191
}
182192
catch (MqttException ex) {
183193
logger.error(ex, "Failed to close 'MqttAsyncClient'");

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
* @author Gary Russell
5757
* @author Artem Bilan
5858
*
59-
* @since 4.0
59+
* @since 5.5.5
6060
*
6161
*/
6262
@SpringJUnitConfig
@@ -164,8 +164,7 @@ public IntegrationFlow mqttInFlow() {
164164
messageProducer.setMessageConverter(mqttStringToBytesConverter());
165165
messageProducer.setManualAcks(true);
166166

167-
return IntegrationFlows.from(
168-
messageProducer)
167+
return IntegrationFlows.from(messageProducer)
169168
.channel(c -> c.queue("fromMqttChannel"))
170169
.get();
171170
}

src/reference/asciidoc/mqtt.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -494,4 +494,7 @@ public IntegrationFlow mqttInFlow() {
494494

495495
IMPORTANT: The `org.springframework.integration.mqtt.support.MqttMessageConverter` cannot be used with the `Mqttv5PahoMessageDrivenChannelAdapter` since its contract is aimed only for the MQTT v3 protocol.
496496

497-
See more information in the `Mqttv5PahoMessageDrivenChannelAdapter` javadocs and its superclass.
497+
See more information in the `Mqttv5PahoMessageDrivenChannelAdapter` javadocs and its superclass.
498+
499+
IMPORTANT: It is recommended to have the `MqttConnectionOptions#setAutomaticReconnect(boolean)` set to true to let an internal `IMqttAsyncClient` instance to handle reconnects.
500+
Otherwise, only the manual restart of these channel adapters can handle reconnects, e.g. via `MqttConnectionFailedEvent` handling on disconnection.

0 commit comments

Comments
 (0)