diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 545b26c3ed..52e100ad57 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -18,6 +18,7 @@ import com.rabbitmq.client.*; import com.rabbitmq.client.impl.AMQCommand; import com.rabbitmq.client.impl.recovery.Utils.IoTimeoutExceptionRunnable; +import com.rabbitmq.utility.Utility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +40,11 @@ public class AutorecoveringChannel implements RecoverableChannel { private volatile RecoveryAwareChannelN delegate; private volatile AutorecoveringConnection connection; - private final List shutdownHooks = new CopyOnWriteArrayList(); - private final List recoveryListeners = new CopyOnWriteArrayList(); - private final List returnListeners = new CopyOnWriteArrayList(); - private final List confirmListeners = new CopyOnWriteArrayList(); - private final Set consumerTags = Collections.synchronizedSet(new HashSet()); + private final List shutdownHooks = new CopyOnWriteArrayList<>(); + private final List recoveryListeners = new CopyOnWriteArrayList<>(); + private final List returnListeners = new CopyOnWriteArrayList<>(); + private final List confirmListeners = new CopyOnWriteArrayList<>(); + private final Set consumerTags = Collections.synchronizedSet(new HashSet<>()); private int prefetchCountConsumer; private int prefetchCountGlobal; private boolean usesPublisherConfirms; @@ -100,8 +101,8 @@ private void executeAndClean(IoTimeoutExceptionRunnable callback) throws IOExcep try { callback.run(); } finally { - for (String consumerTag : consumerTags) { - this.connection.deleteRecordedConsumer(consumerTag); + for (String consumerTag : Utility.copy(consumerTags)) { + this.deleteRecordedConsumer(consumerTag); } this.connection.unregisterChannel(this); } @@ -644,10 +645,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp @Override public void basicCancel(String consumerTag) throws IOException { - RecordedConsumer c = this.deleteRecordedConsumer(consumerTag); - if(c != null) { - this.maybeDeleteRecordedAutoDeleteQueue(c.getQueue()); - } + this.deleteRecordedConsumer(consumerTag); delegate.basicCancel(consumerTag); } @@ -902,13 +900,12 @@ private void recordConsumer(String result, this.connection.recordConsumer(result, consumer); } - private RecordedConsumer deleteRecordedConsumer(String consumerTag) { + private void deleteRecordedConsumer(String consumerTag) { this.consumerTags.remove(consumerTag); - return this.connection.deleteRecordedConsumer(consumerTag); - } - - private void maybeDeleteRecordedAutoDeleteQueue(String queue) { - this.connection.maybeDeleteRecordedAutoDeleteQueue(queue); + RecordedConsumer c = this.connection.deleteRecordedConsumer(consumerTag); + if (c != null) { + this.connection.maybeDeleteRecordedAutoDeleteQueue(c.getQueue()); + } } private void maybeDeleteRecordedAutoDeleteExchange(String exchange) { diff --git a/src/main/java/com/rabbitmq/utility/Utility.java b/src/main/java/com/rabbitmq/utility/Utility.java index f8cdea8656..db709a7ff8 100644 --- a/src/main/java/com/rabbitmq/utility/Utility.java +++ b/src/main/java/com/rabbitmq/utility/Utility.java @@ -18,8 +18,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; /** * Catch-all holder class for static helper methods. @@ -73,6 +75,21 @@ public static > T fixStackTrace(T throwab return throwable; } + /** + * Synchronizes on the set and then returns a copy of the set that is safe to iterate over. Useful when wanting to do thread-safe iteration over + * a Set wrapped in {@link Collections#synchronizedSet(Set)}. + * + * @param set + * The set, which may not be {@code null} + * @return LinkedHashSet copy of the set + */ + public static Set copy(final Set set) { + // No Sonar: this very list instance can be synchronized in other places of its owning class + synchronized (set) { //NOSONAR + return new LinkedHashSet<>(set); + } + } + /** * Synchronizes on the list and then returns a copy of the list that is safe to iterate over. Useful when wanting to do thread-safe iteration over * a List wrapped in {@link Collections#synchronizedList(List)}. @@ -84,7 +101,7 @@ public static > T fixStackTrace(T throwab public static List copy(final List list) { // No Sonar: this very list instance can be synchronized in other places of its owning class synchronized (list) { //NOSONAR - return new ArrayList(list); + return new ArrayList<>(list); } } @@ -99,7 +116,7 @@ public static List copy(final List list) { public static Map copy(final Map map) { // No Sonar: this very map instance can be synchronized in other places of its owning class synchronized (map) { //NOSONAR - return new LinkedHashMap(map); + return new LinkedHashMap<>(map); } } }