Skip to content

Commit df6704b

Browse files
committed
Add --metrics-command-line-arguments option
Fixes #225
1 parent adb771c commit df6704b

File tree

5 files changed

+109
-15
lines changed

5 files changed

+109
-15
lines changed

src/main/java/com/rabbitmq/stream/perf/DefaultPerformanceMetrics.java

+19-10
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class DefaultPerformanceMetrics implements PerformanceMetrics {
5555
private volatile long lastPublishedCount = 0;
5656
private volatile long lastConsumedCount = 0;
5757
private volatile long offset;
58+
private final String metricsSuffix;
5859

5960
DefaultPerformanceMetrics(
6061
CompositeMeterRegistry meterRegistry,
@@ -111,28 +112,36 @@ protected Double nullGaugeValue() {
111112
} else {
112113
this.confirmLatency = null;
113114
}
115+
// the metrics name contains the tags, if any,
116+
// so we extract the suffix to use it later when looking up the metrics
117+
String key = metricRegistry.getMeters().keySet().iterator().next();
118+
int index = key.indexOf(".");
119+
this.metricsSuffix = index == -1 ? "" : key.substring(index);
114120
}
115121

116122
private long getPublishedCount() {
117-
return this.metricRegistry.getMeters().get("rabbitmqStreamPublished").getCount();
123+
return this.metricRegistry
124+
.getMeters()
125+
.get("rabbitmqStreamPublished" + metricsSuffix)
126+
.getCount();
118127
}
119128

120129
private long getConsumedCount() {
121-
return this.metricRegistry.getMeters().get("rabbitmqStreamConsumed").getCount();
130+
return this.metricRegistry.getMeters().get("rabbitmqStreamConsumed" + metricsSuffix).getCount();
122131
}
123132

124133
@Override
125134
public void start(String description) throws Exception {
126135
long startTime = System.nanoTime();
127136

128-
String metricPublished = "rabbitmqStreamPublished";
129-
String metricProducerConfirmed = "rabbitmqStreamProducer_confirmed";
130-
String metricConsumed = "rabbitmqStreamConsumed";
131-
String metricChunkSize = "rabbitmqStreamChunk_size";
132-
String metricLatency = "rabbitmqStreamLatency";
133-
String metricConfirmLatency = "rabbitmqStreamConfirm_latency";
134-
String metricWrittenBytes = "rabbitmqStreamWritten_bytes";
135-
String metricReadBytes = "rabbitmqStreamRead_bytes";
137+
String metricPublished = "rabbitmqStreamPublished" + metricsSuffix;
138+
String metricProducerConfirmed = "rabbitmqStreamProducer_confirmed" + metricsSuffix;
139+
String metricConsumed = "rabbitmqStreamConsumed" + metricsSuffix;
140+
String metricChunkSize = "rabbitmqStreamChunk_size" + metricsSuffix;
141+
String metricLatency = "rabbitmqStreamLatency" + metricsSuffix;
142+
String metricConfirmLatency = "rabbitmqStreamConfirm_latency" + metricsSuffix;
143+
String metricWrittenBytes = "rabbitmqStreamWritten_bytes" + metricsSuffix;
144+
String metricReadBytes = "rabbitmqStreamRead_bytes" + metricsSuffix;
136145

137146
Set<String> allMetrics =
138147
new HashSet<>(

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+9
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import com.rabbitmq.stream.perf.ShutdownService.CloseCallback;
5050
import com.rabbitmq.stream.perf.Utils.NamedThreadFactory;
5151
import io.micrometer.core.instrument.Counter;
52+
import io.micrometer.core.instrument.Tag;
5253
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
5354
import io.netty.buffer.ByteBufAllocator;
5455
import io.netty.buffer.ByteBufAllocatorMetric;
@@ -401,6 +402,13 @@ public void setMaxSegmentSize(ByteCapacity in) {
401402
converter = Utils.GreaterThanOrEqualToZeroIntegerTypeConverter.class)
402403
private int time;
403404

405+
@CommandLine.Option(
406+
names = {"--metrics-tags", "-mt"},
407+
description = "metrics tags as key-value pairs separated by commas",
408+
defaultValue = "",
409+
converter = Utils.MetricsTagsTypeConverter.class)
410+
private Collection<Tag> metricsTags;
411+
404412
private MetricsCollector metricsCollector;
405413
private PerformanceMetrics performanceMetrics;
406414
private List<Monitoring> monitorings;
@@ -503,6 +511,7 @@ public Integer call() throws Exception {
503511
ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
504512

505513
CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry();
514+
meterRegistry.config().commonTags(this.metricsTags);
506515
String metricsPrefix = "rabbitmq.stream";
507516
this.metricsCollector = new MicrometerMetricsCollector(meterRegistry, metricsPrefix);
508517

src/main/java/com/rabbitmq/stream/perf/Utils.java

+24
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.rabbitmq.stream.StreamCreator.LeaderLocator;
2525
import com.rabbitmq.stream.compression.Compression;
2626
import com.sun.management.OperatingSystemMXBean;
27+
import io.micrometer.core.instrument.Tag;
2728
import java.lang.reflect.Constructor;
2829
import java.lang.reflect.Field;
2930
import java.lang.reflect.Method;
@@ -71,6 +72,7 @@
7172
import picocli.CommandLine.Model.CommandSpec;
7273
import picocli.CommandLine.Model.OptionSpec;
7374
import picocli.CommandLine.Option;
75+
import picocli.CommandLine.TypeConversionException;
7476

7577
class Utils {
7678

@@ -315,6 +317,28 @@ public ByteCapacity convert(String value) {
315317
}
316318
}
317319

320+
static class MetricsTagsTypeConverter implements CommandLine.ITypeConverter<Collection<Tag>> {
321+
322+
@Override
323+
public Collection<Tag> convert(String value) throws Exception {
324+
if (value == null || value.trim().isEmpty()) {
325+
return Collections.emptyList();
326+
} else {
327+
try {
328+
Collection<Tag> tags = new ArrayList<>();
329+
for (String tag : value.split(",")) {
330+
String[] keyValue = tag.split("=", 2);
331+
tags.add(Tag.of(keyValue[0], keyValue[1]));
332+
}
333+
return tags;
334+
} catch (Exception e) {
335+
throw new TypeConversionException(
336+
String.format("'%s' is not valid, use key/value pairs separated by commas"));
337+
}
338+
}
339+
}
340+
}
341+
318342
static class NameStrategyConverter
319343
implements CommandLine.ITypeConverter<BiFunction<String, Integer, String>> {
320344

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,31 @@ void monitoringShouldReturnValidEndpoint() throws Exception {
387387
waitRunEnds();
388388
}
389389

390+
@Test
391+
void metricsTagsShouldShowUpInHttpEndpoint() throws Exception {
392+
int monitoringPort = randomNetworkPort();
393+
Future<?> run =
394+
run(
395+
builder()
396+
.deleteStreams()
397+
.monitoring()
398+
.monitoringPort(monitoringPort)
399+
.prometheus()
400+
.metricsTags("env=performance,datacenter=eu"));
401+
waitUntilStreamExists(s);
402+
waitOneSecond();
403+
404+
waitAtMost(
405+
10,
406+
() -> {
407+
HttpResponse response = httpRequest("http://localhost:" + monitoringPort + "/metrics");
408+
return response.responseCode == 200
409+
&& response.body.contains("{datacenter=\"eu\",env=\"performance\",}");
410+
});
411+
run.cancel(true);
412+
waitRunEnds();
413+
}
414+
390415
@Test
391416
void publishConfirmLatencyShouldBeIncludedWhenOptionIsEnabled() throws Exception {
392417
Future<?> run = run(builder().confirmLatency());
@@ -640,6 +665,11 @@ ArgumentsBuilder prometheus() {
640665
return this;
641666
}
642667

668+
ArgumentsBuilder metricsTags(String tags) {
669+
arguments.put("metrics-tags", tags);
670+
return this;
671+
}
672+
643673
ArgumentsBuilder superStreams() {
644674
arguments.put("super-streams", "");
645675
return this;

src/test/java/com/rabbitmq/stream/perf/UtilsTest.java

+27-5
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
import com.rabbitmq.stream.OffsetSpecification;
2424
import com.rabbitmq.stream.compression.Compression;
2525
import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter;
26+
import com.rabbitmq.stream.perf.Utils.MetricsTagsTypeConverter;
2627
import com.rabbitmq.stream.perf.Utils.NameStrategyConverter;
2728
import com.rabbitmq.stream.perf.Utils.PatternNameStrategy;
2829
import com.rabbitmq.stream.perf.Utils.RangeTypeConverter;
2930
import com.rabbitmq.stream.perf.Utils.SniServerNamesConverter;
31+
import io.micrometer.core.instrument.Tag;
3032
import java.util.Arrays;
3133
import java.util.Collections;
3234
import java.util.HashMap;
@@ -78,7 +80,7 @@ static Stream<Arguments> offsetSpecificationTypeConverterOkArguments() {
7880
static Stream<Arguments> streams() {
7981
Stream<Arguments> arguments =
8082
Stream.of(
81-
of("1", "stream", Arrays.asList("stream")),
83+
of("1", "stream", Collections.singletonList("stream")),
8284
of("5", "stream", IntStream.range(1, 6).mapToObj(i -> "stream-" + i).collect(toList())),
8385
of(
8486
"10",
@@ -111,6 +113,10 @@ static Stream<Arguments> streams() {
111113
});
112114
}
113115

116+
private static Tag tag(String key, String value) {
117+
return Tag.of(key, value);
118+
}
119+
114120
@ParameterizedTest
115121
@MethodSource("offsetSpecificationTypeConverterOkArguments")
116122
void offsetSpecificationTypeConverterOk(String value, OffsetSpecification expected)
@@ -191,6 +197,21 @@ void sniServerNamesConverter() throws Exception {
191197
.contains(new SNIHostName("dummy"));
192198
}
193199

200+
@Test
201+
void metricsTagsConverter() throws Exception {
202+
MetricsTagsTypeConverter converter = new MetricsTagsTypeConverter();
203+
assertThat(converter.convert(null)).isNotNull().isEmpty();
204+
assertThat(converter.convert("")).isNotNull().isEmpty();
205+
assertThat(converter.convert(" ")).isNotNull().isEmpty();
206+
assertThat(converter.convert("env=performance,datacenter=eu"))
207+
.hasSize(2)
208+
.contains(tag("env", "performance"))
209+
.contains(tag("datacenter", "eu"));
210+
assertThat(converter.convert("args=--queue-args \"x-max-length=100000\""))
211+
.hasSize(1)
212+
.contains(tag("args", "--queue-args \"x-max-length=100000\""));
213+
}
214+
194215
@Test
195216
void writeReadLongInByteArray() {
196217
byte[] array = new byte[8];
@@ -327,22 +348,23 @@ void superStreamPartitionsTest() {
327348

328349
@Command(name = "test-command")
329350
static class TestCommand {
351+
330352
@Option(
331353
names = {"aaa", "a"},
332354
defaultValue = "10")
333-
private int a = 10;
355+
private final int a = 10;
334356

335357
@Option(names = "b", defaultValue = "false")
336-
private boolean b = false;
358+
private final boolean b = false;
337359

338360
@Option(names = "c", defaultValue = "false")
339-
private boolean c = false;
361+
private final boolean c = false;
340362

341363
@CommandLine.Option(
342364
names = {"offset"},
343365
defaultValue = "next",
344366
converter = Utils.OffsetSpecificationTypeConverter.class)
345-
private OffsetSpecification offsetSpecification = OffsetSpecification.next();
367+
private final OffsetSpecification offsetSpecification = OffsetSpecification.next();
346368

347369
public TestCommand() {}
348370
}

0 commit comments

Comments
 (0)