Skip to content

Commit b451798

Browse files
committed
Add test for recorded binding clean-up after exchange deletion
1 parent aaff1cc commit b451798

File tree

2 files changed

+25
-12
lines changed

2 files changed

+25
-12
lines changed

src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

+6-12
Original file line numberDiff line numberDiff line change
@@ -1164,25 +1164,19 @@ boolean hasMoreConsumersOnQueue(Collection<RecordedConsumer> consumers, String q
11641164
}
11651165

11661166
Set<RecordedBinding> removeBindingsWithDestination(String s) {
1167-
final Set<RecordedBinding> result = new LinkedHashSet<>();
1168-
synchronized (this.recordedBindings) {
1169-
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
1170-
RecordedBinding b = it.next();
1171-
if(b.getDestination().equals(s)) {
1172-
it.remove();
1173-
result.add(b);
1174-
}
1175-
}
1176-
}
1177-
return result;
1167+
return this.removeBindingsWithCondition(b -> b.getSource().equals(s));
11781168
}
11791169

11801170
Set<RecordedBinding> removeBindingsWithSource(String s) {
1171+
return this.removeBindingsWithCondition(b -> b.getSource().equals(s));
1172+
}
1173+
1174+
private Set<RecordedBinding> removeBindingsWithCondition(Predicate<RecordedBinding> condition) {
11811175
final Set<RecordedBinding> result = new LinkedHashSet<>();
11821176
synchronized (this.recordedBindings) {
11831177
for (Iterator<RecordedBinding> it = this.recordedBindings.iterator(); it.hasNext(); ) {
11841178
RecordedBinding b = it.next();
1185-
if (b.getSource().equals(s)) {
1179+
if (condition.test(b)) {
11861180
it.remove();
11871181
result.add(b);
11881182
}

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

+19
Original file line numberDiff line numberDiff line change
@@ -865,6 +865,21 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
865865
}
866866
}
867867

868+
@Test public void thatBindingFromDeletedExchangeIsDeleted() throws IOException, InterruptedException {
869+
String q = generateQueueName();
870+
channel.queueDeclare(q, false, false, false, null);
871+
try {
872+
String x = generateExchangeName();
873+
channel.exchangeDeclare(x, "fanout");
874+
channel.queueBind(q, x, "");
875+
assertRecordedBinding(connection, 1);
876+
channel.exchangeDelete(x);
877+
assertRecordedBinding(connection, 0);
878+
} finally {
879+
channel.queueDelete(q);
880+
}
881+
}
882+
868883
private void assertConsumerCount(int exp, String q) throws IOException {
869884
assertThat(channel.queueDeclarePassive(q).getConsumerCount()).isEqualTo(exp);
870885
}
@@ -1017,4 +1032,8 @@ private static void assertRecordedQueues(Connection conn, int size) {
10171032
private static void assertRecordedExchanges(Connection conn, int size) {
10181033
assertThat(((AutorecoveringConnection)conn).getRecordedExchanges()).hasSize(size);
10191034
}
1035+
1036+
private static void assertRecordedBinding(Connection conn, int size) {
1037+
assertThat(((AutorecoveringConnection)conn).getRecordedBindings()).hasSize(size);
1038+
}
10201039
}

0 commit comments

Comments
 (0)