Skip to content

Commit 0f96549

Browse files
artembilanspring-builds
authored andcommitted
GH-9597: Fix Mqttv5PahoMessageHandler.publish() for concurrency
Fixes: #9597 Issue link: #9597 When input channel for the endpoint with a `Mqttv5PahoMessageHandler` is an `ExecutorChannel` and a lot of concurrent messages are sent, the `Connect already in progress (32110)` exception can be thrown * Wrap `this.mqttClient.connect()` in the `Mqttv5PahoMessageHandler.publish()` into a `Lock` (cherry picked from commit 334620f)
1 parent 7170f27 commit 0f96549

File tree

1 file changed

+9
-1
lines changed

1 file changed

+9
-1
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,15 @@ protected void publish(String topic, Object mqttMessage, Message<?> message) {
267267
long completionTimeout = getCompletionTimeout();
268268
try {
269269
if (!this.mqttClient.isConnected()) {
270-
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
270+
this.lock.lock();
271+
try {
272+
if (!this.mqttClient.isConnected()) {
273+
this.mqttClient.connect(this.connectionOptions).waitForCompletion(completionTimeout);
274+
}
275+
}
276+
finally {
277+
this.lock.unlock();
278+
}
271279
}
272280
IMqttToken token = this.mqttClient.publish(topic, (MqttMessage) mqttMessage);
273281
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();

0 commit comments

Comments
 (0)