Skip to content

Commit 19f35e6

Browse files
committed
Add test for SAC + connection killing
The stream SAC coordinator monitors connection processes and should send consumer update accordingly when connections die. References #46, rabbitmq/rabbitmq-server#3753
1 parent effeab5 commit 19f35e6

File tree

1 file changed

+55
-3
lines changed

1 file changed

+55
-3
lines changed

src/test/java/com/rabbitmq/stream/impl/SacClientTest.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2021-2022 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -19,14 +19,17 @@
1919
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
2020
import static com.rabbitmq.stream.impl.TestUtils.streamName;
2121
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
22+
import static java.util.stream.Collectors.toList;
2223
import static org.assertj.core.api.Assertions.assertThat;
2324

2425
import com.rabbitmq.client.Connection;
2526
import com.rabbitmq.client.ConnectionFactory;
27+
import com.rabbitmq.stream.Host;
2628
import com.rabbitmq.stream.OffsetSpecification;
2729
import com.rabbitmq.stream.impl.Client.ClientParameters;
2830
import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener;
2931
import com.rabbitmq.stream.impl.Client.Response;
32+
import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet;
3033
import java.util.HashMap;
3134
import java.util.List;
3235
import java.util.Map;
@@ -35,7 +38,6 @@
3538
import java.util.concurrent.atomic.AtomicLong;
3639
import java.util.concurrent.atomic.AtomicReference;
3740
import java.util.function.Consumer;
38-
import java.util.stream.Collectors;
3941
import java.util.stream.IntStream;
4042
import org.junit.jupiter.api.Test;
4143
import org.junit.jupiter.api.TestInfo;
@@ -61,6 +63,10 @@ private static Map<Byte, AtomicInteger> receivedMessages(int subscriptionCount)
6163
return receivedMessages;
6264
}
6365

66+
private static ClientParameters withConnectionName(String connectionName) {
67+
return new ClientParameters().clientProperty("connection_name", connectionName);
68+
}
69+
6470
@Test
6571
void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exception {
6672
Client writerClient = cf.get();
@@ -316,7 +322,7 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr
316322
try {
317323
declareSuperStreamTopology(c, superStream, 3);
318324
List<String> partitions =
319-
IntStream.range(0, 3).mapToObj(i -> superStream + "-" + i).collect(Collectors.toList());
325+
IntStream.range(0, 3).mapToObj(i -> superStream + "-" + i).collect(toList());
320326
ConsumerUpdateListener consumerUpdateListener =
321327
(client1, subscriptionId, active) -> {
322328
consumerStates.put(subscriptionId, active);
@@ -395,4 +401,50 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr
395401
deleteSuperStreamTopology(c, superStream, 3);
396402
}
397403
}
404+
405+
@Test
406+
@DisabledIfRabbitMqCtlNotSet
407+
void killingConnectionsShouldTriggerConsumerUpdateNotification() throws Exception {
408+
Map<String, Boolean> consumerStates = new ConcurrentHashMap<>();
409+
List<String> consumerNames = IntStream.range(0, 5).mapToObj(i -> "foo-" + i).collect(toList());
410+
for (String consumerName : consumerNames) {
411+
Client c0 =
412+
cf.get(
413+
withConnectionName(consumerName + "-connection-0")
414+
.consumerUpdateListener(
415+
(client, subscriptionId, active) -> {
416+
consumerStates.put(consumerName + "-connection-0", active);
417+
return null;
418+
}));
419+
Client c1 =
420+
cf.get(
421+
withConnectionName(consumerName + "-connection-1")
422+
.consumerUpdateListener(
423+
(client, subscriptionId, active) -> {
424+
consumerStates.put(consumerName + "-connection-1", active);
425+
return null;
426+
}));
427+
428+
Map<String, String> subscriptionProperties = new HashMap<>();
429+
subscriptionProperties.put("single-active-consumer", "true");
430+
subscriptionProperties.put("name", consumerName);
431+
432+
Response response =
433+
c0.subscribe(b(0), stream, OffsetSpecification.first(), 2, subscriptionProperties);
434+
assertThat(response).is(ok());
435+
waitAtMost(() -> consumerStates.containsKey(consumerName + "-connection-0"));
436+
response = c1.subscribe(b(0), stream, OffsetSpecification.first(), 2, subscriptionProperties);
437+
assertThat(response).is(ok());
438+
response = c0.subscribe(b(1), stream, OffsetSpecification.first(), 2, subscriptionProperties);
439+
assertThat(response).is(ok());
440+
}
441+
442+
for (String consumerName : consumerNames) {
443+
Host.killConnection(consumerName + "-connection-0");
444+
waitAtMost(
445+
() ->
446+
consumerStates.containsKey(consumerName + "-connection-1")
447+
&& consumerStates.get(consumerName + "-connection-1"));
448+
}
449+
}
398450
}

0 commit comments

Comments
 (0)