diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index 84fd94b709b..aaab5792f0d 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -66,6 +66,7 @@ * * @author Artem Bilan * @author Mikhail Polivakha + * @author Lucas Bowler * * @since 5.5.5 * @@ -162,28 +163,24 @@ protected void onInit() { @Override protected void doStart() { ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - this.topicLock.lock(); - String[] topics = getTopic(); try { this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); - if (topics.length > 0) { - int[] requestedQos = getQos(); - this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout()); - String message = "Connected and subscribed to " + Arrays.toString(topics); - logger.debug(message); - if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); - } - } } catch (MqttException ex) { - if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); + if (this.connectionOptions.isAutomaticReconnect()) { + try { + this.mqttClient.reconnect(); + } + catch (MqttException e) { + logger.error(ex, "MQTT client failed to connect. Never happens."); + } + } + else { + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); + } + logger.error(ex, "MQTT client failed to connect."); } - logger.error(ex, () -> "Error connecting or subscribing to " + Arrays.toString(topics)); - } - finally { - this.topicLock.unlock(); } } @@ -312,7 +309,31 @@ public void deliveryComplete(IMqttToken token) { @Override public void connectComplete(boolean reconnect, String serverURI) { - + if (!reconnect) { + ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); + String[] topics = getTopic(); + this.topicLock.lock(); + try { + if (topics.length > 0) { + int[] requestedQos = getQos(); + this.mqttClient.subscribe(topics, requestedQos).waitForCompletion(getCompletionTimeout()); + String message = "Connected and subscribed to " + Arrays.toString(topics); + logger.debug(message); + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttSubscribedEvent(this, message)); + } + } + } + catch (MqttException ex) { + if (applicationEventPublisher != null) { + applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); + } + logger.error(ex, () -> "Error subscribing to " + Arrays.toString(topics)); + } + finally { + this.topicLock.unlock(); + } + } } @Override diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java index 10318d98c50..1b5a0430369 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,6 +57,7 @@ * * * @author Artem Bilan + * @author Lucas Bowler * * @since 5.5.5 */ @@ -86,11 +87,6 @@ public Mqttv5PahoMessageHandler(String url, String clientId) { public Mqttv5PahoMessageHandler(MqttConnectionOptions connectionOptions, String clientId) { super(obtainServerUrlFromOptions(connectionOptions), clientId); this.connectionOptions = connectionOptions; - if (!this.connectionOptions.isAutomaticReconnect()) { - logger.warn("It is recommended to set 'automaticReconnect' MQTT client option. " + - "Otherwise the current channel adapter restart should be used explicitly, " + - "e.g. via handling 'MqttConnectionFailedEvent' on client disconnection."); - } } @@ -165,11 +161,7 @@ protected void doStart() { this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); } catch (MqttException ex) { - ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); - if (applicationEventPublisher != null) { - applicationEventPublisher.publishEvent(new MqttConnectionFailedEvent(this, ex)); - } - logger.error(ex, "MQTT client failed to connect. Will retry if 'ConnectionOptions.isAutomaticReconnect()'."); + logger.error(ex, "MQTT client failed to connect."); } } @@ -250,6 +242,9 @@ else if (payload instanceof String) { protected void publish(String topic, Object mqttMessage, Message message) { Assert.isInstanceOf(MqttMessage.class, mqttMessage, "The 'mqttMessage' must be an instance of 'MqttMessage'"); try { + if (!this.mqttClient.isConnected()) { + this.mqttClient.connect(this.connectionOptions).waitForCompletion(getCompletionTimeout()); + } IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage); ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher(); if (!this.async) { diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackAutomaticReconnectTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackAutomaticReconnectTests.java new file mode 100644 index 00000000000..71b1a9f1d80 --- /dev/null +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5BackToBackAutomaticReconnectTests.java @@ -0,0 +1,207 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.mqtt; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; +import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder; +import org.eclipse.paho.mqttv5.common.MqttException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; +import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.acks.SimpleAcknowledgment; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.mqtt.event.MqttIntegrationEvent; +import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent; +import org.springframework.integration.mqtt.event.MqttMessageSentEvent; +import org.springframework.integration.mqtt.event.MqttSubscribedEvent; +import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.outbound.Mqttv5PahoMessageHandler; +import org.springframework.integration.mqtt.support.MqttHeaderMapper; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandlingException; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.converter.AbstractMessageConverter; +import org.springframework.messaging.converter.SmartMessageConverter; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + + +/** + * @author Lucas Bowler + * @since 5.5.13 + */ +@SpringJUnitConfig +@DirtiesContext +public class Mqttv5BackToBackAutomaticReconnectTests implements MosquittoContainerTest { + + @Autowired + @Qualifier("mqttOutFlow.input") + private MessageChannel mqttOutFlowInput; + + @Autowired + private PollableChannel fromMqttChannel; + + @Autowired + private Config config; + + @Autowired + private MqttConnectionOptions connectionOptions; + + + @Test //GH-3822 + public void testReconnectionWhenFirstConnectionFails() throws InterruptedException { + String testPayload = "foo"; + + MessageHandlingException messageHandlingException = Assertions.assertThrows(MessageHandlingException.class, () -> + this.mqttOutFlowInput.send( + MessageBuilder.withPayload(testPayload) + .setHeader(MqttHeaders.TOPIC, "siTest") + .setHeader("foo", "bar") + .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .build()) + ); + Assertions.assertInstanceOf(MqttException.class, messageHandlingException.getCause()); + Assertions.assertInstanceOf(UnknownHostException.class, messageHandlingException.getRootCause()); + + connectionOptions.setServerURIs(new String[] {MosquittoContainerTest.mqttUrl()}); + Thread.sleep(2_500); + + this.mqttOutFlowInput.send( + MessageBuilder.withPayload(testPayload) + .setHeader(MqttHeaders.TOPIC, "siTest") + .setHeader("foo", "bar") + .setHeader(MessageHeaders.CONTENT_TYPE, "text/plain") + .build()); + + Message receive = this.fromMqttChannel.receive(10_000); + + assertThat(receive).isNotNull(); + assertThat(receive.getPayload()).isEqualTo(testPayload); + assertThat(receive.getHeaders()) + .containsEntry("foo", "bar") + .containsEntry(MessageHeaders.CONTENT_TYPE, "text/plain") + .containsKey(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK); + + receive.getHeaders() + .get(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, SimpleAcknowledgment.class) + .acknowledge(); + + assertThat(this.config.events) + .isNotEmpty() + .hasAtLeastOneElementOfType(MqttMessageSentEvent.class) + .hasAtLeastOneElementOfType(MqttMessageDeliveredEvent.class) + .hasAtLeastOneElementOfType(MqttSubscribedEvent.class); + } + + + @Configuration + @EnableIntegration + public static class Config { + + List events = new ArrayList<>(); + + @EventListener + void mqttEvents(MqttIntegrationEvent event) { + this.events.add(event); + } + + @Bean + public SmartMessageConverter mqttStringToBytesConverter() { + return new AbstractMessageConverter() { + + @Override + protected boolean supports(Class clazz) { + return true; + } + + @Override + protected Object convertFromInternal(Message message, Class targetClass, + Object conversionHint) { + + return message.getPayload().toString().getBytes(StandardCharsets.UTF_8); + } + + @Override + protected Object convertToInternal(Object payload, MessageHeaders headers, + Object conversionHint) { + + return new String((byte[]) payload); + } + + }; + } + + + @Bean + public MqttConnectionOptions mqttConnectOptions() { + MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder(); + builder.serverURI("wss://badMqttUrl"); + builder.automaticReconnect(true); + return builder.build(); + } + + @Bean + public IntegrationFlow mqttOutFlow() { + + Mqttv5PahoMessageHandler messageHandler = + new Mqttv5PahoMessageHandler(mqttConnectOptions(), "mqttv5SIout"); + MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper(); + mqttHeaderMapper.setOutboundHeaderNames("foo", MessageHeaders.CONTENT_TYPE); + messageHandler.setHeaderMapper(mqttHeaderMapper); + messageHandler.setAsync(true); + messageHandler.setAsyncEvents(true); + messageHandler.setConverter(mqttStringToBytesConverter()); + + return f -> f.handle(messageHandler); + } + + + @Bean + public IntegrationFlow mqttInFlow() { + Mqttv5PahoMessageDrivenChannelAdapter messageProducer = + new Mqttv5PahoMessageDrivenChannelAdapter(mqttConnectOptions(), "mqttv5SIin", "siTest"); + messageProducer.setPayloadType(String.class); + messageProducer.setMessageConverter(mqttStringToBytesConverter()); + messageProducer.setManualAcks(true); + + return IntegrationFlows.from(messageProducer) + .channel(c -> c.queue("fromMqttChannel")) + .get(); + } + + } + +}