Skip to content

Commit 334620f

Browse files
committed
GH-9597: Fix Mqttv5PahoMessageHandler.publish() for concurrency
Fixes: #9597 Issue link: #9597 When input channel for the endpoint with a `Mqttv5PahoMessageHandler` is an `ExecutorChannel` and a lot of concurrent messages are sent, the `Connect already in progress (32110)` exception can be thrown * Wrap `this.mqttClient.connect()` in the `Mqttv5PahoMessageHandler.publish()` into a `Lock` **Auto-cherry-pick to `6.3.x` & `6.2.x`**
1 parent 9149409 commit 334620f

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,15 @@ protected void publish(String topic, Object mqttMessage, Message<?> message) {
241241
long completionTimeout = getCompletionTimeout();
242242
try {
243243
if (!this.mqttClient.isConnected()) {
244-
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
244+
this.lock.lock();
245+
try {
246+
if (!this.mqttClient.isConnected()) {
247+
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
248+
}
249+
}
250+
finally {
251+
this.lock.unlock();
252+
}
245253
}
246254
IMqttToken token =
247255
this.mqttClient.publish(topic, (MqttMessage) mqttMessage, null, this.mqttPublishActionListener);

0 commit comments

Comments
 (0)