Skip to content

Add --consumer-latency #206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
<guava.version>33.4.0-jre</guava.version>
<jgroups.version>5.4.1.Final</jgroups.version>
<jgroups-kubernetes.version>2.0.2.Final</jgroups-kubernetes.version>
<threeten-extra.version>1.8.0</threeten-extra.version>
<junit.jupiter.version>5.11.4</junit.jupiter.version>
<assertj.version>3.27.3</assertj.version>
<spotless.check.skip>true</spotless.check.skip>
Expand Down Expand Up @@ -161,6 +162,12 @@
<version>${jgroups-kubernetes.version}</version>
</dependency>

<dependency>
<groupId>org.threeten</groupId>
<artifactId>threeten-extra</artifactId>
<version>${threeten-extra.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down
47 changes: 41 additions & 6 deletions src/main/java/com/rabbitmq/stream/perf/Converters.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@
import java.util.stream.IntStream;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import org.threeten.extra.AmountFormats;
import picocli.CommandLine;

final class Converters {

private static final CommandLine.ITypeConverter<Duration> DURATION_TYPE_CONVERTER =
new DurationTypeConverter();

private Converters() {}

static void typeConversionException(String message) {
Expand Down Expand Up @@ -290,17 +294,48 @@ static class DurationTypeConverter implements CommandLine.ITypeConverter<Duratio

@Override
public Duration convert(String value) {
Duration duration = null;
try {
Duration duration = Duration.parse(value);
if (duration.isNegative() || duration.isZero()) {
duration = AmountFormats.parseUnitBasedDuration(value);
} catch (DateTimeParseException e) {

}
if (duration == null) {
try {
duration = Duration.parse(value);
} catch (DateTimeParseException e) {
throw new CommandLine.TypeConversionException(
"'" + value + "' is not valid, it must be positive");
"'" + value + "' is not valid, valid example values: PT15M, PT10H");
}
return duration;
} catch (DateTimeParseException e) {
}
return duration;
}
}

static class PositiveDurationTypeConverter implements CommandLine.ITypeConverter<Duration> {

@Override
public Duration convert(String value) throws Exception {
Duration duration = DURATION_TYPE_CONVERTER.convert(value);
if (duration.isNegative() || duration.isZero()) {
throw new CommandLine.TypeConversionException(
"'" + value + "' is not valid, it must be positive");
}
return duration;
}
}

static class GreaterThanOrEqualToZeroDurationTypeConverter
implements CommandLine.ITypeConverter<Duration> {

@Override
public Duration convert(String value) throws Exception {
Duration duration = DURATION_TYPE_CONVERTER.convert(value);
if (duration.isNegative()) {
throw new CommandLine.TypeConversionException(
"'" + value + "' is not valid, valid example values: PT15M, PT10H");
"'" + value + "' is not valid, it must be greater than or equal to 0");
}
return duration;
}
}

Expand Down
21 changes: 16 additions & 5 deletions src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,10 @@ public class StreamPerfTest implements Callable<Integer> {
@CommandLine.Option(
names = {"--max-age", "-ma"},
description =
"max age of segments using the ISO 8601 duration format, "
+ "e.g. PT10M30S for 10 minutes 30 seconds, P5DT8H for 5 days 8 hours.",
converter = Converters.DurationTypeConverter.class)
"max age of segments using Golang parseDuration syntax or the ISO 8601 duration format, "
+ "e.g. 10m30s for 10 minutes 30 seconds (d, w, y not supported) or "
+ "PT10M30S for 10 minutes 30 seconds, P5DT8H for 5 days 8 hours.",
converter = Converters.PositiveDurationTypeConverter.class)
private Duration maxAge;

@CommandLine.Option(
Expand Down Expand Up @@ -556,6 +557,15 @@ static class InstanceSyncOptions {
defaultValue = "true")
private boolean tcpNoDelay;

@CommandLine.Option(
names = {"--consumer-latency", "-L"},
description =
"consumer latency using Golang parseDuration syntax, "
+ "e.g. 5 ms (d, w, y are not supported)",
converter = Converters.GreaterThanOrEqualToZeroDurationTypeConverter.class,
defaultValue = "0")
private Duration consumerLatency;

private MetricsCollector metricsCollector;
private PerformanceMetrics performanceMetrics;
private List<Monitoring> monitorings;
Expand Down Expand Up @@ -1124,6 +1134,7 @@ public Integer call() throws Exception {
.builder();
}

Runnable latencyWorker = Utils.latencyWorker(this.consumerLatency);
consumerBuilder =
consumerBuilder.messageHandler(
(context, message) -> {
Expand All @@ -1137,12 +1148,12 @@ public Integer call() throws Exception {
// tool
}
metrics.offset(context.offset());
latencyWorker.run();
});

consumerBuilder = maybeConfigureForFiltering(consumerBuilder);

Consumer consumer = consumerBuilder.build();
return consumer;
return consumerBuilder.build();
})
.collect(Collectors.toList()));

Expand Down
26 changes: 26 additions & 0 deletions src/main/java/com/rabbitmq/stream/perf/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,4 +481,30 @@ static int filteringSubSetSize(int setSize) {
return setSize - 3;
}
}

static Runnable latencyWorker(Duration latency) {
if (latency.isZero()) {
return () -> {};
} else if (latency.toMillis() >= 1) {
long latencyInMs = latency.toMillis();
return () -> latencySleep(latencyInMs);
} else {
long latencyInNs = latency.toNanos();
return () -> latencyBusyWait(latencyInNs);
}
}

private static void latencySleep(long delayInMs) {
try {
Thread.sleep(delayInMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private static void latencyBusyWait(long delayInNs) {
long start = System.nanoTime();
while (System.nanoTime() - start < delayInNs)
;
}
}
18 changes: 18 additions & 0 deletions src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,24 @@ void sniServerNamesConverter() {
.contains(new SNIHostName("dummy"));
}

@ParameterizedTest
@CsvSource({"50ms,50", "0,0", "1s,1000", "10m30s,630000", "PT1M30S,90000"})
void durationTypeConverterOk(String value, long expectedInMs) {
Converters.DurationTypeConverter converter = new Converters.DurationTypeConverter();
Duration duration = converter.convert(value);
assertThat(duration).isNotNull();
assertThat(duration).isEqualTo(Duration.ofMillis(expectedInMs));
}

@ParameterizedTest
@ValueSource(strings = {"1", "abc", "1.5"})
void durationTypeConverterKo(String value) {
Converters.DurationTypeConverter converter = new Converters.DurationTypeConverter();
assertThatThrownBy(() -> converter.convert(value))
.isInstanceOf(CommandLine.TypeConversionException.class)
.hasMessageContaining("valid");
}

private static Tag tag(String key, String value) {
return Tag.of(key, value);
}
Expand Down