Skip to content

Commit 7c5e466

Browse files
committed
Add monitoring HTTP endpoint
Only thread dump for now. Fixes #28
1 parent 9bf1eb9 commit 7c5e466

File tree

9 files changed

+399
-12
lines changed

9 files changed

+399
-12
lines changed

pom.xml

+15
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
<micrometer.version>1.7.3</micrometer.version>
5858
<swiftmq-client.version>12.2.2</swiftmq-client.version>
5959
<picocli.version>4.6.1</picocli.version>
60+
<jetty.version>9.4.43.v20210629</jetty.version>
6061
<guava.version>30.1.1-jre</guava.version>
6162
<commons-compress.version>1.21</commons-compress.version>
6263
<zstd-jni.version>1.5.0-4</zstd-jni.version>
@@ -174,6 +175,13 @@
174175
<version>${guava.version}</version>
175176
<optional>true</optional>
176177
</dependency>
178+
179+
<dependency>
180+
<groupId>org.eclipse.jetty</groupId>
181+
<artifactId>jetty-servlet</artifactId>
182+
<version>${jetty.version}</version>
183+
<optional>true</optional>
184+
</dependency>
177185
<!-- end of dependencies for performance tool -->
178186

179187
<dependency>
@@ -561,6 +569,13 @@
561569
<version>${guava.version}</version>
562570
</dependency>
563571

