Skip to content

Commit 55fa2ab

Browse files
committed
Add --native-epoll option to performance tool
Fixes #270
1 parent 6fe24af commit 55fa2ab

File tree

3 files changed

+45
-1
lines changed

3 files changed

+45
-1
lines changed

pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -639,6 +639,13 @@
639639
<version>${amqp-client.version}</version>
640640
</dependency>
641641

642+
<dependency>
643+
<groupId>io.netty</groupId>
644+
<artifactId>netty-transport-native-epoll</artifactId>
645+
<version>${netty.version}</version>
646+
<classifier>linux-x86_64</classifier>
647+
</dependency>
648+
642649
</dependencies>
643650
<build>
644651
<finalName>${finalName}</finalName>

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,13 @@
5454
import io.micrometer.core.instrument.Tag;
5555
import io.micrometer.core.instrument.Tags;
5656
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
57+
import io.netty.bootstrap.Bootstrap;
5758
import io.netty.buffer.ByteBufAllocator;
5859
import io.netty.buffer.ByteBufAllocatorMetric;
5960
import io.netty.buffer.ByteBufAllocatorMetricProvider;
6061
import io.netty.channel.EventLoopGroup;
62+
import io.netty.channel.epoll.EpollEventLoopGroup;
63+
import io.netty.channel.epoll.EpollSocketChannel;
6164
import io.netty.channel.nio.NioEventLoopGroup;
6265
import io.netty.handler.ssl.SslContextBuilder;
6366
import io.netty.handler.ssl.SslHandler;
@@ -441,6 +444,12 @@ public void setMaxSegmentSize(ByteCapacity in) {
441444
converter = Utils.ByteCapacityTypeConverter.class)
442445
private ByteCapacity requestedMaxFrameSize;
443446

447+
@CommandLine.Option(
448+
names = {"--native-epoll", "-ne"},
449+
description = "use Netty's native epoll transport (Linux x86-64 only)",
450+
defaultValue = "false")
451+
private boolean nativeEpoll;
452+
444453
private MetricsCollector metricsCollector;
445454
private PerformanceMetrics performanceMetrics;
446455
private List<Monitoring> monitorings;
@@ -664,7 +673,14 @@ public Integer call() throws Exception {
664673
}
665674
}
666675

667-
this.eventLoopGroup = new NioEventLoopGroup();
676+
java.util.function.Consumer<Bootstrap> bootstrapCustomizer;
677+
if (this.nativeEpoll) {
678+
this.eventLoopGroup = new EpollEventLoopGroup();
679+
bootstrapCustomizer = b -> b.channel(EpollSocketChannel.class);
680+
} else {
681+
this.eventLoopGroup = new NioEventLoopGroup();
682+
bootstrapCustomizer = b -> {};
683+
}
668684

669685
EnvironmentBuilder environmentBuilder =
670686
Environment.builder()
@@ -676,6 +692,7 @@ public Integer call() throws Exception {
676692
.netty()
677693
.byteBufAllocator(byteBufAllocator)
678694
.eventLoopGroup(eventLoopGroup)
695+
.bootstrapCustomizer(bootstrapCustomizer)
679696
.environmentBuilder()
680697
.codec(codec)
681698
.maxProducersByConnection(this.producersByConnection)

src/test/java/com/rabbitmq/stream/perf/StreamPerfTestTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@
6060
import org.junit.jupiter.api.BeforeEach;
6161
import org.junit.jupiter.api.Test;
6262
import org.junit.jupiter.api.TestInfo;
63+
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
64+
import org.junit.jupiter.api.condition.EnabledOnOs;
65+
import org.junit.jupiter.api.condition.OS;
6366
import org.junit.jupiter.api.extension.ExtendWith;
6467

6568
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@@ -496,6 +499,18 @@ void singleActiveConsumersOnSuperStream() throws Exception {
496499
.isEqualTo(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST));
497500
}
498501

502+
@Test
503+
@EnabledOnOs(OS.LINUX)
504+
@EnabledIfSystemProperty(named = "os.arch", matches = "amd64")
505+
void nativeEpollWorksOnLinux() throws Exception {
506+
Future<?> run = run(builder().nativeEpoll());
507+
waitUntilStreamExists(s);
508+
waitOneSecond();
509+
run.cancel(true);
510+
waitRunEnds();
511+
assertThat(streamExists(s)).isTrue();
512+
}
513+
499514
private static <T> Consumer<T> wrap(CallableConsumer<T> action) {
500515
return t -> {
501516
try {
@@ -708,6 +723,11 @@ ArgumentsBuilder time(int time) {
708723
return this;
709724
}
710725

726+
ArgumentsBuilder nativeEpoll() {
727+
arguments.put("native-epoll", "");
728+
return this;
729+
}
730+
711731
String build() {
712732
return this.arguments.entrySet().stream()
713733
.map(e -> "--" + e.getKey() + (e.getValue().isEmpty() ? "" : (" " + e.getValue())))

0 commit comments

Comments
 (0)