Skip to content

Commit cec4537

Browse files
committed
Add --metrics-command-line-arguments option
Fixes #227
1 parent 74915e0 commit cec4537

File tree

3 files changed

+155
-85
lines changed

3 files changed

+155
-85
lines changed

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+19
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@
4949
import com.rabbitmq.stream.perf.Utils.NamedThreadFactory;
5050
import com.rabbitmq.stream.perf.Utils.PerformanceMicrometerMetricsCollector;
5151
import io.micrometer.core.instrument.Counter;
52+
import io.micrometer.core.instrument.Gauge;
5253
import io.micrometer.core.instrument.Tag;
54+
import io.micrometer.core.instrument.Tags;
5355
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
5456
import io.netty.buffer.ByteBufAllocator;
5557
import io.netty.buffer.ByteBufAllocatorMetric;
@@ -409,6 +411,12 @@ public void setMaxSegmentSize(ByteCapacity in) {
409411
converter = Utils.MetricsTagsTypeConverter.class)
410412
private Collection<Tag> metricsTags;
411413

414+
@CommandLine.Option(
415+
names = {"--metrics-command-line-arguments", "-mcla"},
416+
description = "add fixed metrics with command line arguments label",
417+
defaultValue = "false")
418+
private boolean metricsCommandLineArguments;
419+
412420
private MetricsCollector metricsCollector;
413421
private PerformanceMetrics performanceMetrics;
414422
private List<Monitoring> monitorings;
@@ -513,6 +521,17 @@ public Integer call() throws Exception {
513521
CompositeMeterRegistry meterRegistry = new CompositeMeterRegistry();
514522
meterRegistry.config().commonTags(this.metricsTags);
515523
String metricsPrefix = "rabbitmq.stream";
524+
if (this.metricsCommandLineArguments) {
525+
Tags tags;
526+
if (this.arguments == null || this.arguments.length == 0) {
527+
tags = Tags.of("command_line", "");
528+
} else {
529+
tags = Tags.of("command_line", Utils.commandLineMetrics(this.arguments));
530+
}
531+
Gauge.builder(metricsPrefix + ".args", () -> Integer.valueOf(1))
532+
.tags(tags)
533+
.register(meterRegistry);
534+
}
516535
this.metricsCollector = new PerformanceMicrometerMetricsCollector(meterRegistry, metricsPrefix);
517536

518537
Counter producerConfirm = meterRegistry.counter(metricsPrefix + ".producer_confirmed");

src/main/java/com/rabbitmq/stream/perf/Utils.java

+114-85
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Collections;
4949
import java.util.Comparator;
5050
import java.util.HashMap;
51+
import java.util.Iterator;
5152
import java.util.List;
5253
import java.util.Locale;
5354
import java.util.Map;
@@ -308,6 +309,116 @@ static int downSamplingDivisor(int rate) {
308309
return divisor;
309310
}
310311

312+
static void declareSuperStreamExchangeAndBindings(
313+
Channel channel, String superStream, List<String> streams) throws Exception {
314+
channel.exchangeDeclare(
315+
superStream,
316+
BuiltinExchangeType.DIRECT,
317+
true,
318+
false,
319+
Collections.singletonMap("x-super-stream", true));
320+
321+
for (int i = 0; i < streams.size(); i++) {
322+
channel.queueBind(
323+
streams.get(i),
324+
superStream,
325+
String.valueOf(i),
326+
Collections.singletonMap("x-stream-partition-order", i));
327+
}
328+
}
329+
330+
static void deleteSuperStreamExchange(Channel channel, String superStream) throws Exception {
331+
channel.exchangeDelete(superStream);
332+
}
333+
334+
static List<String> superStreamPartitions(String superStream, int partitionCount) {
335+
int digits = String.valueOf(partitionCount - 1).length();
336+
String format = superStream + "-%0" + digits + "d";
337+
return IntStream.range(0, partitionCount)
338+
.mapToObj(i -> String.format(format, i))
339+
.collect(Collectors.toList());
340+
}
341+
342+
static Connection amqpConnection(
343+
String amqpUri, List<String> streamUris, boolean isTls, List<SNIServerName> sniServerNames)
344+
throws Exception {
345+
ConnectionFactory connectionFactory = new ConnectionFactory();
346+
if (amqpUri == null || amqpUri.trim().isEmpty()) {
347+
String streamUriString = streamUris.get(0);
348+
if (isTls) {
349+
streamUriString = streamUriString.replaceFirst("rabbitmq-stream\\+tls", "amqps");
350+
} else {
351+
streamUriString = streamUriString.replaceFirst("rabbitmq-stream", "amqp");
352+
}
353+
URI streamUri = new URI(streamUriString);
354+
int streamPort = streamUri.getPort();
355+
if (streamPort != -1) {
356+
int defaultAmqpPort =
357+
isTls
358+
? ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT
359+
: ConnectionFactory.DEFAULT_AMQP_PORT;
360+
streamUriString = streamUriString.replaceFirst(":" + streamPort, ":" + defaultAmqpPort);
361+
}
362+
connectionFactory.setUri(streamUriString);
363+
} else {
364+
connectionFactory.setUri(amqpUri);
365+
}
366+
if (isTls) {
367+
SSLContext sslContext = SSLContext.getInstance("TLS");
368+
sslContext.init(
369+
new KeyManager[] {},
370+
new TrustManager[] {TRUST_EVERYTHING_TRUST_MANAGER},
371+
new SecureRandom());
372+
connectionFactory.useSslProtocol(sslContext);
373+
if (!sniServerNames.isEmpty()) {
374+
SocketConfigurator socketConfigurator =
375+
socket -> {
376+
if (socket instanceof SSLSocket) {
377+
SSLSocket sslSocket = (SSLSocket) socket;
378+
SSLParameters sslParameters =
379+
sslSocket.getSSLParameters() == null
380+
? new SSLParameters()
381+
: sslSocket.getSSLParameters();
382+
sslParameters.setServerNames(sniServerNames);
383+
sslSocket.setSSLParameters(sslParameters);
384+
} else {
385+
LOGGER.warn("SNI parameter set on a non-TLS connection");
386+
}
387+
};
388+
connectionFactory.setSocketConfigurator(
389+
SocketConfigurators.defaultConfigurator().andThen(socketConfigurator));
390+
}
391+
}
392+
return connectionFactory.newConnection("stream-perf-test-amqp-connection");
393+
}
394+
395+
static String commandLineMetrics(String[] args) {
396+
Map<String, Boolean> filteredOptions = new HashMap<>();
397+
filteredOptions.put("--uris", true);
398+
filteredOptions.put("-u", true);
399+
filteredOptions.put("--prometheus", false);
400+
filteredOptions.put("--amqp-uri", true);
401+
filteredOptions.put("--au", true);
402+
filteredOptions.put("--metrics-command-line-arguments", false);
403+
filteredOptions.put("-mcla", false);
404+
filteredOptions.put("--metrics-tags", true);
405+
filteredOptions.put("-mt", true);
406+
407+
Collection<String> filtered = new ArrayList<>();
408+
Iterator<String> iterator = Arrays.stream(args).iterator();
409+
while (iterator.hasNext()) {
410+
String option = iterator.next();
411+
if (filteredOptions.containsKey(option)) {
412+
if (filteredOptions.get(option)) {
413+
iterator.next();
414+
}
415+
} else {
416+
filtered.add(option);
417+
}
418+
}
419+
return String.join(" ", filtered);
420+
}
421+
311422
static class ByteCapacityTypeConverter implements CommandLine.ITypeConverter<ByteCapacity> {
312423

313424
@Override
@@ -324,7 +435,7 @@ public ByteCapacity convert(String value) {
324435
static class MetricsTagsTypeConverter implements CommandLine.ITypeConverter<Collection<Tag>> {
325436

326437
@Override
327-
public Collection<Tag> convert(String value) throws Exception {
438+
public Collection<Tag> convert(String value) {
328439
if (value == null || value.trim().isEmpty()) {
329440
return Collections.emptyList();
330441
} else {
@@ -359,7 +470,7 @@ public BiFunction<String, Integer, String> convert(String input) {
359470
static class SniServerNamesConverter implements ITypeConverter<List<SNIServerName>> {
360471

361472
@Override
362-
public List<SNIServerName> convert(String value) throws Exception {
473+
public List<SNIServerName> convert(String value) {
363474
if (value == null || value.trim().isEmpty()) {
364475
return Collections.emptyList();
365476
} else {
@@ -626,6 +737,7 @@ public Thread newThread(Runnable r) {
626737
}
627738

628739
private static class TrustEverythingTrustManager implements X509TrustManager {
740+
629741
@Override
630742
public void checkClientTrusted(X509Certificate[] chain, String authType) {}
631743

@@ -652,89 +764,6 @@ public String apply(String stream, Integer index) {
652764
}
653765
}
654766

655-
static void declareSuperStreamExchangeAndBindings(
656-
Channel channel, String superStream, List<String> streams) throws Exception {
657-
channel.exchangeDeclare(
658-
superStream,
659-
BuiltinExchangeType.DIRECT,
660-
true,
661-
false,
662-
Collections.singletonMap("x-super-stream", true));
663-
664-
for (int i = 0; i < streams.size(); i++) {
665-
channel.queueBind(
666-
streams.get(i),
667-
superStream,
668-
String.valueOf(i),
669-
Collections.singletonMap("x-stream-partition-order", i));
670-
}
671-
}
672-
673-
static void deleteSuperStreamExchange(Channel channel, String superStream) throws Exception {
674-
channel.exchangeDelete(superStream);
675-
}
676-
677-
static List<String> superStreamPartitions(String superStream, int partitionCount) {
678-
int digits = String.valueOf(partitionCount - 1).length();
679-
String format = superStream + "-%0" + digits + "d";
680-
return IntStream.range(0, partitionCount)
681-
.mapToObj(i -> String.format(format, i))
682-
.collect(Collectors.toList());
683-
}
684-
685-
static Connection amqpConnection(
686-
String amqpUri, List<String> streamUris, boolean isTls, List<SNIServerName> sniServerNames)
687-
throws Exception {
688-
ConnectionFactory connectionFactory = new ConnectionFactory();
689-
if (amqpUri == null || amqpUri.trim().isEmpty()) {
690-
String streamUriString = streamUris.get(0);
691-
if (isTls) {
692-
streamUriString = streamUriString.replaceFirst("rabbitmq-stream\\+tls", "amqps");
693-
} else {
694-
streamUriString = streamUriString.replaceFirst("rabbitmq-stream", "amqp");
695-
}
696-
URI streamUri = new URI(streamUriString);
697-
int streamPort = streamUri.getPort();
698-
if (streamPort != -1) {
699-
int defaultAmqpPort =
700-
isTls
701-
? ConnectionFactory.DEFAULT_AMQP_OVER_SSL_PORT
702-
: ConnectionFactory.DEFAULT_AMQP_PORT;
703-
streamUriString = streamUriString.replaceFirst(":" + streamPort, ":" + defaultAmqpPort);
704-
}
705-
connectionFactory.setUri(streamUriString);
706-
} else {
707-
connectionFactory.setUri(amqpUri);
708-
}
709-
if (isTls) {
710-
SSLContext sslContext = SSLContext.getInstance("TLS");
711-
sslContext.init(
712-
new KeyManager[] {},
713-
new TrustManager[] {TRUST_EVERYTHING_TRUST_MANAGER},
714-
new SecureRandom());
715-
connectionFactory.useSslProtocol(sslContext);
716-
if (!sniServerNames.isEmpty()) {
717-
SocketConfigurator socketConfigurator =
718-
socket -> {
719-
if (socket instanceof SSLSocket) {
720-
SSLSocket sslSocket = (SSLSocket) socket;
721-
SSLParameters sslParameters =
722-
sslSocket.getSSLParameters() == null
723-
? new SSLParameters()
724-
: sslSocket.getSSLParameters();
725-
sslParameters.setServerNames(sniServerNames);
726-
sslSocket.setSSLParameters(sslParameters);
727-
} else {
728-
LOGGER.warn("SNI parameter set on a non-TLS connection");
729-
}
730-
};
731-
connectionFactory.setSocketConfigurator(
732-
SocketConfigurators.defaultConfigurator().andThen(socketConfigurator));
733-
}
734-
}
735-
return connectionFactory.newConnection("stream-perf-test-amqp-connection");
736-
}
737-
738767
static class PerformanceMicrometerMetricsCollector extends MicrometerMetricsCollector {
739768

740769
public PerformanceMicrometerMetricsCollector(MeterRegistry registry, String prefix) {

src/test/java/com/rabbitmq/stream/perf/UtilsTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.perf;
1515

16+
import static com.rabbitmq.stream.perf.Utils.commandLineMetrics;
1617
import static com.rabbitmq.stream.perf.Utils.superStreamPartitions;
1718
import static java.lang.String.format;
1819
import static java.util.stream.Collectors.toList;
@@ -346,6 +347,27 @@ void superStreamPartitionsTest() {
346347
.contains("stream-1-00", "stream-1-01", "stream-1-10", "stream-1-19");
347348
}
348349

350+
@Test
351+
void commandLineMetricsTest() {
352+
assertThat(
353+
commandLineMetrics(
354+
("--uris rabbitmq-stream://default_user_kdId_cNrxfdolc5V7WJ:K8IYrRjh1NGqdVsaxfFa-r0KR1vGuPHB@cqv2 "
355+
+ "--prometheus "
356+
+ "--metrics-command-line-arguments "
357+
+ "-mt rabbitmq_cluster=cqv2,workload_name=test-new "
358+
+ "-x 1 -y 2")
359+
.split(" ")))
360+
.isEqualTo("-x 1 -y 2");
361+
assertThat(
362+
commandLineMetrics(
363+
("-u amqp://default_user_kdId_cNrxfdolc5V7WJ:K8IYrRjh1NGqdVsaxfFa-r0KR1vGuPHB@cqv2 "
364+
+ "-mcla "
365+
+ "-mt rabbitmq_cluster=cqv2,workload_name=test-new "
366+
+ "-x 1 -y 2")
367+
.split(" ")))
368+
.isEqualTo("-x 1 -y 2");
369+
}
370+
349371
@Command(name = "test-command")
350372
static class TestCommand {
351373

0 commit comments

Comments
 (0)