Skip to content

Closing a channel should cleanup recorded auto-delete queues #669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.AMQCommand;
import com.rabbitmq.client.impl.recovery.Utils.IoTimeoutExceptionRunnable;
import com.rabbitmq.utility.Utility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,11 +40,11 @@ public class AutorecoveringChannel implements RecoverableChannel {

private volatile RecoveryAwareChannelN delegate;
private volatile AutorecoveringConnection connection;
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<ShutdownListener>();
private final List<RecoveryListener> recoveryListeners = new CopyOnWriteArrayList<RecoveryListener>();
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<ReturnListener>();
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<ConfirmListener>();
private final Set<String> consumerTags = Collections.synchronizedSet(new HashSet<String>());
private final List<ShutdownListener> shutdownHooks = new CopyOnWriteArrayList<>();
private final List<RecoveryListener> recoveryListeners = new CopyOnWriteArrayList<>();
private final List<ReturnListener> returnListeners = new CopyOnWriteArrayList<>();
private final List<ConfirmListener> confirmListeners = new CopyOnWriteArrayList<>();
private final Set<String> consumerTags = Collections.synchronizedSet(new HashSet<>());
private int prefetchCountConsumer;
private int prefetchCountGlobal;
private boolean usesPublisherConfirms;
Expand Down Expand Up @@ -100,8 +101,8 @@ private void executeAndClean(IoTimeoutExceptionRunnable callback) throws IOExcep
try {
callback.run();
} finally {
for (String consumerTag : consumerTags) {
this.connection.deleteRecordedConsumer(consumerTag);
for (String consumerTag : Utility.copy(consumerTags)) {
this.deleteRecordedConsumer(consumerTag);
}
this.connection.unregisterChannel(this);
}
Expand Down Expand Up @@ -644,10 +645,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp

@Override
public void basicCancel(String consumerTag) throws IOException {
RecordedConsumer c = this.deleteRecordedConsumer(consumerTag);
if(c != null) {
this.maybeDeleteRecordedAutoDeleteQueue(c.getQueue());
}
this.deleteRecordedConsumer(consumerTag);
delegate.basicCancel(consumerTag);
}

Expand Down Expand Up @@ -902,13 +900,12 @@ private void recordConsumer(String result,
this.connection.recordConsumer(result, consumer);
}

private RecordedConsumer deleteRecordedConsumer(String consumerTag) {
private void deleteRecordedConsumer(String consumerTag) {
this.consumerTags.remove(consumerTag);
return this.connection.deleteRecordedConsumer(consumerTag);
}

private void maybeDeleteRecordedAutoDeleteQueue(String queue) {
this.connection.maybeDeleteRecordedAutoDeleteQueue(queue);
RecordedConsumer c = this.connection.deleteRecordedConsumer(consumerTag);
if (c != null) {
this.connection.maybeDeleteRecordedAutoDeleteQueue(c.getQueue());
}
}

private void maybeDeleteRecordedAutoDeleteExchange(String exchange) {
Expand Down
21 changes: 19 additions & 2 deletions src/main/java/com/rabbitmq/utility/Utility.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Catch-all holder class for static helper methods.
Expand Down Expand Up @@ -73,6 +75,21 @@ public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwab
return throwable;
}

/**
* Synchronizes on the set and then returns a copy of the set that is safe to iterate over. Useful when wanting to do thread-safe iteration over
* a Set wrapped in {@link Collections#synchronizedSet(Set)}.
*
* @param set
* The set, which may not be {@code null}
* @return LinkedHashSet copy of the set
*/
public static <E> Set<E> copy(final Set<E> set) {
// No Sonar: this very list instance can be synchronized in other places of its owning class
synchronized (set) { //NOSONAR
return new LinkedHashSet<>(set);
}
}

/**
* Synchronizes on the list and then returns a copy of the list that is safe to iterate over. Useful when wanting to do thread-safe iteration over
* a List wrapped in {@link Collections#synchronizedList(List)}.
Expand All @@ -84,7 +101,7 @@ public static <T extends Throwable & SensibleClone<T>> T fixStackTrace(T throwab
public static <E> List<E> copy(final List<E> list) {
// No Sonar: this very list instance can be synchronized in other places of its owning class
synchronized (list) { //NOSONAR
return new ArrayList<E>(list);
return new ArrayList<>(list);
}
}

Expand All @@ -99,7 +116,7 @@ public static <E> List<E> copy(final List<E> list) {
public static <K, V> Map<K, V> copy(final Map<K, V> map) {
// No Sonar: this very map instance can be synchronized in other places of its owning class
synchronized (map) { //NOSONAR
return new LinkedHashMap<K, V>(map);
return new LinkedHashMap<>(map);
}
}
}