Skip to content

Commit ee8af1a

Browse files
authored
Merge pull request #669 from vikinghawk/master
Closing a channel should cleanup recorded auto-delete queues
2 parents ecf948e + 9cf00f4 commit ee8af1a

File tree

2 files changed

+33
-19
lines changed

2 files changed

+33
-19
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.rabbitmq.client.*;
1919
import com.rabbitmq.client.impl.AMQCommand;
2020
import com.rabbitmq.client.impl.recovery.Utils.IoTimeoutExceptionRunnable;
21+
import com.rabbitmq.utility.Utility;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

@@ -39,11 +40,11 @@ public class AutorecoveringChannel implements RecoverableChannel {
3940

4041
private volatile RecoveryAwareChannelN delegate;
4142
private volatile AutorecoveringConnection connection;
42-
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();
43-
private final List<RecoveryListener> recoveryListeners = new CopyOnWriteArrayList<RecoveryListener>();
44-
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
45-
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
46-
private final Set<String> consumerTags = Collections.synchronizedSet(new HashSet<String>());
43+
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<>();
44+
private final List<RecoveryListener> recoveryListeners = new CopyOnWriteArrayList<>();
45+
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<>();
46+
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<>();
47+
private final Set<String> consumerTags = Collections.synchronizedSet(new HashSet<>());
4748
private int prefetchCountConsumer;
4849
private int prefetchCountGlobal;
4950
private boolean usesPublisherConfirms;
@@ -100,8 +101,8 @@ private void executeAndClean(IoTimeoutExceptionRunnable callback) throws IOExcep
100101
try {
101102
callback.run();
102103
} finally {
103-
for (String consumerTag : consumerTags) {
104-
this.connection.deleteRecordedConsumer(consumerTag);
104+
for (String consumerTag : Utility.copy(consumerTags)) {
105+
this.deleteRecordedConsumer(consumerTag);
105106
}
106107
this.connection.unregisterChannel(this);
107108
}
@@ -644,10 +645,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
644645

645646
@Override
646647
public void basicCancel(String consumerTag) throws IOException {
647-
RecordedConsumer c = this.deleteRecordedConsumer(consumerTag);
648-
if(c != null) {
649-
this.maybeDeleteRecordedAutoDeleteQueue(c.getQueue());
650-
}
648+
this.deleteRecordedConsumer(consumerTag);
651649
delegate.basicCancel(consumerTag);
652650
}
653651

@@ -902,13 +900,12 @@ private void recordConsumer(String result,
902900
this.connection.recordConsumer(result, consumer);
903901
}
904902

905-
private RecordedConsumer deleteRecordedConsumer(String consumerTag) {
903+
private void deleteRecordedConsumer(String consumerTag) {
906904
this.consumerTags.remove(consumerTag);
907-
return this.connection.deleteRecordedConsumer(consumerTag);
908-
}
909-
910-
private void maybeDeleteRecordedAutoDeleteQueue(String queue) {
911-
this.connection.maybeDeleteRecordedAutoDeleteQueue(queue);
905+
RecordedConsumer c = this.connection.deleteRecordedConsumer(consumerTag);
906+
if (c != null) {
907+
this.connection.maybeDeleteRecordedAutoDeleteQueue(c.getQueue());
908+
}
912909
}
913910

914911
private void maybeDeleteRecordedAutoDeleteExchange(String exchange) {

src/main/java/com/rabbitmq/utility/Utility.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
import java.util.ArrayList;
1919
import java.util.Collections;
2020
import java.util.LinkedHashMap;
21+
import java.util.LinkedHashSet;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.Set;
2325

2426
/**
2527
* Catch-all holder class for static helper methods.
@@ -73,6 +75,21 @@ public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwab
7375
return throwable;
7476
}
7577

78+
/**
79+
* Synchronizes on the set and then returns a copy of the set that is safe to iterate over. Useful when wanting to do thread-safe iteration over
80+
* a Set wrapped in {@link Collections#synchronizedSet(Set)}.
81+
*
82+
* @param set
83+
* The set, which may not be {@code null}
84+
* @return LinkedHashSet copy of the set
85+
*/
86+
public static <E> Set<E> copy(final Set<E> set) {
87+
// No Sonar: this very list instance can be synchronized in other places of its owning class
88+
synchronized (set) { //NOSONAR
89+
return new LinkedHashSet<>(set);
90+
}
91+
}
92+
7693
/**
7794
* Synchronizes on the list and then returns a copy of the list that is safe to iterate over. Useful when wanting to do thread-safe iteration over
7895
* a List wrapped in {@link Collections#synchronizedList(List)}.
@@ -84,7 +101,7 @@ public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwab
84101
public static <E> List<E> copy(final List<E> list) {
85102
// No Sonar: this very list instance can be synchronized in other places of its owning class
86103
synchronized (list) { //NOSONAR
87-
return new ArrayList<E>(list);
104+
return new ArrayList<>(list);
88105
}
89106
}
90107

@@ -99,7 +116,7 @@ public static <E> List<E> copy(final List<E> list) {
99116
public static <K, V> Map<K, V> copy(final Map<K, V> map) {
100117
// No Sonar: this very map instance can be synchronized in other places of its owning class
101118
synchronized (map) { //NOSONAR
102-
return new LinkedHashMap<K, V>(map);
119+
return new LinkedHashMap<>(map);
103120
}
104121
}
105122
}

0 commit comments

Comments
 (0)