Skip to content

Commit 12fc353

Browse files
committed
GH-9276: Mitigate ConcurrentModificationException in the Mqttv5PahoMessageDrivenChannelAdapter
Fixes: #9276 The current Eclipse Paho client has wrong removal from map logic which leads to the `ConcurrentModificationException` when we unsubscribe from topic. * Catch a `ConcurrentModificationException` `this.mqttClient.unsubscribe()` and just log it as an `error` **Auto-cherry-pick to `6.3.x` & `6.2.x`**
1 parent 4108ea5 commit 12fc353

File tree

1 file changed

+14
-3
lines changed

1 file changed

+14
-3
lines changed

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.Collections;
22+
import java.util.ConcurrentModificationException;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.concurrent.locks.Lock;
@@ -289,7 +290,7 @@ protected void doStop() {
289290
try {
290291
if (this.mqttClient != null && this.mqttClient.isConnected()) {
291292
if (this.connectionOptions.isCleanStart()) {
292-
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
293+
unsubscribe(topics);
293294
// Have to re-subscribe on next start if connection is not lost.
294295
this.readyToSubscribeOnStart = true;
295296

@@ -310,12 +311,22 @@ protected void doStop() {
310311
}
311312
}
312313

314+
private void unsubscribe(String... topics) throws MqttException {
315+
try {
316+
// Catch ConcurrentModificationException: https://github.com/eclipse/paho.mqtt.java/issues/986
317+
this.mqttClient.unsubscribe(topics).waitForCompletion(getCompletionTimeout());
318+
}
319+
catch (ConcurrentModificationException ex) {
320+
logger.error(ex, () -> "Error unsubscribing from " + Arrays.toString(topics));
321+
}
322+
}
323+
313324
@Override
314325
public void destroy() {
315326
super.destroy();
316327
try {
317328
if (getClientManager() == null && this.mqttClient != null) {
318-
this.mqttClient.close(true);
329+
this.mqttClient.close();
319330
}
320331
}
321332
catch (MqttException ex) {
@@ -360,7 +371,7 @@ public void removeTopic(String... topic) {
360371
this.topicLock.lock();
361372
try {
362373
if (this.mqttClient != null && this.mqttClient.isConnected()) {
363-
this.mqttClient.unsubscribe(topic).waitForCompletion(getCompletionTimeout());
374+
unsubscribe(topic);
364375
}
365376
super.removeTopic(topic);
366377
if (!CollectionUtils.isEmpty(this.subscriptions)) {

0 commit comments

Comments
 (0)