Skip to content

Commit deb24b9

Browse files
authored
Merge pull request #206 from rabbitmq/consumer-latency-option
Add --consumer-latency
2 parents 51acb81 + 7c49e03 commit deb24b9

File tree

5 files changed

+108
-11
lines changed

5 files changed

+108
-11
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
<guava.version>33.4.0-jre</guava.version>
5656
<jgroups.version>5.4.1.Final</jgroups.version>
5757
<jgroups-kubernetes.version>2.0.2.Final</jgroups-kubernetes.version>
58+
<threeten-extra.version>1.8.0</threeten-extra.version>
5859
<junit.jupiter.version>5.11.4</junit.jupiter.version>
5960
<assertj.version>3.27.3</assertj.version>
6061
<spotless.check.skip>true</spotless.check.skip>
@@ -161,6 +162,12 @@
161162
<version>${jgroups-kubernetes.version}</version>
162163
</dependency>
163164

165+
<dependency>
166+
<groupId>org.threeten</groupId>
167+
<artifactId>threeten-extra</artifactId>
168+
<version>${threeten-extra.version}</version>
169+
</dependency>
170+
164171
<dependency>
165172
<groupId>org.junit.jupiter</groupId>
166173
<artifactId>junit-jupiter-engine</artifactId>

src/main/java/com/rabbitmq/stream/perf/Converters.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,14 @@
3535
import java.util.stream.IntStream;
3636
import javax.net.ssl.SNIHostName;
3737
import javax.net.ssl.SNIServerName;
38+
import org.threeten.extra.AmountFormats;
3839
import picocli.CommandLine;
3940

4041
final class Converters {
4142

43+
private static final CommandLine.ITypeConverter<Duration> DURATION_TYPE_CONVERTER =
44+
new DurationTypeConverter();
45+
4246
private Converters() {}
4347

4448
static void typeConversionException(String message) {
@@ -290,17 +294,48 @@ static class DurationTypeConverter implements CommandLine.ITypeConverter<Duratio
290294

291295
@Override
292296
public Duration convert(String value) {
297+
Duration duration = null;
293298
try {
294-
Duration duration = Duration.parse(value);
295-
if (duration.isNegative() || duration.isZero()) {
299+
duration = AmountFormats.parseUnitBasedDuration(value);
300+
} catch (DateTimeParseException e) {
301+
302+
}
303+
if (duration == null) {
304+
try {
305+
duration = Duration.parse(value);
306+
} catch (DateTimeParseException e) {
296307
throw new CommandLine.TypeConversionException(
297-
"'" + value + "' is not valid, it must be positive");
308+
"'" + value + "' is not valid, valid example values: PT15M, PT10H");
298309
}
299-
return duration;
300-
} catch (DateTimeParseException e) {
310+
}
311+
return duration;
312+
}
313+
}
314+
315+
static class PositiveDurationTypeConverter implements CommandLine.ITypeConverter<Duration> {
316+
317+
@Override
318+
public Duration convert(String value) throws Exception {
319+
Duration duration = DURATION_TYPE_CONVERTER.convert(value);
320+
if (duration.isNegative() || duration.isZero()) {
321+
throw new CommandLine.TypeConversionException(
322+
"'" + value + "' is not valid, it must be positive");
323+
}
324+
return duration;
325+
}
326+
}
327+
328+
static class GreaterThanOrEqualToZeroDurationTypeConverter
329+
implements CommandLine.ITypeConverter<Duration> {
330+
331+
@Override
332+
public Duration convert(String value) throws Exception {
333+
Duration duration = DURATION_TYPE_CONVERTER.convert(value);
334+
if (duration.isNegative()) {
301335
throw new CommandLine.TypeConversionException(
302-
"'" + value + "' is not valid, valid example values: PT15M, PT10H");
336+
"'" + value + "' is not valid, it must be greater than or equal to 0");
303337
}
338+
return duration;
304339
}
305340
}
306341

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,10 @@ public class StreamPerfTest implements Callable<Integer> {
235235
@CommandLine.Option(
236236
names = {"--max-age", "-ma"},
237237
description =
238-
"max age of segments using the ISO 8601 duration format, "
239-
+ "e.g. PT10M30S for 10 minutes 30 seconds, P5DT8H for 5 days 8 hours.",
240-
converter = Converters.DurationTypeConverter.class)
238+
"max age of segments using Golang parseDuration syntax or the ISO 8601 duration format, "
239+
+ "e.g. 10m30s for 10 minutes 30 seconds (d, w, y not supported) or "
240+
+ "PT10M30S for 10 minutes 30 seconds, P5DT8H for 5 days 8 hours.",
241+
converter = Converters.PositiveDurationTypeConverter.class)
241242
private Duration maxAge;
242243

