Skip to content

Commit 93e9ee1

Browse files
committed
Add recovery cluster test
1 parent b1d3c1d commit 93e9ee1

19 files changed

+474
-206
lines changed

src/test/java/com/rabbitmq/stream/Host.java renamed to src/test/java/com/rabbitmq/stream/Cli.java

+183-106
Original file line numberDiff line numberDiff line change
@@ -29,62 +29,58 @@
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
3131

32-
public class Host {
32+
public class Cli {
3333

34-
private static final Logger LOGGER = LoggerFactory.getLogger(Host.class);
34+
private static final Logger LOGGER = LoggerFactory.getLogger(Cli.class);
3535

3636
private static final String DOCKER_PREFIX = "DOCKER:";
3737

3838
private static final Gson GSON = new Gson();
3939

40-
public static String capture(InputStream is) throws IOException {
41-
BufferedReader br = new BufferedReader(new InputStreamReader(is));
42-
String line;
43-
StringBuilder buff = new StringBuilder();
44-
while ((line = br.readLine()) != null) {
45-
buff.append(line).append("\n");
46-
}
47-
return buff.toString();
48-
}
40+
private static final Map<String, String> DOCKER_NODES_TO_CONTAINERS =
41+
Map.of(
42+
"rabbit@node0", "rabbitmq0",
43+
"rabbit@node1", "rabbitmq1",
44+
"rabbit@node2", "rabbitmq2");
4945

50-
private static Process executeCommand(String command) {
46+
private static ProcessState executeCommand(String command) {
5147
return executeCommand(command, false);
5248
}
5349

54-
private static Process executeCommand(String command, boolean ignoreError) {
55-
try {
56-
Process pr = executeCommandProcess(command);
57-
58-
int ev = waitForExitValue(pr);
59-
if (ev != 0 && !ignoreError) {
60-
String stdout = capture(pr.getInputStream());
61-
String stderr = capture(pr.getErrorStream());
62-
throw new IOException(
63-
"unexpected command exit value: "
64-
+ ev
65-
+ "\ncommand: "
66-
+ command
67-
+ "\n"
68-
+ "\nstdout:\n"
69-
+ stdout
70-
+ "\nstderr:\n"
71-
+ stderr
72-
+ "\n");
73-
}
74-
return pr;
75-
} catch (IOException e) {
76-
throw new RuntimeException(e);
50+
private static ProcessState executeCommand(String command, boolean ignoreError) {
51+
Process pr = executeCommandProcess(command);
52+
InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream());
53+
InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream());
54+
55+
int ev = waitForExitValue(pr, inputState, errorState);
56+
inputState.pump();
57+
errorState.pump();
58+
if (ev != 0 && !ignoreError) {
59+
throw new RuntimeException(
60+
"unexpected command exit value: "
61+
+ ev
62+
+ "\ncommand: "
63+
+ command
64+
+ "\n"
65+
+ "\nstdout:\n"
66+
+ inputState.buffer.toString()
67+
+ "\nstderr:\n"
68+
+ errorState.buffer.toString()
69+
+ "\n");
7770
}
71+
return new ProcessState(inputState);
7872
}
7973

80-
public static String hostname() throws IOException {
81-
Process process = executeCommand("hostname");
82-
return capture(process.getInputStream()).trim();
74+
public static String hostname() {
75+
return executeCommand("hostname").output();
8376
}
8477

85-
private static int waitForExitValue(Process pr) {
78+
private static int waitForExitValue(
79+
Process pr, InputStreamPumpState inputState, InputStreamPumpState errorState) {
8680
while (true) {
8781
try {
82+
inputState.pump();
83+
errorState.pump();
8884
pr.waitFor();
8985
break;
9086
} catch (InterruptedException ignored) {
@@ -93,7 +89,7 @@ private static int waitForExitValue(Process pr) {
9389
return pr.exitValue();
9490
}
9591

96-
private static Process executeCommandProcess(String command) throws IOException {
92+
private static Process executeCommandProcess(String command) {
9793
String[] finalCommand;
9894
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
9995
finalCommand = new String[4];
@@ -107,70 +103,59 @@ private static Process executeCommandProcess(String command) throws IOException
107103
finalCommand[1] = "-c";
108104
finalCommand[2] = command;
109105
}
110-
return Runtime.getRuntime().exec(finalCommand);
106+
try {
107+
return Runtime.getRuntime().exec(finalCommand);
108+
} catch (IOException e) {
109+
throw new RuntimeException(e);
110+
}
111111
}
112112

113-
public static Process rabbitmqctl(String command) throws IOException {
113+
public static ProcessState rabbitmqctl(String command) {
114114
return executeCommand(rabbitmqctlCommand() + " " + command);
115115
}
116116

117-
static Process rabbitmqStreams(String command) {
117+
static ProcessState rabbitmqStreams(String command) {
118118
return executeCommand(rabbitmqStreamsCommand() + " " + command);
119119
}
120120

121-
public static Process rabbitmqctlIgnoreError(String command) {
121+
public static ProcessState rabbitmqctlIgnoreError(String command) {
122122
return executeCommand(rabbitmqctlCommand() + " " + command, true);
123123
}
124124

125-
public static Process killConnection(String connectionName) {
126-
try {
127-
List<ConnectionInfo> cs = listConnections();
128-
if (cs.stream().filter(c -> connectionName.equals(c.clientProvidedName())).count() != 1) {
129-
throw new IllegalArgumentException(
130-
format(
131-
"Could not find 1 connection '%s' in stream connections: %s",
132-
connectionName,
133-
cs.stream()
134-
.map(ConnectionInfo::clientProvidedName)
135-
.collect(Collectors.joining(", "))));
136-
}
137-
return rabbitmqctl("eval 'rabbit_stream:kill_connection(\"" + connectionName + "\").'");
138-
} catch (IOException e) {
139-
throw new RuntimeException(e);
125+
public static ProcessState killConnection(String connectionName) {
126+
List<ConnectionInfo> cs = listConnections();
127+
if (cs.stream().filter(c -> connectionName.equals(c.clientProvidedName())).count() != 1) {
128+
throw new IllegalArgumentException(
129+
format(
130+
"Could not find 1 connection '%s' in stream connections: %s",
131+
connectionName,
132+
cs.stream()
133+
.map(ConnectionInfo::clientProvidedName)
134+
.collect(Collectors.joining(", "))));
140135
}
136+
return rabbitmqctl("eval 'rabbit_stream:kill_connection(\"" + connectionName + "\").'");
141137
}
142138

143139
public static List<ConnectionInfo> listConnections() {
144-
try {
145-
Process process =
146-
rabbitmqctl("list_stream_connections -q --formatter table conn_name,client_properties");
147-
List<ConnectionInfo> connectionInfoList = Collections.emptyList();
148-
if (process.exitValue() != 0) {
149-
LOGGER.warn(
150-
"Error while trying to list stream connections. Standard output: {}, error output: {}",
151-
capture(process.getInputStream()),
152-
capture(process.getErrorStream()));
153-
return connectionInfoList;
154-
}
155-
String content = capture(process.getInputStream());
156-
String[] lines = content.split(System.getProperty("line.separator"));
157-
if (lines.length > 1) {
158-
connectionInfoList = new ArrayList<>(lines.length - 1);
159-
for (int i = 1; i < lines.length; i++) {
160-
String line = lines[i];
161-
String[] fields = line.split("\t");
162-
String connectionName = fields[0];
163-
Map<String, String> clientProperties = Collections.emptyMap();
164-
if (fields.length > 1 && fields[1].length() > 1) {
165-
clientProperties = buildClientProperties(fields);
166-
}
167-
connectionInfoList.add(new ConnectionInfo(connectionName, clientProperties));
140+
ProcessState process =
141+
rabbitmqctl("list_stream_connections -q --formatter table conn_name,client_properties");
142+
List<ConnectionInfo> connectionInfoList = Collections.emptyList();
143+
String content = process.output();
144+
String[] lines = content.split(System.lineSeparator());
145+
if (lines.length > 1) {
146+
connectionInfoList = new ArrayList<>(lines.length - 1);
147+
for (int i = 1; i < lines.length; i++) {
148+
String line = lines[i];
149+
String[] fields = line.split("\t");
150+
String connectionName = fields[0];
151+
Map<String, String> clientProperties = Collections.emptyMap();
152+
if (fields.length > 1 && fields[1].length() > 1) {
153+
clientProperties = buildClientProperties(fields);
168154
}
155+
connectionInfoList.add(new ConnectionInfo(connectionName, clientProperties));
169156
}
170-
return connectionInfoList;
171-
} catch (IOException e) {
172-
throw new RuntimeException(e);
173157
}
158+
return connectionInfoList;
174159
}
175160

176161
private static Map<String, String> buildClientProperties(String[] fields) {
@@ -201,32 +186,26 @@ public static void restartStream(String stream) {
201186
rabbitmqStreams(" restart_stream " + stream);
202187
}
203188

204-
public static Process killStreamLeaderProcess(String stream) {
205-
try {
206-
return rabbitmqctl(
207-
"eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\""
208-
+ stream
209-
+ "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'");
210-
} catch (IOException e) {
211-
throw new RuntimeException(e);
212-
}
189+
public static void killStreamLeaderProcess(String stream) {
190+
rabbitmqctl(
191+
"eval 'case rabbit_stream_manager:lookup_leader(<<\"/\">>, <<\""
192+
+ stream
193+
+ "\">>) of {ok, Pid} -> exit(Pid, kill); Pid -> exit(Pid, kill) end.'");
213194
}
214195

215-
public static void addUser(String username, String password) throws IOException {
196+
public static void addUser(String username, String password) {
216197
rabbitmqctl(format("add_user %s %s", username, password));
217198
}
218199

219-
public static void setPermissions(String username, List<String> permissions) throws IOException {
200+
public static void setPermissions(String username, List<String> permissions) {
220201
setPermissions(username, "/", permissions);
221202
}
222203

223-
public static void setPermissions(String username, String vhost, String permission)
224-
throws IOException {
204+
public static void setPermissions(String username, String vhost, String permission) {
225205
setPermissions(username, vhost, asList(permission, permission, permission));
226206
}
227207

228-
public static void setPermissions(String username, String vhost, List<String> permissions)
229-
throws IOException {
208+
public static void setPermissions(String username, String vhost, List<String> permissions) {
230209
if (permissions.size() != 3) {
231210
throw new IllegalArgumentException();
232211
}
@@ -236,23 +215,23 @@ public static void setPermissions(String username, String vhost, List<String> pe
236215
vhost, username, permissions.get(0), permissions.get(1), permissions.get(2)));
237216
}
238217

239-
public static void changePassword(String username, String newPassword) throws IOException {
218+
public static void changePassword(String username, String newPassword) {
240219
rabbitmqctl(format("change_password %s %s", username, newPassword));
241220
}
242221

243-
public static void deleteUser(String username) throws IOException {
222+
public static void deleteUser(String username) {
244223
rabbitmqctl(format("delete_user %s", username));
245224
}
246225

247-
public static void addVhost(String vhost) throws IOException {
226+
public static void addVhost(String vhost) {
248227
rabbitmqctl("add_vhost " + vhost);
249228
}
250229

251-
public static void deleteVhost(String vhost) throws Exception {
230+
public static void deleteVhost(String vhost) {
252231
rabbitmqctl("delete_vhost " + vhost);
253232
}
254233

255-
public static void setEnv(String parameter, String value) throws IOException {
234+
public static void setEnv(String parameter, String value) {
256235
rabbitmqctl(format("eval 'application:set_env(rabbitmq_stream, %s, %s).'", parameter, value));
257236
}
258237

@@ -334,6 +313,68 @@ public static boolean isOnDocker() {
334313
return rabbitmqCtl.startsWith(DOCKER_PREFIX);
335314
}
336315

316+
public static List<String> nodes() {
317+
List<String> clusterNodes = new ArrayList<>();
318+
clusterNodes.add(rabbitmqctl("eval 'node().'").output().trim());
319+
List<String> nodes =
320+
Arrays.stream(
321+
rabbitmqctl("eval 'nodes().'")
322+
.output()
323+
.replace("[", "")
324+
.replace("]", "")
325+
.split(","))
326+
.map(String::trim)
327+
.collect(Collectors.toList());
328+
clusterNodes.addAll(nodes);
329+
return List.copyOf(clusterNodes);
330+
}
331+
332+
public static void restartNode(String node) {
333+
String container = nodeToDockerContainer(node);
334+
String dockerCommand = "docker exec " + container + " ";
335+
String rabbitmqUpgradeCommand = dockerCommand + "rabbitmq-upgrade ";
336+
executeCommand(rabbitmqUpgradeCommand + "await_online_quorum_plus_one -t 300");
337+
executeCommand(rabbitmqUpgradeCommand + "drain");
338+
executeCommand("docker stop " + container);
339+
executeCommand("docker start " + container);
340+
String otherContainer =
341+
DOCKER_NODES_TO_CONTAINERS.values().stream()
342+
.filter(c -> !c.endsWith(container))
343+
.findAny()
344+
.get();
345+
executeCommand(
346+
"docker exec "
347+
+ otherContainer
348+
+ " rabbitmqctl await_online_nodes "
349+
+ DOCKER_NODES_TO_CONTAINERS.size());
350+
executeCommand(dockerCommand + "rabbitmqctl status");
351+
}
352+
353+
public static void rebalance() {
354+
rabbitmqQueues("rebalance all");
355+
}
356+
357+
static ProcessState rabbitmqQueues(String command) {
358+
return executeCommand(rabbitmqQueuesCommand() + " " + command);
359+
}
360+
361+
private static String rabbitmqQueuesCommand() {
362+
String rabbitmqctl = rabbitmqctlCommand();
363+
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
364+
if (lastIndex == -1) {
365+
throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl);
366+
}
367+
return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-queues";
368+
}
369+
370+
private static String nodeToDockerContainer(String node) {
371+
String containerId = DOCKER_NODES_TO_CONTAINERS.get(node);
372+
if (containerId == null) {
373+
throw new IllegalArgumentException("No container for node " + node);
374+
}
375+
return containerId;
376+
}
377+
337378
private static final class CallableAutoCloseable implements AutoCloseable {
338379

339380
private final Callable<Void> end;
@@ -378,4 +419,40 @@ public String toString() {
378419
+ '}';
379420
}
380421
}
422+
423+
public static class ProcessState {
424+
425+
private final InputStreamPumpState inputState;
426+
427+
ProcessState(InputStreamPumpState inputState) {
428+
this.inputState = inputState;
429+
}
430+
431+
public String output() {
432+
return inputState.buffer.toString();
433+
}
434+
}
435+
436+
private static class InputStreamPumpState {
437+
438+
private final BufferedReader reader;
439+
private final StringBuilder buffer;
440+
441+
private InputStreamPumpState(InputStream in) {
442+
this.reader = new BufferedReader(new InputStreamReader(in));
443+
this.buffer = new StringBuilder();
444+
}
445+
446+
void pump() {
447+
String line;
448+
while (true) {
449+
try {
450+
if ((line = reader.readLine()) == null) break;
451+
} catch (IOException e) {
452+
throw new RuntimeException(e);
453+
}
454+
buffer.append(line).append("\n");
455+
}
456+
}
457+
}
381458
}

0 commit comments

Comments
 (0)