Skip to content

Commit daf56e3

Browse files
committed
Make sure stream has replicas in test
1 parent 16065a1 commit daf56e3

File tree

3 files changed

+12
-34
lines changed

3 files changed

+12
-34
lines changed

src/test/java/com/rabbitmq/stream/Cli.java

+1-20
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,9 @@
2626
import java.util.*;
2727
import java.util.concurrent.Callable;
2828
import java.util.stream.Collectors;
29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
3129

3230
public class Cli {
3331

34-
private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);
35-
3632
private static final String DOCKER_PREFIX = "DOCKER:";
3733

3834
private static final Gson GSON = new Gson();
@@ -252,15 +248,6 @@ public static String rabbitmqctlCommand() {
252248
}
253249
}
254250

255-
private static String dockerContainer() {
256-
if (rabbitmqctlCommand().startsWith("docker")) {
257-
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
258-
return rabbitmqCtl.split(":")[1];
259-
} else {
260-
throw new IllegalStateException("Broker does not run on broker");
261-
}
262-
}
263-
264251
private static String rabbitmqStreamsCommand() {
265252
String rabbitmqctl = rabbitmqctlCommand();
266253
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
@@ -314,7 +301,7 @@ private static void setResourceAlarm(String source) throws IOException {
314301
rabbitmqctl("eval 'rabbit_alarm:set_alarm({{resource_limit, " + source + ", node()}, []}).'");
315302
}
316303

317-
private static void clearResourceAlarm(String source) throws IOException {
304+
private static void clearResourceAlarm(String source) {
318305
rabbitmqctl("eval 'rabbit_alarm:clear_alarm({resource_limit, " + source + ", node()}).'");
319306
}
320307

@@ -363,12 +350,6 @@ public static void restartNode(String node) {
363350
executeCommand(dockerCommand + "rabbitmqctl status");
364351
}
365352

366-
public static void restartBrokerContainer() {
367-
String container = dockerContainer();
368-
executeCommand("docker stop " + container);
369-
executeCommand("docker start " + container);
370-
}
371-
372353
public static void rebalance() {
373354
rabbitmqQueues("rebalance all");
374355
}

src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package com.rabbitmq.stream.impl;
1616

1717
import static com.rabbitmq.stream.impl.Assertions.assertThat;
18+
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
1819
import static java.lang.Integer.parseInt;
1920
import static java.util.stream.Collectors.toSet;
2021
import static java.util.stream.IntStream.range;
@@ -79,7 +80,7 @@ void tearDown() throws Exception {
7980

8081
@ParameterizedTest
8182
@ValueSource(booleans = {false, true})
82-
void pickConsumersAmongCandidates(boolean forceReplica) {
83+
void pickConsumersAmongCandidates(boolean forceReplica) throws Exception {
8384
int maxSubscriptionsPerClient = 2;
8485
int subscriptionCount = maxSubscriptionsPerClient * 10;
8586
try (ConsumersCoordinator c =
@@ -91,6 +92,10 @@ void pickConsumersAmongCandidates(boolean forceReplica) {
9192
forceReplica,
9293
Utils.brokerPicker())) {
9394

95+
waitAtMost(
96+
() -> locator.metadata(stream).get(stream).hasReplicas(),
97+
"Stream '%s' should have replicas");
98+
9499
range(0, subscriptionCount)
95100
.forEach(
96101
ignored -> {

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

+5-13
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,7 @@
4444
import io.netty.channel.EventLoopGroup;
4545
import io.netty.channel.nio.NioEventLoopGroup;
4646
import io.vavr.Tuple2;
47-
import java.io.BufferedReader;
4847
import java.io.IOException;
49-
import java.io.InputStream;
50-
import java.io.InputStreamReader;
5148
import java.lang.annotation.Documented;
5249
import java.lang.annotation.ElementType;
5350
import java.lang.annotation.Retention;
@@ -97,6 +94,11 @@ public static Duration waitAtMost(CallableBooleanSupplier condition) throws Exce
9794
return waitAtMost(DEFAULT_CONDITION_TIMEOUT, condition, null);
9895
}
9996

97+
public static Duration waitAtMost(
98+
CallableBooleanSupplier condition, String format, Object... args) throws Exception {
99+
return waitAtMost(DEFAULT_CONDITION_TIMEOUT, condition, () -> String.format(format, args));
100+
}
101+
100102
public static Duration waitAtMost(CallableBooleanSupplier condition, Supplier<String> message)
101103
throws Exception {
102104
return waitAtMost(DEFAULT_CONDITION_TIMEOUT, condition, message);
@@ -384,16 +386,6 @@ static boolean isCluster() {
384386
return !content.replace("[", "").replace("]", "").trim().isEmpty();
385387
}
386388

387-
private static String capture(InputStream is) throws IOException {
388-
BufferedReader br = new BufferedReader(new InputStreamReader(is));
389-
String line;
390-
StringBuilder buff = new StringBuilder();
391-
while ((line = br.readLine()) != null) {
392-
buff.append(line).append("\n");
393-
}
394-
return buff.toString();
395-
}
396-
397389
static <T> void forEach(Collection<T> in, CallableIndexConsumer<T> consumer) throws Exception {
398390
int count = 0;
399391
for (T t : in) {

0 commit comments

Comments
 (0)