Skip to content

Commit b7f3297

Browse files
committed
Check all connections are there in tests
Before trying to kill some of them. Test fails on CI sometimes.
1 parent 69fbf1a commit b7f3297

File tree

3 files changed

+12
-4
lines changed

3 files changed

+12
-4
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@
2828
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
2929
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
3030
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
31-
import java.util.Objects;
32-
import java.util.concurrent.atomic.AtomicBoolean;
33-
import java.util.concurrent.atomic.AtomicLong;
3431
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
3532
import java.time.Duration;
3633
import java.util.Collections;
3734
import java.util.Map;
35+
import java.util.Objects;
3836
import java.util.concurrent.CompletableFuture;
3937
import java.util.concurrent.ExecutionException;
4038
import java.util.concurrent.TimeUnit;
4139
import java.util.concurrent.TimeoutException;
40+
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.concurrent.atomic.AtomicLong;
4242
import java.util.concurrent.atomic.AtomicReference;
4343
import java.util.function.LongConsumer;
4444
import java.util.function.LongSupplier;

src/test/java/com/rabbitmq/stream/Host.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public static Process killConnection(String connectionName) throws IOException {
112112
return rabbitmqctl("eval 'rabbit_stream:kill_connection(\"" + connectionName + "\").'");
113113
}
114114

115-
private static List<ConnectionInfo> listConnections() throws IOException {
115+
public static List<ConnectionInfo> listConnections() throws IOException {
116116
Process process =
117117
rabbitmqctl("list_stream_connections --formatter json conn_name,client_properties");
118118
return toConnectionInfoList(capture(process.getInputStream()));

src/test/java/com/rabbitmq/stream/impl/SacClientTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,9 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr
408408
void killingConnectionsShouldTriggerConsumerUpdateNotification() throws Exception {
409409
Map<String, Boolean> consumerStates = new ConcurrentHashMap<>();
410410
List<String> consumerNames = IntStream.range(0, 5).mapToObj(i -> "foo-" + i).collect(toList());
411+
412+
int connectionCount = Host.listConnections().size();
413+
411414
for (String consumerName : consumerNames) {
412415
Client c0 =
413416
cf.get(
@@ -417,6 +420,7 @@ void killingConnectionsShouldTriggerConsumerUpdateNotification() throws Exceptio
417420
consumerStates.put(consumerName + "-connection-0", active);
418421
return null;
419422
}));
423+
connectionCount++;
420424
Client c1 =
421425
cf.get(
422426
withConnectionName(consumerName + "-connection-1")
@@ -425,6 +429,7 @@ void killingConnectionsShouldTriggerConsumerUpdateNotification() throws Exceptio
425429
consumerStates.put(consumerName + "-connection-1", active);
426430
return null;
427431
}));
432+
connectionCount++;
428433

429434
Map<String, String> subscriptionProperties = new HashMap<>();
430435
subscriptionProperties.put("single-active-consumer", "true");
@@ -440,6 +445,9 @@ void killingConnectionsShouldTriggerConsumerUpdateNotification() throws Exceptio
440445
assertThat(response).is(ok());
441446
}
442447

448+
int cCount = connectionCount;
449+
waitAtMost(() -> Host.listConnections().size() == cCount);
450+
443451
for (String consumerName : consumerNames) {
444452
Host.killConnection(consumerName + "-connection-0");
445453
waitAtMost(

0 commit comments

Comments
 (0)