Skip to content

Commit 10bfd70

Browse files
authored
Merge pull request #268 from rabbitmq/fast-shutdown
Speed up exit when --time is used
2 parents f4e69d7 + 57b074f commit 10bfd70

File tree

6 files changed

+71
-35
lines changed

6 files changed

+71
-35
lines changed

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class ConsumersCoordinator {
8484
private final NavigableSet<ClientSubscriptionsManager> managers = new ConcurrentSkipListSet<>();
8585
private final AtomicLong trackerIdSequence = new AtomicLong(0);
8686

87-
private final boolean debug = true;
87+
private final boolean debug = false;
8888
private final List<SubscriptionTracker> trackers = new CopyOnWriteArrayList<>();
8989

9090
ConsumersCoordinator(

src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class ProducersCoordinator {
6868
private final AtomicLong managerIdSequence = new AtomicLong(0);
6969
private final NavigableSet<ClientProducersManager> managers = new ConcurrentSkipListSet<>();
7070
private final AtomicLong trackerIdSequence = new AtomicLong(0);
71-
private final boolean debug = true;
71+
private final boolean debug = false;
7272
private final List<ProducerTracker> producerTrackers = new CopyOnWriteArrayList<>();
7373

7474
ProducersCoordinator(

src/main/java/com/rabbitmq/stream/impl/Utils.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,18 @@ static <T> T callAndMaybeRetry(
184184
String description = format(format, args);
185185
int attempt = 0;
186186
Exception lastException = null;
187+
long startTime = System.nanoTime();
187188
boolean keepTrying = true;
188189
while (keepTrying) {
189190
try {
190191
attempt++;
191192
T result = operation.get();
192-
LOGGER.debug("Operation '{}' completed after {} attempt(s)", description, attempt);
193+
Duration operationDuration = Duration.ofNanos(System.nanoTime() - startTime);
194+
LOGGER.debug(
195+
"Operation '{}' completed in {} ms after {} attempt(s)",
196+
description,
197+
operationDuration.toMillis(),
198+
attempt);
193199
return result;
194200
} catch (Exception e) {
195201
lastException = e;

src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.sun.net.httpserver.HttpServer;
1919
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
2020
import java.io.OutputStream;
21+
import java.io.PrintWriter;
2122
import java.net.InetAddress;
2223
import java.net.InetSocketAddress;
2324
import java.nio.charset.StandardCharsets;
@@ -37,14 +38,19 @@ class MonitoringContext {
3738
private final Environment environment;
3839

3940
private final Collection<Endpoint> endpoints = Collections.synchronizedList(new ArrayList<>());
41+
private final PrintWriter out;
4042

4143
private volatile HttpServer server;
4244

4345
MonitoringContext(
44-
int monitoringPort, CompositeMeterRegistry meterRegistry, Environment environment) {
46+
int monitoringPort,
47+
CompositeMeterRegistry meterRegistry,
48+
Environment environment,
49+
PrintWriter out) {
4550
this.monitoringPort = monitoringPort;
4651
this.meterRegistry = meterRegistry;
4752
this.environment = environment;
53+
this.out = out;
4854
}
4955

5056
void addHttpEndpoint(String path, String description, HttpHandler handler) {
@@ -88,7 +94,7 @@ void start() throws Exception {
8894
});
8995

9096
server.start();
91-
System.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort);
97+
this.out.println("Monitoring endpoints started on http://localhost:" + this.monitoringPort);
9298
}
9399
}
94100

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+52-29
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
import io.netty.buffer.ByteBufAllocator;
5959
import io.netty.buffer.ByteBufAllocatorMetric;
6060
import io.netty.buffer.ByteBufAllocatorMetricProvider;
61+
import io.netty.channel.EventLoopGroup;
62+
import io.netty.channel.nio.NioEventLoopGroup;
6163
import io.netty.handler.ssl.SslContextBuilder;
6264
import io.netty.handler.ssl.SslHandler;
6365
import io.netty.util.internal.PlatformDependent;
@@ -491,7 +493,7 @@ static int run(
491493
return commandLine.execute(args);
492494
}
493495

494-
static void versionInformation(PrintStream out) {
496+
static void versionInformation(PrintWriter out) {
495497
String lineSeparator = System.getProperty("line.separator");
496498
String version =
497499
format(
@@ -661,13 +663,16 @@ public Integer call() throws Exception {
661663
}
662664
}
663665

666+
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
667+
664668
EnvironmentBuilder environmentBuilder =
665669
Environment.builder()
666670
.id("stream-perf-test")
667671
.uris(this.uris)
668672
.addressResolver(addrResolver)
669673
.scheduledExecutorService(envExecutor)
670674
.metricsCollector(metricsCollector)
675+
.eventLoopGroup(eventLoopGroup)
671676
.byteBufAllocator(byteBufAllocator)
672677
.codec(codec)
673678
.maxProducersByConnection(this.producersByConnection)
@@ -701,10 +706,20 @@ public Integer call() throws Exception {
701706
}
702707

703708
Environment environment = environmentBuilder.channelCustomizer(channelCustomizer).build();
704-
shutdownService.wrap(closeStep("Closing environment(s)", () -> environment.close()));
709+
if (!isRunTimeLimited()) {
710+
shutdownService.wrap(
711+
closeStep(
712+
"Closing Netty event loop group",
713+
() -> {
714+
if (!eventLoopGroup.isShuttingDown() || !eventLoopGroup.isShutdown()) {
715+
eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS);
716+
}
717+
}));
718+
shutdownService.wrap(closeStep("Closing environment", () -> environment.close()));
719+
}
705720

706721
MonitoringContext monitoringContext =
707-
new MonitoringContext(this.monitoringPort, meterRegistry, environment);
722+
new MonitoringContext(this.monitoringPort, meterRegistry, environment, this.out);
708723
this.monitorings.forEach(m -> m.configure(monitoringContext));
709724
monitoringContext.start();
710725

@@ -958,14 +973,16 @@ public Integer call() throws Exception {
958973
})
959974
.collect(Collectors.toList()));
960975

961-
shutdownService.wrap(
962-
closeStep(
963-
"Closing consumers",
964-
() -> {
965-
for (Consumer consumer : consumers) {
966-
consumer.close();
967-
}
968-
}));
976+
if (!isRunTimeLimited()) {
977+
shutdownService.wrap(
978+
closeStep(
979+
"Closing consumers",
980+
() -> {
981+
for (Consumer consumer : consumers) {
982+
consumer.close();
983+
}
984+
}));
985+
}
969986

970987
ExecutorService executorService;
971988
if (this.producers > 0) {
@@ -980,23 +997,25 @@ public Integer call() throws Exception {
980997
executorService = null;
981998
}
982999

983-
shutdownService.wrap(
984-
closeStep(
985-
"Closing producers",
986-
() -> {
987-
for (Producer p : producers) {
988-
p.close();
989-
}
990-
}));
1000+
if (!isRunTimeLimited()) {
1001+
shutdownService.wrap(
1002+
closeStep(
1003+
"Closing producers",
1004+
() -> {
1005+
for (Producer p : producers) {
1006+
p.close();
1007+
}
1008+
}));
9911009

992-
shutdownService.wrap(
993-
closeStep(
994-
"Closing producers executor service",
995-
() -> {
996-
if (executorService != null) {
997-
executorService.shutdownNow();
998-
}
999-
}));
1010+
shutdownService.wrap(
1011+
closeStep(
1012+
"Closing producers executor service",
1013+
() -> {
1014+
if (executorService != null) {
1015+
executorService.shutdownNow();
1016+
}
1017+
}));
1018+
}
10001019

10011020
String metricsHeader = "Arguments: " + String.join(" ", arguments);
10021021

@@ -1008,7 +1027,7 @@ public Integer call() throws Exception {
10081027
Thread shutdownHook = new Thread(() -> latch.countDown());
10091028
Runtime.getRuntime().addShutdownHook(shutdownHook);
10101029
try {
1011-
if (this.time > 0) {
1030+
if (isRunTimeLimited()) {
10121031
latch.await(this.time, TimeUnit.SECONDS);
10131032
} else {
10141033
latch.await();
@@ -1091,7 +1110,7 @@ private void maybeDisplayEnvironmentVariablesHelp() {
10911110

10921111
private void maybeDisplayVersion() {
10931112
if (this.version) {
1094-
versionInformation(System.out);
1113+
versionInformation(this.out);
10951114
System.exit(0);
10961115
}
10971116
}
@@ -1112,6 +1131,10 @@ public String toString() {
11121131
};
11131132
}
11141133

1134+
private boolean isRunTimeLimited() {
1135+
return this.time > 0;
1136+
}
1137+
11151138
public void monitorings(List<Monitoring> monitorings) {
11161139
this.monitorings = monitorings;
11171140
}

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.io.IOException;
3737
import java.io.InputStreamReader;
3838
import java.io.PrintStream;
39+
import java.io.PrintWriter;
3940
import java.net.HttpURLConnection;
4041
import java.net.ServerSocket;
4142
import java.net.URL;
@@ -165,7 +166,7 @@ void helpShouldReturnImmediately() throws Exception {
165166

166167
@Test
167168
void versionShouldReturnAppropriateInformation() {
168-
StreamPerfTest.versionInformation(new PrintStream(out, true));
169+
StreamPerfTest.versionInformation(new PrintWriter(out, true));
169170
assertThat(consoleOutput()).contains("RabbitMQ Stream Perf Test");
170171
}
171172

0 commit comments

Comments
 (0)