Skip to content

Commit 308a327

Browse files
author
Lucas Bowler
committed
spring-projectsGH-3822: Take pull request comments into account
1 parent ef66514 commit 308a327

File tree

3 files changed

+21
-18
lines changed

3 files changed

+21
-18
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ protected void doStart() {
172172
this.mqttClient.reconnect();
173173
}
174174
catch (MqttException e) {
175-
logger.error(ex, "MQTT client failed to connect. Will retry.");
175+
logger.error(ex, "MQTT client failed to connect. Never happens.");
176176
}
177177
}
178178
else {

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -166,21 +166,7 @@ protected void doStart() {
166166
this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
167167
}
168168
catch (MqttException ex) {
169-
if (this.connectionOptions.isAutomaticReconnect()) {
170-
try {
171-
this.mqttClient.reconnect();
172-
}
173-
catch (MqttException e) {
174-
logger.error(ex, "MQTT client failed to connect. Will retry.");
175-
}
176-
}
177-
else {
178-
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
179-
if (applicationEventPublisher != null) {
180-
applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex));
181-
}
182169
logger.error(ex, "MQTT client failed to connect.");
183-
}
184170
}
185171
}
186172

@@ -261,6 +247,9 @@ else if (payload instanceof String) {
261247
protected void publish(String topic, Object mqttMessage, Message<?> message) {
262248
Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'");
263249
try {
250+
if (!this.mqttClient.isConnected()) {
251+
this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout());
252+
}
264253
IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage);
265254
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
266255
if (!this.async) {

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackAutomaticReconnectTests.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.net.UnknownHostException;
2122
import java.nio.charset.StandardCharsets;
2223
import java.util.ArrayList;
2324
import java.util.List;
2425

2526
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
2627
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
28+
import org.eclipse.paho.mqttv5.common.MqttException;
29+
import org.junit.jupiter.api.Assertions;
2730
import org.junit.jupiter.api.Test;
2831

2932
import org.springframework.beans.factory.annotation.Autowired;
@@ -47,6 +50,7 @@
4750
import org.springframework.integration.support.MessageBuilder;
4851
import org.springframework.messaging.Message;
4952
import org.springframework.messaging.MessageChannel;
53+
import org.springframework.messaging.MessageHandlingException;
5054
import org.springframework.messaging.MessageHeaders;
5155
import org.springframework.messaging.PollableChannel;
5256
import org.springframework.messaging.converter.AbstractMessageConverter;
@@ -55,7 +59,6 @@
5559
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
5660

5761

58-
5962
/**
6063
* @author Lucas Bowler
6164
* @since 5.5.x
@@ -80,11 +83,22 @@ public class Mqttv5BackToBackAutomaticReconnectTests implements MosquittoContain
8083

8184
@Test //GH-3822
8285
public void testReconnectionWhenFirstConnectionFails() throws InterruptedException {
86+
String testPayload = "foo";
87+
88+
MessageHandlingException messageHandlingException = Assertions.assertThrows(MessageHandlingException.class, () ->
89+
this.mqttOutFlowInput.send(
90+
MessageBuilder.withPayload(testPayload)
91+
.setHeader(MqttHeaders.TOPIC, "siTest")
92+
.setHeader("foo", "bar")
93+
.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
94+
.build())
95+
);
96+
Assertions.assertInstanceOf(MqttException.class, messageHandlingException.getCause());
97+
Assertions.assertInstanceOf(UnknownHostException.class, messageHandlingException.getRootCause());
98+
8399
connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()});
84100
Thread.sleep(2_500);
85101

86-
String testPayload = "foo";
87-
88102
this.mqttOutFlowInput.send(
89103
MessageBuilder.withPayload(testPayload)
90104
.setHeader(MqttHeaders.TOPIC, "siTest")

0 commit comments

Comments
 (0)