Skip to content

Commit fb6e6c6

Browse files
artembilanspring-builds
authored andcommitted
GH-9597: Fix Mqttv5PahoMessageHandler.publish() for concurrency
Fixes: #9597 Issue link: #9597 When input channel for the endpoint with a `Mqttv5PahoMessageHandler` is an `ExecutorChannel` and a lot of concurrent messages are sent, the `Connect already in progress (32110)` exception can be thrown * Wrap `this.mqttClient.connect()` in the `Mqttv5PahoMessageHandler.publish()` into a `Lock` (cherry picked from commit 334620f)
1 parent 44d035f commit fb6e6c6

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,15 @@ protected void publish(String topic, Object mqttMessage, Message<?> message) {
266266
long completionTimeout = getCompletionTimeout();
267267
try {
268268
if (!this.mqttClient.isConnected()) {
269-
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
269+
this.lock.lock();
270+
try {
271+
if (!this.mqttClient.isConnected()) {
272+
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
273+
}
274+
}
275+
finally {
276+
this.lock.unlock();
277+
}
270278
}
271279
IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage);
272280
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();

0 commit comments

Comments
 (0)