Skip to content

Commit 83d9e6e

Browse files
committed
Add Prometheus metrics on HTTP monitoring endpoint
Fixes #29
1 parent c48d6d1 commit 83d9e6e

File tree

5 files changed

+107
-4
lines changed

5 files changed

+107
-4
lines changed

pom.xml

+13
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,13 @@
168168
<optional>true</optional>
169169
</dependency>
170170

171+
<dependency>
172+
<groupId>io.micrometer</groupId>
173+
<artifactId>micrometer-registry-prometheus</artifactId>
174+
<version>${micrometer.version}</version>
175+
<optional>true</optional>
176+
</dependency>
177+
171178
<!-- for rate limiter -->
172179
<dependency>
173180
<groupId>com.google.guava</groupId>
@@ -562,6 +569,12 @@
562569
<version>${micrometer.version}</version>
563570
</dependency>
564571

572+
<dependency>
573+
<groupId>io.micrometer</groupId>
574+
<artifactId>micrometer-registry-prometheus</artifactId>
575+
<version>${micrometer.version}</version>
576+
</dependency>
577+
565578
<!-- for rate limiter -->
566579
<dependency>
567580
<groupId>com.google.guava</groupId>

src/main/java/com/rabbitmq/stream/perf/MonitoringContext.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.perf;
1515

16+
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
1617
import java.util.ArrayList;
1718
import java.util.LinkedHashMap;
1819
import java.util.List;
@@ -29,13 +30,15 @@
2930
class MonitoringContext {
3031

3132
private final int monitoringPort;
33+
private final CompositeMeterRegistry meterRegistry;
3234

3335
private final Map<String, Handler> handlers = new LinkedHashMap<>();
3436

3537
private volatile Server server;
3638

37-
MonitoringContext(int monitoringPort) {
39+
MonitoringContext(int monitoringPort, CompositeMeterRegistry meterRegistry) {
3840
this.monitoringPort = monitoringPort;
41+
this.meterRegistry = meterRegistry;
3942
}
4043

4144
void addHttpEndpoint(String path, Handler handler) {
@@ -77,4 +80,8 @@ void close() throws Exception {
7780
server.stop();
7881
}
7982
}
83+
84+
CompositeMeterRegistry meterRegistry() {
85+
return meterRegistry;
86+
}
8087
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright (c) 2021 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream.perf;
15+
16+
import io.micrometer.prometheus.PrometheusConfig;
17+
import io.micrometer.prometheus.PrometheusMeterRegistry;
18+
import java.io.IOException;
19+
import javax.servlet.http.HttpServletRequest;
20+
import javax.servlet.http.HttpServletResponse;
21+
import org.eclipse.jetty.server.Request;
22+
import org.eclipse.jetty.server.handler.AbstractHandler;
23+
import picocli.CommandLine.Option;
24+
25+
class PrometheusEndpointMonitoring implements Monitoring {
26+
27+
@Option(
28+
names = {"--prometheus"},
29+
description = "Enable HTTP Prometheus metrics endpoint",
30+
defaultValue = "false")
31+
private boolean enabled;
32+
33+
private volatile PrometheusMeterRegistry registry;
34+
35+
@Override
36+
public void configure(MonitoringContext context) {
37+
if (enabled) {
38+
registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
39+
context.meterRegistry().add(registry);
40+
context.addHttpEndpoint(
41+
"metrics",
42+
new AbstractHandler() {
43+
@Override
44+
public void handle(
45+
String target,
46+
Request baseRequest,
47+
HttpServletRequest request,
48+
HttpServletResponse response)
49+
throws IOException {
50+
51+
String scraped = registry.scrape();
52+
53+
response.setStatus(HttpServletResponse.SC_OK);
54+
response.setContentLength(scraped.length());
55+
response.setContentType("text/plain");
56+
57+
response.getWriter().print(scraped);
58+
59+
baseRequest.setHandled(true);
60+
}
61+
});
62+
}
63+
}
64+
}

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,8 @@ static int run(String[] args, PrintStream consoleOut, PrintStream consoleErr) {
360360
CommandLine commandLine =
361361
new CommandLine(streamPerfTest).setOut(streamPerfTest.out).setErr(streamPerfTest.err);
362362

363-
List<Monitoring> monitorings = Arrays.asList(new DebugEndpointMonitoring());
363+
List<Monitoring> monitorings =
364+
Arrays.asList(new DebugEndpointMonitoring(), new PrometheusEndpointMonitoring());
364365

365366
monitorings.forEach(m -> commandLine.addMixin(m.getClass().getSimpleName(), m));
366367

@@ -477,7 +478,7 @@ public Integer call() throws Exception {
477478

478479
ShutdownService shutdownService = new ShutdownService();
479480

480-
MonitoringContext monitoringContext = new MonitoringContext(this.monitoringPort);
481+
MonitoringContext monitoringContext = new MonitoringContext(this.monitoringPort, meterRegistry);
481482
this.monitorings.forEach(m -> m.configure(monitoringContext));
482483
monitoringContext.start();
483484

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,13 @@ void compressionWithoutSubEntriesShouldNotStart() throws Exception {
315315
@Test
316316
void monitoringShouldReturnValidEndpoint() throws Exception {
317317
int monitoringPort = randomNetworkPort();
318-
Future<?> run = run(builder().deleteStreams().monitoring(true).monitoringPort(monitoringPort));
318+
Future<?> run =
319+
run(
320+
builder()
321+
.deleteStreams()
322+
.monitoring(true)
323+
.monitoringPort(monitoringPort)
324+
.prometheus(true));
319325
waitUntilStreamExists(s);
320326
waitOneSecond();
321327

@@ -327,6 +333,13 @@ void monitoringShouldReturnValidEndpoint() throws Exception {
327333
&& response.body.contains("stream-perf-test-publishers-");
328334
});
329335

336+
waitAtMost(
337+
10,
338+
() -> {
339+
HttpResponse response = httpRequest("http://localhost:" + monitoringPort + "/metrics");
340+
return response.responseCode == 200
341+
&& response.body.contains("# HELP rabbitmq_stream_published_total");
342+
});
330343
run.cancel(true);
331344
waitRunEnds();
332345
}
@@ -509,6 +522,11 @@ ArgumentsBuilder monitoringPort(int port) {
509522
return this;
510523
}
511524

525+
ArgumentsBuilder prometheus(boolean prometheus) {
526+
arguments.put("prometheus", String.valueOf(prometheus));
527+
return this;
528+
}
529+
512530
String build() {
513531
return this.arguments.entrySet().stream()
514532
.map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))

0 commit comments

Comments
 (0)