Skip to content

Commit eada615

Browse files
committed
Add SAC connection closing test
References rabbitmq/rabbitmq-server#3753
1 parent d12b400 commit eada615

File tree

1 file changed

+25
-0
lines changed

1 file changed

+25
-0
lines changed

src/test/java/com/rabbitmq/stream/impl/SingleActiveConsumerTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,29 @@ void consumerUpdateListenerShouldBeCalledInOrder() throws Exception {
150150
response = client.unsubscribe(b(9));
151151
assertThat(response.isOk()).isTrue();
152152
}
153+
154+
@Test
155+
void noConsumerUpdateOnConnectionClosingIfSubscriptionNotUnsubscribed() throws Exception {
156+
AtomicInteger consumerUpdateCount = new AtomicInteger(0);
157+
Client client =
158+
cf.get(
159+
new ClientParameters()
160+
.consumerUpdateListener(
161+
(client1, subscriptionId, active) -> {
162+
consumerUpdateCount.incrementAndGet();
163+
return null;
164+
}));
165+
String consumerName = "foo";
166+
Map<String, String> parameters = new HashMap<>();
167+
parameters.put("single-active-consumer", "true");
168+
parameters.put("name", consumerName);
169+
Response response = client.subscribe(b(0), stream, OffsetSpecification.first(), 2, parameters);
170+
assertThat(response.isOk()).isTrue();
171+
response = client.subscribe(b(1), stream, OffsetSpecification.first(), 2, parameters);
172+
assertThat(response.isOk()).isTrue();
173+
waitAtMost(() -> consumerUpdateCount.get() == 2);
174+
175+
client.close();
176+
assertThat(consumerUpdateCount).hasValue(2);
177+
}
153178
}

0 commit comments

Comments
 (0)