243244
@CommandLine.Option(
@@ -556,6 +557,15 @@ static class InstanceSyncOptions {
556557
defaultValue = "true")
557558
private boolean tcpNoDelay;
558559

560+
@CommandLine.Option(
561+
names = {"--consumer-latency", "-L"},
562+
description =
563+
"consumer latency using Golang parseDuration syntax, "
564+
+ "e.g. 5 ms (d, w, y are not supported)",
565+
converter = Converters.GreaterThanOrEqualToZeroDurationTypeConverter.class,
566+
defaultValue = "0")
567+
private Duration consumerLatency;
568+
559569
private MetricsCollector metricsCollector;
560570
private PerformanceMetrics performanceMetrics;
561571
private List<Monitoring> monitorings;
@@ -1124,6 +1134,7 @@ public Integer call() throws Exception {
11241134
.builder();
11251135
}
11261136

1137+
Runnable latencyWorker = Utils.latencyWorker(this.consumerLatency);
11271138
consumerBuilder =
11281139
consumerBuilder.messageHandler(
11291140
(context, message) -> {
@@ -1137,12 +1148,12 @@ public Integer call() throws Exception {
11371148
// tool
11381149
}
11391150
metrics.offset(context.offset());
1151+
latencyWorker.run();
11401152
});
11411153

11421154
consumerBuilder = maybeConfigureForFiltering(consumerBuilder);
11431155

1144-
Consumer consumer = consumerBuilder.build();
1145-
return consumer;
1156+
return consumerBuilder.build();
11461157
})
11471158
.collect(Collectors.toList()));
11481159

src/main/java/com/rabbitmq/stream/perf/Utils.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,4 +481,30 @@ static int filteringSubSetSize(int setSize) {
481481
return setSize - 3;
482482
}
483483
}
484+
485+
static Runnable latencyWorker(Duration latency) {
486+
if (latency.isZero()) {
487+
return () -> {};
488+
} else if (latency.toMillis() >= 1) {
489+
long latencyInMs = latency.toMillis();
490+
return () -> latencySleep(latencyInMs);
491+
} else {
492+
long latencyInNs = latency.toNanos();
493+
return () -> latencyBusyWait(latencyInNs);
494+
}
495+
}
496+
497+
private static void latencySleep(long delayInMs) {
498+
try {
499+
Thread.sleep(delayInMs);
500+
} catch (InterruptedException e) {
501+
Thread.currentThread().interrupt();
502+
}
503+
}
504+
505+
private static void latencyBusyWait(long delayInNs) {
506+
long start = System.nanoTime();
507+
while (System.nanoTime() - start < delayInNs)
508+
;
509+
}
484510
}

src/test/java/com/rabbitmq/stream/perf/ConvertersTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,24 @@ void sniServerNamesConverter() {
207207
.contains(new SNIHostName("dummy"));
208208
}
209209

210+
@ParameterizedTest
211+
@CsvSource({"50ms,50", "0,0", "1s,1000", "10m30s,630000", "PT1M30S,90000"})
212+
void durationTypeConverterOk(String value, long expectedInMs) {
213+
Converters.DurationTypeConverter converter = new Converters.DurationTypeConverter();
214+
Duration duration = converter.convert(value);
215+
assertThat(duration).isNotNull();
216+
assertThat(duration).isEqualTo(Duration.ofMillis(expectedInMs));
217+
}
218+
219+
@ParameterizedTest
220+
@ValueSource(strings = {"1", "abc", "1.5"})
221+
void durationTypeConverterKo(String value) {
222+
Converters.DurationTypeConverter converter = new Converters.DurationTypeConverter();
223+
assertThatThrownBy(() -> converter.convert(value))
224+
.isInstanceOf(CommandLine.TypeConversionException.class)
225+
.hasMessageContaining("valid");
226+
}
227+
210228
private static Tag tag(String key, String value) {
211229
return Tag.of(key, value);
212230
}

0 commit comments

Comments
 (0)