Skip to content

Commit 3ba86c6

Browse files
authored
Merge pull request #672 from rabbitmq/recovery
Recovery improvements
2 parents 741de6a + bedf745 commit 3ba86c6

File tree

5 files changed

+34
-57
lines changed

5 files changed

+34
-57
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

+14-11
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class StreamEnvironment implements Environment {
6666

6767
private final EventLoopGroup eventLoopGroup;
6868
private final ScheduledExecutorService scheduledExecutorService;
69+
private final ScheduledExecutorService locatorReconnectionScheduledExecutorService;
6970
private final boolean privateScheduleExecutorService;
7071
private final Client.ClientParameters clientParametersPrototype;
7172
private final List<Address> addresses;
@@ -235,17 +236,22 @@ class StreamEnvironment implements Environment {
235236
maxProducersByConnection,
236237
maxTrackingConsumersByConnection,
237238
connectionNamingStrategy,
238-
Utils.coordinatorClientFactory(this, producerNodeRetryDelay),
239+
coordinatorClientFactory(this, producerNodeRetryDelay),
239240
forceLeaderForProducers);
240241
this.consumersCoordinator =
241242
new ConsumersCoordinator(
242243
this,
243244
maxConsumersByConnection,
244245
connectionNamingStrategy,
245-
Utils.coordinatorClientFactory(this, consumerNodeRetryDelay),
246+
coordinatorClientFactory(this, consumerNodeRetryDelay),
246247
forceReplicaForConsumers,
247248
Utils.brokerPicker());
248249
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
250+
251+
ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-");
252+
this.locatorReconnectionScheduledExecutorService =
253+
Executors.newScheduledThreadPool(this.locators.size(), threadFactory);
254+
249255
ClientParameters clientParametersForInit = locatorParametersCopy();
250256
Runnable locatorInitSequence =
251257
() -> {
@@ -291,7 +297,7 @@ class StreamEnvironment implements Environment {
291297
l,
292298
connectionNamingStrategy,
293299
clientFactory,
294-
this.scheduledExecutorService,
300+
this.locatorReconnectionScheduledExecutorService,
295301
this.recoveryBackOffDelayPolicy,
296302
l.label());
297303
}
@@ -338,7 +344,7 @@ private ShutdownListener shutdownListener(
338344
locator,
339345
connectionNamingStrategy,
340346
clientFactory,
341-
this.scheduledExecutorService,
347+
this.locatorReconnectionScheduledExecutorService,
342348
delayPolicy,
343349
label);
344350
} else {
@@ -683,6 +689,9 @@ public void close() {
683689
if (privateScheduleExecutorService) {
684690
this.scheduledExecutorService.shutdownNow();
685691
}
692+
if (this.locatorReconnectionScheduledExecutorService != null) {
693+
this.locatorReconnectionScheduledExecutorService.shutdownNow();
694+
}
686695
try {
687696
if (this.eventLoopGroup != null
688697
&& (!this.eventLoopGroup.isShuttingDown() || !this.eventLoopGroup.isShutdown())) {
@@ -904,13 +913,7 @@ TrackingConsumerRegistration registerTrackingConsumer(
904913
@Override
905914
public String toString() {
906915
return "{ \"locators\" : ["
907-
+ this.locators.stream()
908-
.map(
909-
l -> {
910-
Client c = l.nullableClient();
911-
return c == null ? "null" : ("\"" + l.label() + "\"");
912-
})
913-
.collect(Collectors.joining(","))
916+
+ this.locators.stream().map(l -> quote(l.label())).collect(Collectors.joining(","))
914917
+ "], "
915918
+ Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount())
916919
+ ","

src/test/java/com/rabbitmq/stream/Cli.java

+15
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,15 @@ public static String rabbitmqctlCommand() {
252252
}
253253
}
254254

255+
private static String dockerContainer() {
256+
if (rabbitmqctlCommand().startsWith("docker")) {
257+
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
258+
return rabbitmqCtl.split(":")[1];
259+
} else {
260+
throw new IllegalStateException("Broker does not run on broker");
261+
}
262+
}
263+
255264
private static String rabbitmqStreamsCommand() {
256265
String rabbitmqctl = rabbitmqctlCommand();
257266
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
@@ -354,6 +363,12 @@ public static void restartNode(String node) {
354363
executeCommand(dockerCommand + "rabbitmqctl status");
355364
}
356365

366+
public static void restartBrokerContainer() {
367+
String container = dockerContainer();
368+
executeCommand("docker stop " + container);
369+
executeCommand("docker start " + container);
370+
}
371+
357372
public static void rebalance() {
358373
rabbitmqQueues("rebalance all");
359374
}

src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,9 @@ public class RecoveryClusterTest {
7272
static List<Level> logLevels;
7373
static List<Class<?>> logClasses =
7474
List.of(
75-
ProducersCoordinator.class,
76-
ConsumersCoordinator.class,
77-
StreamEnvironment.class,
78-
AsyncRetry.class,
79-
StreamEnvironment.class,
80-
ScheduledExecutorServiceWrapper.class);
75+
// ProducersCoordinator.class,
76+
// ConsumersCoordinator.class,
77+
AsyncRetry.class, StreamEnvironment.class, ScheduledExecutorServiceWrapper.class);
8178
ScheduledExecutorService scheduledExecutorService;
8279

8380
@BeforeAll

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+2-18
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,9 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.impl.TestUtils.*;
1718
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
1819
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
19-
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
20-
import static com.rabbitmq.stream.impl.TestUtils.localhost;
21-
import static com.rabbitmq.stream.impl.TestUtils.localhostTls;
22-
import static com.rabbitmq.stream.impl.TestUtils.streamName;
23-
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2420
import static java.util.concurrent.TimeUnit.SECONDS;
2521
import static java.util.stream.Collectors.toList;
2622
import static java.util.stream.IntStream.range;
@@ -57,7 +53,6 @@
5753
import io.netty.channel.EventLoopGroup;
5854
import io.netty.channel.epoll.EpollEventLoopGroup;
5955
import io.netty.channel.epoll.EpollSocketChannel;
60-
import io.netty.channel.nio.NioEventLoopGroup;
6156
import io.netty.handler.ssl.SslHandler;
6257
import java.net.ConnectException;
6358
import java.nio.charset.StandardCharsets;
@@ -92,22 +87,11 @@
9287
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
9388
public class StreamEnvironmentTest {
9489

95-
static EventLoopGroup eventLoopGroup;
96-
9790
EnvironmentBuilder environmentBuilder;
9891

9992
String stream;
10093
TestUtils.ClientFactory cf;
101-
102-
@BeforeAll
103-
static void initAll() {
104-
eventLoopGroup = new NioEventLoopGroup();
105-
}
106-
107-
@AfterAll
108-
static void afterAll() throws Exception {
109-
eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
110-
}
94+
EventLoopGroup eventLoopGroup;
11195

11296
@BeforeEach
11397
void init() {

src/test/java/com/rabbitmq/stream/impl/TlsTest.java

-22
Original file line numberDiff line numberDiff line change
@@ -56,35 +56,13 @@
5656
import javax.net.ssl.SSLException;
5757
import javax.net.ssl.SSLHandshakeException;
5858
import javax.net.ssl.SSLParameters;
59-
import org.junit.jupiter.api.AfterEach;
60-
import org.junit.jupiter.api.BeforeEach;
6159
import org.junit.jupiter.api.Test;
6260
import org.junit.jupiter.api.extension.ExtendWith;
6361

6462
@DisabledIfTlsNotEnabled
6563
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
6664
public class TlsTest {
6765

68-
static boolean isJava13() {
69-
String javaVersion = System.getProperty("java.version");
70-
return javaVersion != null && javaVersion.startsWith("13.");
71-
}
72-
73-
@BeforeEach
74-
public void init() {
75-
if (isJava13()) {
76-
// for Java 13.0.7, see https://github.com/bcgit/bc-java/issues/941
77-
System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", "PBEWithHmacSHA256AndAES_256");
78-
}
79-
}
80-
81-
@AfterEach
82-
public void tearDown() throws Exception {
83-
if (isJava13()) {
84-
System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", "");
85-
}
86-
}
87-
8866
String stream;
8967

9068
TestUtils.ClientFactory cf;

0 commit comments

Comments
 (0)