572+
<dependency>
573+
<groupId>org.eclipse.jetty</groupId>
574+
<artifactId>jetty-servlet</artifactId>
575+
<version>${jetty.version}</version>
576+
<optional>true</optional>
577+
</dependency>
578+
564579
</dependencies>
565580
<build>
566581
<finalName>${finalName}</finalName>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.perf;
15+
16+
import java.io.IOException;
17+
import java.io.PrintWriter;
18+
import java.io.StringWriter;
19+
import java.lang.management.LockInfo;
20+
import java.lang.management.ManagementFactory;
21+
import java.lang.management.MonitorInfo;
22+
import java.lang.management.RuntimeMXBean;
23+
import java.lang.management.ThreadInfo;
24+
import java.time.LocalDateTime;
25+
import java.time.format.DateTimeFormatter;
26+
import java.util.List;
27+
import java.util.stream.Collectors;
28+
import java.util.stream.Stream;
29+
import javax.servlet.http.HttpServletRequest;
30+
import javax.servlet.http.HttpServletResponse;
31+
import org.eclipse.jetty.server.Request;
32+
import org.eclipse.jetty.server.handler.AbstractHandler;
33+
import picocli.CommandLine.Option;
34+
35+
class DebugEndpointMonitoring implements Monitoring {
36+
37+
@Option(
38+
names = {"--monitoring"},
39+
description = "Enable HTTP endpoint for monitoring and debugging",
40+
defaultValue = "false")
41+
private boolean monitoring;
42+
43+
@Override
44+
public void configure(MonitoringContext context) {
45+
if (monitoring) {
46+
PlainTextThreadDumpFormatter formatter = new PlainTextThreadDumpFormatter();
47+
context.addHttpEndpoint(
48+
"threaddump",
49+
new AbstractHandler() {
50+
@Override
51+
public void handle(
52+
String target,
53+
Request baseRequest,
54+
HttpServletRequest request,
55+
HttpServletResponse response)
56+
throws IOException {
57+
58+
ThreadInfo[] threadInfos =
59+
ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
60+
String content = formatter.format(threadInfos);
61+
62+
response.setStatus(HttpServletResponse.SC_OK);
63+
response.setContentLength(content.length());
64+
response.setContentType("text/plain");
65+
66+
response.getWriter().print(content);
67+
68+
baseRequest.setHandled(true);
69+
}
70+
});
71+
}
72+
}
73+
74+
// from Spring Boot's PlainTextThreadDumpFormatter
75+
private static class PlainTextThreadDumpFormatter {
76+
77+
String format(ThreadInfo[] threads) {
78+
StringWriter dump = new StringWriter();
79+
PrintWriter writer = new PrintWriter(dump);
80+
writePreamble(writer);
81+
for (ThreadInfo info : threads) {
82+
writeThread(writer, info);
83+
}
84+
return dump.toString();
85+
}
86+
87+
private void writePreamble(PrintWriter writer) {
88+
DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
89+
writer.println(dateFormat.format(LocalDateTime.now()));
90+
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
91+
writer.printf(
92+
"Full thread dump %s (%s %s):%n",
93+
runtime.getVmName(), runtime.getVmVersion(), System.getProperty("java.vm.info"));
94+
writer.println();
95+
}
96+
97+
private void writeThread(PrintWriter writer, ThreadInfo info) {
98+
writer.printf("\"%s\" - Thread t@%d%n", info.getThreadName(), info.getThreadId());
99+
writer.printf(" %s: %s%n", Thread.State.class.getCanonicalName(), info.getThreadState());
100+
writeStackTrace(writer, info, info.getLockedMonitors());
101+
writer.println();
102+
writeLockedOwnableSynchronizers(writer, info);
103+
writer.println();
104+
}
105+
106+
private void writeStackTrace(
107+
PrintWriter writer, ThreadInfo info, MonitorInfo[] lockedMonitors) {
108+
int depth = 0;
109+
for (StackTraceElement element : info.getStackTrace()) {
110+
writeStackTraceElement(
111+
writer, element, info, lockedMonitorsForDepth(lockedMonitors, depth), depth == 0);
112+
depth++;
113+
}
114+
}
115+
116+
private List<MonitorInfo> lockedMonitorsForDepth(MonitorInfo[] lockedMonitors, int depth) {
117+
return Stream.of(lockedMonitors)
118+
.filter((lockedMonitor) -> lockedMonitor.getLockedStackDepth() == depth)
119+
.collect(Collectors.toList());
120+
}
121+
122+
private void writeStackTraceElement(
123+
PrintWriter writer,
124+
StackTraceElement element,
125+
ThreadInfo info,
126+
List<MonitorInfo> lockedMonitors,
127+
boolean firstElement) {
128+
writer.printf("\tat %s%n", element.toString());
129+
LockInfo lockInfo = info.getLockInfo();
130+
if (firstElement && lockInfo != null) {
131+
if (element.getClassName().equals(Object.class.getName())
132+
&& element.getMethodName().equals("wait")) {
133+
writer.printf("\t- waiting on %s%n", format(lockInfo));
134+
} else {
135+
String lockOwner = info.getLockOwnerName();
136+
if (lockOwner != null) {
137+
writer.printf(
138+
"\t- waiting to lock %s owned by \"%s\" t@%d%n",
139+
format(lockInfo), lockOwner, info.getLockOwnerId());
140+
} else {
141+
writer.printf("\t- parking to wait for %s%n", format(lockInfo));
142+
}
143+
}
144+
}
145+
writeMonitors(writer, lockedMonitors);
146+
}
147+
148+
private String format(LockInfo lockInfo) {
149+
return String.format("<%x> (a %s)", lockInfo.getIdentityHashCode(), lockInfo.getClassName());
150+
}
151+
152+
private void writeMonitors(PrintWriter writer, List<MonitorInfo> lockedMonitorsAtCurrentDepth) {
153+
for (MonitorInfo lockedMonitor : lockedMonitorsAtCurrentDepth) {
154+
writer.printf("\t- locked %s%n", format(lockedMonitor));
155+
}
156+
}
157+
158+
private void writeLockedOwnableSynchronizers(PrintWriter writer, ThreadInfo info) {
159+
writer.println(" Locked ownable synchronizers:");
160+
LockInfo[] lockedSynchronizers = info.getLockedSynchronizers();
161+
if (lockedSynchronizers == null || lockedSynchronizers.length == 0) {
162+
writer.println("\t- None");
163+
} else {
164+
for (LockInfo lockedSynchronizer : lockedSynchronizers) {
165+
writer.printf("\t- Locked %s%n", format(lockedSynchronizer));
166+
}
167+
}
168+
}
169+
}
170+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.perf;
15+
16+
interface Monitoring {
17+
18+
void configure(MonitoringContext context);
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.perf;
15+
16+
import java.util.ArrayList;
17+
import java.util.LinkedHashMap;
18+
import java.util.List;
19+
import java.util.Map;
20+
import java.util.Map.Entry;
21+
import org.eclipse.jetty.server.Connector;
22+
import org.eclipse.jetty.server.Handler;
23+
import org.eclipse.jetty.server.Server;
24+
import org.eclipse.jetty.server.ServerConnector;
25+
import org.eclipse.jetty.server.handler.ContextHandler;
26+
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
27+
import org.eclipse.jetty.util.thread.QueuedThreadPool;
28+
29+
class MonitoringContext {
30+
31+
private final int monitoringPort;
32+
33+
private final Map<String, Handler> handlers = new LinkedHashMap<>();
34+
35+
private volatile Server server;
36+
37+
MonitoringContext(int monitoringPort) {
38+
this.monitoringPort = monitoringPort;
39+
}
40+
41+
void addHttpEndpoint(String path, Handler handler) {
42+
this.handlers.put(path, handler);
43+
}
44+
45+
void start() throws Exception {
46+
if (!handlers.isEmpty()) {
47+
QueuedThreadPool threadPool = new QueuedThreadPool();
48+
// difference between those 2 should be high enough to avoid a warning
49+
threadPool.setMinThreads(2);
50+
threadPool.setMaxThreads(12);
51+
server = new Server(threadPool);
52+
ServerConnector connector = new ServerConnector(server);
53+
connector.setPort(this.monitoringPort);
54+
server.setConnectors(new Connector[] {connector});
55+
56+
List<ContextHandler> contextHandlers = new ArrayList<>(handlers.size());
57+
for (Entry<String, Handler> entry : handlers.entrySet()) {
58+
String path = entry.getKey().startsWith("/") ? entry.getKey() : "/" + entry.getKey();
59+
Handler handler = entry.getValue();
60+
ContextHandler contextHandler = new ContextHandler();
61+
contextHandler.setContextPath(path);
62+
contextHandler.setHandler(handler);
63+
contextHandlers.add(contextHandler);
64+
}
65+
66+
ContextHandlerCollection contextHandler =
67+
new ContextHandlerCollection(contextHandlers.toArray(new ContextHandler[0]));
68+
server.setHandler(contextHandler);
69+
70+
server.setStopTimeout(1000);
71+
server.start();
72+
}
73+
}
74+
75+
void close() throws Exception {
76+
if (server != null) {
77+
server.stop();
78+
}
79+
}
80+
}

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+27-4
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.nio.charset.Charset;
5959
import java.time.Duration;
6060
import java.util.ArrayList;
61+
import java.util.Arrays;
6162
import java.util.Collection;
6263
import java.util.Collections;
6364
import java.util.HashMap;
@@ -319,8 +320,15 @@ public class StreamPerfTest implements Callable<Integer> {
319320
converter = Utils.SniServerNamesConverter.class)
320321
private List<SNIServerName> sniServerNames;
321322

323+
@CommandLine.Option(
324+
names = {"--monitoring-port", "-mp"},
325+
description = "port to launch HTTP monitoring on",
326+
defaultValue = "8080")
327+
private int monitoringPort;
328+
322329
private MetricsCollector metricsCollector;
323330
private PerformanceMetrics performanceMetrics;
331+
private List<Monitoring> monitorings;
324332

325333
private final PrintWriter err, out;
326334

@@ -349,10 +357,15 @@ public static void main(String[] args) throws IOException {
349357

350358
static int run(String[] args, PrintStream consoleOut, PrintStream consoleErr) {
351359
StreamPerfTest streamPerfTest = new StreamPerfTest(args, consoleOut, consoleErr);
352-
return new CommandLine(streamPerfTest)
353-
.setOut(streamPerfTest.out)
354-
.setErr(streamPerfTest.err)
355-
.execute(args);
360+
CommandLine commandLine =
361+
new CommandLine(streamPerfTest).setOut(streamPerfTest.out).setErr(streamPerfTest.err);
362+
363+
List<Monitoring> monitorings = Arrays.asList(new DebugEndpointMonitoring());
364+
365+
monitorings.forEach(m -> commandLine.addMixin(m.getClass().getSimpleName(), m));
366+
367+
streamPerfTest.monitorings(monitorings);
368+
return commandLine.execute(args);
356369
}
357370

358371
static void versionInformation(PrintStream out) {
@@ -464,6 +477,12 @@ public Integer call() throws Exception {
464477

465478
ShutdownService shutdownService = new ShutdownService();
466479

480+
MonitoringContext monitoringContext = new MonitoringContext(this.monitoringPort);
481+
this.monitorings.forEach(m -> m.configure(monitoringContext));
482+
monitoringContext.start();
483+
484+
shutdownService.wrap(closeStep("Closing monitoring context", () -> monitoringContext.close()));
485+
467486
Runtime.getRuntime().addShutdownHook(new Thread(() -> shutdownService.close()));
468487

469488
// FIXME add confirm latency
@@ -775,6 +794,10 @@ public String toString() {
775794
};
776795
}
777796

797+
public void monitorings(List<Monitoring> monitorings) {
798+
this.monitorings = monitorings;
799+
}
800+
778801
private String stream() {
779802
return streams.get(streamDispatching++ % streams.size());
780803
}

src/test/java/com/rabbitmq/stream/impl/OffsetTrackingTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ void consumeAndStore(BiConsumer<String, Client> streamCreator, TestInfo info) th
290290
}
291291

292292
@Test
293-
void storeOffsetAndThenAttachByTimestampShouldWork() throws InterruptedException {
293+
void storeOffsetAndThenAttachByTimestampShouldWork() throws Exception {
294294
// this test performs a timestamp-based index search within a segment with
295295
// a lot of non-user entries (chunks that contain tracking info, not messages)
296296
int messageCount = 50_000;

0 commit comments

Comments
 (0)