Skip to content

Commit 6fe24af

Browse files
committed
Add test with native epoll
References #269
1 parent 8d9da34 commit 6fe24af

File tree

3 files changed

+84
-17
lines changed

3 files changed

+84
-17
lines changed

pom.xml

+8
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,14 @@
286286
<scope>test</scope>
287287
</dependency>
288288

289+
<dependency>
290+
<groupId>io.netty</groupId>
291+
<artifactId>netty-transport-native-epoll</artifactId>
292+
<version>${netty.version}</version>
293+
<classifier>linux-x86_64</classifier>
294+
<scope>test</scope>
295+
</dependency>
296+
289297
<dependency>
290298
<groupId>org.openjdk.jmh</groupId>
291299
<artifactId>jmh-core</artifactId>

src/main/java/com/rabbitmq/stream/impl/Client.java

+28-17
Original file line numberDiff line numberDiff line change
@@ -248,24 +248,38 @@ public Client(ClientParameters parameters) {
248248
}
249249
});
250250

251-
EventLoopGroup eventLoopGroup;
252-
if (parameters.eventLoopGroup == null) {
253-
this.eventLoopGroup = new NioEventLoopGroup();
254-
eventLoopGroup = this.eventLoopGroup;
251+
Consumer<Bootstrap> bootstrapCustomizer =
252+
parameters.bootstrapCustomizer == null ? noOpConsumer() : parameters.bootstrapCustomizer;
253+
254+
Bootstrap b = new Bootstrap();
255+
bootstrapCustomizer.accept(b);
256+
if (b.config().group() == null) {
257+
EventLoopGroup eventLoopGroup;
258+
if (parameters.eventLoopGroup == null) {
259+
this.eventLoopGroup = new NioEventLoopGroup();
260+
eventLoopGroup = this.eventLoopGroup;
261+
} else {
262+
this.eventLoopGroup = null;
263+
eventLoopGroup = parameters.eventLoopGroup;
264+
}
265+
b.group(eventLoopGroup);
255266
} else {
256267
this.eventLoopGroup = null;
257-
eventLoopGroup = parameters.eventLoopGroup;
268+
}
269+
if (b.config().channelFactory() == null) {
270+
b.channel(NioSocketChannel.class);
271+
}
272+
if (!b.config().options().containsKey(ChannelOption.SO_KEEPALIVE)) {
273+
b.option(ChannelOption.SO_KEEPALIVE, true);
274+
}
275+
if (!b.config().options().containsKey(ChannelOption.ALLOCATOR)) {
276+
b.option(
277+
ChannelOption.ALLOCATOR,
278+
parameters.byteBufAllocator == null
279+
? ByteBufAllocator.DEFAULT
280+
: parameters.byteBufAllocator);
258281
}
259282

260-
Bootstrap b = new Bootstrap();
261-
b.group(eventLoopGroup);
262-
b.channel(NioSocketChannel.class);
263-
b.option(ChannelOption.SO_KEEPALIVE, true);
264-
b.option(
265-
ChannelOption.ALLOCATOR,
266-
parameters.byteBufAllocator == null
267-
? ByteBufAllocator.DEFAULT
268-
: parameters.byteBufAllocator);
269283
Consumer<Channel> channelCustomizer =
270284
parameters.channelCustomizer == null ? noOpConsumer() : parameters.channelCustomizer;
271285
b.handler(
@@ -310,9 +324,6 @@ public void write(
310324
channelCustomizer.accept(ch);
311325
}
312326
});
313-
Consumer<Bootstrap> bootstrapCustomizer =
314-
parameters.bootstrapCustomizer == null ? noOpConsumer() : parameters.bootstrapCustomizer;
315-
bootstrapCustomizer.accept(b);
316327

317328
ChannelFuture f;
318329
String clientConnectionName =

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

+48
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
import com.rabbitmq.stream.impl.TestUtils.DisabledIfTlsNotEnabled;
5454
import io.netty.channel.Channel;
5555
import io.netty.channel.EventLoopGroup;
56+
import io.netty.channel.epoll.EpollEventLoopGroup;
57+
import io.netty.channel.epoll.EpollSocketChannel;
5658
import io.netty.channel.nio.NioEventLoopGroup;
5759
import io.netty.handler.ssl.SslHandler;
5860
import java.net.ConnectException;
@@ -64,13 +66,16 @@
6466
import java.util.List;
6567
import java.util.Map;
6668
import java.util.Random;
69+
import java.util.Set;
6770
import java.util.UUID;
71+
import java.util.concurrent.ConcurrentHashMap;
6872
import java.util.concurrent.CopyOnWriteArrayList;
6973
import java.util.concurrent.CountDownLatch;
7074
import java.util.concurrent.atomic.AtomicBoolean;
7175
import java.util.concurrent.atomic.AtomicLong;
7276
import java.util.function.Supplier;
7377
import java.util.stream.Collectors;
78+
import java.util.stream.IntStream;
7479
import javax.net.ssl.SNIHostName;
7580
import javax.net.ssl.SSLParameters;
7681
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
@@ -79,6 +84,9 @@
7984
import org.junit.jupiter.api.BeforeEach;
8085
import org.junit.jupiter.api.Test;
8186
import org.junit.jupiter.api.TestInfo;
87+
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
88+
import org.junit.jupiter.api.condition.EnabledOnOs;
89+
import org.junit.jupiter.api.condition.OS;
8290
import org.junit.jupiter.api.extension.ExtendWith;
8391
import org.junit.jupiter.params.ParameterizedTest;
8492
import org.junit.jupiter.params.provider.ValueSource;
@@ -615,4 +623,44 @@ void nettyInitializersAreCalled() {
615623
assertThat(bootstrapCalled).isTrue();
616624
assertThat(channelCalled).isTrue();
617625
}
626+
627+
@Test
628+
@EnabledOnOs(OS.LINUX)
629+
@EnabledIfSystemProperty(named = "os.arch", matches = "amd64")
630+
void nativeEpollWorksOnLinux() {
631+
int messageCount = 10_000;
632+
EventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup();
633+
try {
634+
Set<Channel> channels = ConcurrentHashMap.newKeySet();
635+
try (Environment env =
636+
environmentBuilder
637+
.netty()
638+
.eventLoopGroup(epollEventLoopGroup)
639+
.bootstrapCustomizer(b -> b.channel(EpollSocketChannel.class))
640+
.channelCustomizer(ch -> channels.add(ch))
641+
.environmentBuilder()
642+
.build()) {
643+
Producer producer = env.producerBuilder().stream(this.stream).build();
644+
ConfirmationHandler handler = confirmationStatus -> {};
645+
IntStream.range(0, messageCount)
646+
.forEach(
647+
i ->
648+
producer.send(
649+
producer
650+
.messageBuilder()
651+
.addData("hello".getBytes(StandardCharsets.UTF_8))
652+
.build(),
653+
handler));
654+
CountDownLatch latch = new CountDownLatch(messageCount);
655+
env.consumerBuilder().stream(this.stream)
656+
.offset(OffsetSpecification.first())
657+
.messageHandler((context, message) -> latch.countDown())
658+
.build();
659+
assertThat(latchAssert(latch)).completes();
660+
}
661+
assertThat(channels).isNotEmpty().allMatch(ch -> ch instanceof EpollSocketChannel);
662+
} finally {
663+
epollEventLoopGroup.shutdownGracefully(0, 0, SECONDS);
664+
}
665+
}
618666
}

0 commit comments

Comments
 (0)