From 8de0827d0927cdda9d03a5662d11198ecab36fc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 29 Jan 2025 15:33:00 +0100 Subject: [PATCH 1/2] Add --consumer-latency --- .../com/rabbitmq/stream/perf/Converters.java | 17 ++++++++++++ .../rabbitmq/stream/perf/StreamPerfTest.java | 12 +++++++-- .../java/com/rabbitmq/stream/perf/Utils.java | 26 +++++++++++++++++++ .../rabbitmq/stream/perf/ConvertersTest.java | 20 ++++++++++++++ 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/perf/Converters.java b/src/main/java/com/rabbitmq/stream/perf/Converters.java index ce815b8..84ab4e9 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Converters.java +++ b/src/main/java/com/rabbitmq/stream/perf/Converters.java @@ -304,6 +304,23 @@ public Duration convert(String value) { } } + static class MicroSecondsToDurationTypeConverter implements CommandLine.ITypeConverter { + + @Override + public Duration convert(String value) { + try { + Duration duration = Duration.ofNanos(Long.parseLong(value) * 1_000); + if (duration.isNegative()) { + throw new CommandLine.TypeConversionException( + "'" + value + "' is not valid, it must be greater than or equal to 0"); + } + return duration; + } catch (NumberFormatException e) { + throw new CommandLine.TypeConversionException("'" + value + "' is not a valid number"); + } + } + } + static class LeaderLocatorTypeConverter implements CommandLine.ITypeConverter { diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index eaacaad..d770ba4 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -556,6 +556,13 @@ static class InstanceSyncOptions { defaultValue = "true") private boolean tcpNoDelay; + @CommandLine.Option( + names = {"--consumer-latency", "-L"}, + description = "consumer latency in microseconds", + converter = Converters.MicroSecondsToDurationTypeConverter.class, + defaultValue = "0") + private Duration consumerLatency; + private MetricsCollector metricsCollector; private PerformanceMetrics performanceMetrics; private List monitorings; @@ -1124,6 +1131,7 @@ public Integer call() throws Exception { .builder(); } + Runnable latencyWorker = Utils.latencyWorker(this.consumerLatency); consumerBuilder = consumerBuilder.messageHandler( (context, message) -> { @@ -1137,12 +1145,12 @@ public Integer call() throws Exception { // tool } metrics.offset(context.offset()); + latencyWorker.run(); }); consumerBuilder = maybeConfigureForFiltering(consumerBuilder); - Consumer consumer = consumerBuilder.build(); - return consumer; + return consumerBuilder.build(); }) .collect(Collectors.toList())); diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java index a0fe3c5..683492e 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Utils.java +++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java @@ -481,4 +481,30 @@ static int filteringSubSetSize(int setSize) { return setSize - 3; } } + + static Runnable latencyWorker(Duration latency) { + if (latency.isZero()) { + return () -> {}; + } else if (latency.toMillis() >= 1) { + long latencyInMs = latency.toMillis(); + return () -> latencySleep(latencyInMs); + } else { + long latencyInNs = latency.toNanos(); + return () -> latencyBusyWait(latencyInNs); + } + } + + private static void latencySleep(long delayInMs) { + try { + Thread.sleep(delayInMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private static void latencyBusyWait(long delayInNs) { + long start = System.nanoTime(); + while (System.nanoTime() - start < delayInNs) + ; + } } diff --git a/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java b/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java index 6f312b5..dd21fbf 100644 --- a/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java @@ -207,6 +207,26 @@ void sniServerNamesConverter() { .contains(new SNIHostName("dummy")); } + @ParameterizedTest + @CsvSource({"5000,5", "0,0", "1000,1"}) + void microSecondsToDurationTypeConverterOk(String value, long expectedInMs) { + Converters.MicroSecondsToDurationTypeConverter converter = + new Converters.MicroSecondsToDurationTypeConverter(); + Duration duration = converter.convert(value); + assertThat(duration).isNotNull(); + assertThat(duration).isEqualTo(Duration.ofMillis(expectedInMs)); + } + + @ParameterizedTest + @ValueSource(strings = {"-1000000", "abc", "1.5"}) + void microSecondsToDurationTypeConverterKo(String value) { + Converters.MicroSecondsToDurationTypeConverter converter = + new Converters.MicroSecondsToDurationTypeConverter(); + assertThatThrownBy(() -> converter.convert(value)) + .isInstanceOf(CommandLine.TypeConversionException.class) + .hasMessageContaining("valid"); + } + private static Tag tag(String key, String value) { return Tag.of(key, value); } From 7c49e03d4ce13a03fa0c85449e5bb64526e1c49a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 29 Jan 2025 16:28:18 +0100 Subject: [PATCH 2/2] Support Golang parseDuration syntax for --max-age and --consumer-latency --- pom.xml | 7 +++ .../com/rabbitmq/stream/perf/Converters.java | 52 +++++++++++++------ .../rabbitmq/stream/perf/StreamPerfTest.java | 13 +++-- .../rabbitmq/stream/perf/ConvertersTest.java | 14 +++-- 4 files changed, 56 insertions(+), 30 deletions(-) diff --git a/pom.xml b/pom.xml index 763c359..693b7f4 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ 33.4.0-jre 5.4.1.Final 2.0.2.Final + 1.8.0 5.11.4 3.27.3 true @@ -161,6 +162,12 @@ ${jgroups-kubernetes.version} + + org.threeten + threeten-extra + ${threeten-extra.version} + + org.junit.jupiter junit-jupiter-engine diff --git a/src/main/java/com/rabbitmq/stream/perf/Converters.java b/src/main/java/com/rabbitmq/stream/perf/Converters.java index 84ab4e9..b6ef0db 100644 --- a/src/main/java/com/rabbitmq/stream/perf/Converters.java +++ b/src/main/java/com/rabbitmq/stream/perf/Converters.java @@ -35,10 +35,14 @@ import java.util.stream.IntStream; import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIServerName; +import org.threeten.extra.AmountFormats; import picocli.CommandLine; final class Converters { + private static final CommandLine.ITypeConverter DURATION_TYPE_CONVERTER = + new DurationTypeConverter(); + private Converters() {} static void typeConversionException(String message) { @@ -290,34 +294,48 @@ static class DurationTypeConverter implements CommandLine.ITypeConverter { + + @Override + public Duration convert(String value) throws Exception { + Duration duration = DURATION_TYPE_CONVERTER.convert(value); + if (duration.isNegative() || duration.isZero()) { throw new CommandLine.TypeConversionException( - "'" + value + "' is not valid, valid example values: PT15M, PT10H"); + "'" + value + "' is not valid, it must be positive"); } + return duration; } } - static class MicroSecondsToDurationTypeConverter implements CommandLine.ITypeConverter { + static class GreaterThanOrEqualToZeroDurationTypeConverter + implements CommandLine.ITypeConverter { @Override - public Duration convert(String value) { - try { - Duration duration = Duration.ofNanos(Long.parseLong(value) * 1_000); - if (duration.isNegative()) { - throw new CommandLine.TypeConversionException( - "'" + value + "' is not valid, it must be greater than or equal to 0"); - } - return duration; - } catch (NumberFormatException e) { - throw new CommandLine.TypeConversionException("'" + value + "' is not a valid number"); + public Duration convert(String value) throws Exception { + Duration duration = DURATION_TYPE_CONVERTER.convert(value); + if (duration.isNegative()) { + throw new CommandLine.TypeConversionException( + "'" + value + "' is not valid, it must be greater than or equal to 0"); } + return duration; } } diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java index d770ba4..d9f78f5 100644 --- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java +++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java @@ -235,9 +235,10 @@ public class StreamPerfTest implements Callable { @CommandLine.Option( names = {"--max-age", "-ma"}, description = - "max age of segments using the ISO 8601 duration format, " - + "e.g. PT10M30S for 10 minutes 30 seconds, P5DT8H for 5 days 8 hours.", - converter = Converters.DurationTypeConverter.class) + "max age of segments using Golang parseDuration syntax or the ISO 8601 duration format, " + + "e.g. 10m30s for 10 minutes 30 seconds (d, w, y not supported) or " + + "PT10M30S for 10 minutes 30 seconds, P5DT8H for 5 days 8 hours.", + converter = Converters.PositiveDurationTypeConverter.class) private Duration maxAge; @CommandLine.Option( @@ -558,8 +559,10 @@ static class InstanceSyncOptions { @CommandLine.Option( names = {"--consumer-latency", "-L"}, - description = "consumer latency in microseconds", - converter = Converters.MicroSecondsToDurationTypeConverter.class, + description = + "consumer latency using Golang parseDuration syntax, " + + "e.g. 5 ms (d, w, y are not supported)", + converter = Converters.GreaterThanOrEqualToZeroDurationTypeConverter.class, defaultValue = "0") private Duration consumerLatency; diff --git a/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java b/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java index dd21fbf..5f41363 100644 --- a/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java +++ b/src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java @@ -208,20 +208,18 @@ void sniServerNamesConverter() { } @ParameterizedTest - @CsvSource({"5000,5", "0,0", "1000,1"}) - void microSecondsToDurationTypeConverterOk(String value, long expectedInMs) { - Converters.MicroSecondsToDurationTypeConverter converter = - new Converters.MicroSecondsToDurationTypeConverter(); + @CsvSource({"50ms,50", "0,0", "1s,1000", "10m30s,630000", "PT1M30S,90000"}) + void durationTypeConverterOk(String value, long expectedInMs) { + Converters.DurationTypeConverter converter = new Converters.DurationTypeConverter(); Duration duration = converter.convert(value); assertThat(duration).isNotNull(); assertThat(duration).isEqualTo(Duration.ofMillis(expectedInMs)); } @ParameterizedTest - @ValueSource(strings = {"-1000000", "abc", "1.5"}) - void microSecondsToDurationTypeConverterKo(String value) { - Converters.MicroSecondsToDurationTypeConverter converter = - new Converters.MicroSecondsToDurationTypeConverter(); + @ValueSource(strings = {"1", "abc", "1.5"}) + void durationTypeConverterKo(String value) { + Converters.DurationTypeConverter converter = new Converters.DurationTypeConverter(); assertThatThrownBy(() -> converter.convert(value)) .isInstanceOf(CommandLine.TypeConversionException.class) .hasMessageContaining("valid");