diff --git a/pom.xml b/pom.xml
index 725f644930..5c465ff2ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,7 @@
true
1.7.36
1.2.13
- 4.1.119.Final
+ 4.2.0.Final
0.34.1
4.2.30
1.14.5
diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc
index d490a5d12b..f4c5d0be9a 100644
--- a/src/docs/asciidoc/api.adoc
+++ b/src/docs/asciidoc/api.adoc
@@ -88,10 +88,9 @@ TLS can be enabled by using the `rabbitmq-stream+tls` scheme in the URI.
The default TLS port is 5551.
Use the `EnvironmentBuilder#tls` method to configure TLS.
-The most important setting is a `io.netty.handler.ssl.SslContext` instance,
-which is created and configured with the
-`io.netty.handler.ssl.SslContext#forClient` method. Note hostname verification
-is enabled by default.
+The most important setting is a `io.netty.handler.ssl.SslContext` instance, which is created and configured with the
+`io.netty.handler.ssl.SslContext#forClient` method.
+Note hostname verification is enabled by default.
The following snippet shows a common configuration, whereby
the client is instructed to trust servers with certificates
@@ -242,15 +241,10 @@ Used as a prefix for connection names.
|Configuration helper for TLS.
|TLS is enabled if a `rabbitmq-stream+tls` URI is provided.
-|`tls#hostnameVerification`
-|Enable or disable hostname verification.
-|Enabled by default.
-
|`tls#sslContext`
|Set the `io.netty.handler.ssl.SslContext` used for the TLS connection.
Use `io.netty.handler.ssl.SslContextBuilder#forClient` to configure it.
-The server certificate chain and the client private key are the typical
-elements that need to be configured.
+The server certificate chain, the client private key, and hostname verification are the usual elements that need to be configured.
|The JDK trust manager and no client private key.
|`tls#trustEverything`
diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
index 12dd0643ed..33fac4dcbc 100644
--- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
@@ -442,7 +442,10 @@ interface TlsConfiguration {
*
Hostname verification is enabled by default.
*
* @return the TLS configuration helper
+ * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link
+ * #sslContext(SslContext)}
*/
+ @Deprecated(forRemoval = true)
TlsConfiguration hostnameVerification();
/**
@@ -450,9 +453,12 @@ interface TlsConfiguration {
*
*
Hostname verification is enabled by default.
*
- * @param hostnameVerification
+ * @param hostnameVerification whether to enable hostname verification or not
* @return the TLS configuration helper
+ * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link
+ * #sslContext(SslContext)}
*/
+ @Deprecated(forRemoval = true)
TlsConfiguration hostnameVerification(boolean hostnameVerification);
/**
@@ -460,7 +466,7 @@ interface TlsConfiguration {
*
*
Use {@link SslContextBuilder#forClient()} to configure and create an instance.
*
- * @param sslContext
+ * @param sslContext the SSL context
* @return the TLS configuration helper
*/
TlsConfiguration sslContext(SslContext sslContext);
diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java
index 8c4f53afec..8679844939 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Client.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Client.java
@@ -59,17 +59,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.ConnectTimeoutException;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DecoderException;
@@ -106,9 +96,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
-import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLHandshakeException;
-import javax.net.ssl.SSLParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -248,7 +236,7 @@ public Client(ClientParameters parameters) {
if (b.config().group() == null) {
EventLoopGroup eventLoopGroup;
if (parameters.eventLoopGroup == null) {
- this.eventLoopGroup = new NioEventLoopGroup();
+ this.eventLoopGroup = Utils.eventLoopGroup();
eventLoopGroup = this.eventLoopGroup;
} else {
this.eventLoopGroup = null;
@@ -293,13 +281,6 @@ public void initChannel(SocketChannel ch) {
SslHandler sslHandler =
parameters.sslContext.newHandler(ch.alloc(), parameters.host, parameters.port);
- if (parameters.tlsHostnameVerification) {
- SSLEngine sslEngine = sslHandler.engine();
- SSLParameters sslParameters = sslEngine.getSSLParameters();
- sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
- sslEngine.setSSLParameters(sslParameters);
- }
-
ch.pipeline().addFirst("ssl", sslHandler);
}
channelCustomizer.accept(ch);
@@ -2407,7 +2388,6 @@ public static class ClientParameters {
private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON;
private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON;
private SslContext sslContext;
- private boolean tlsHostnameVerification = true;
private ByteBufAllocator byteBufAllocator;
private Duration rpcTimeout;
private Consumer channelCustomizer = noOpConsumer();
@@ -2564,11 +2544,6 @@ public ClientParameters sslContext(SslContext sslContext) {
return this;
}
- public ClientParameters tlsHostnameVerification(boolean tlsHostnameVerification) {
- this.tlsHostnameVerification = tlsHostnameVerification;
- return this;
- }
-
public ClientParameters compressionCodecFactory(
CompressionCodecFactory compressionCodecFactory) {
this.compressionCodecFactory = compressionCodecFactory;
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index 2bf4b233f1..55957434cf 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -38,7 +38,6 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.IOException;
@@ -130,12 +129,13 @@ class StreamEnvironment implements Environment {
try {
SslContext sslContext =
tlsConfiguration.sslContext() == null
- ? SslContextBuilder.forClient().build()
+ ? SslContextBuilder.forClient()
+ .endpointIdentificationAlgorithm(
+ tlsConfiguration.hostnameVerificationEnabled() ? "HTTPS" : null)
+ .build()
: tlsConfiguration.sslContext();
clientParametersPrototype.sslContext(sslContext);
- clientParametersPrototype.tlsHostnameVerification(
- tlsConfiguration.hostnameVerificationEnabled());
} catch (SSLException e) {
throw new StreamException("Error while creating Netty SSL context", e);
@@ -212,7 +212,7 @@ class StreamEnvironment implements Environment {
this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
if (clientParametersPrototype.eventLoopGroup == null) {
- this.eventLoopGroup = new NioEventLoopGroup();
+ this.eventLoopGroup = Utils.eventLoopGroup();
this.clientParametersPrototype =
clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
} else {
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
index 555200dee0..e23dc8d845 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
@@ -373,12 +373,14 @@ private DefaultTlsConfiguration(EnvironmentBuilder environmentBuilder) {
}
@Override
+ @SuppressWarnings("removal")
public TlsConfiguration hostnameVerification() {
this.hostnameVerification = true;
return this;
}
@Override
+ @SuppressWarnings("removal")
public TlsConfiguration hostnameVerification(boolean hostnameVerification) {
this.hostnameVerification = hostnameVerification;
return this;
@@ -400,6 +402,7 @@ public TlsConfiguration trustEverything() {
this.sslContext(
SslContextBuilder.forClient()
.trustManager(Utils.TRUST_EVERYTHING_TRUST_MANAGER)
+ .endpointIdentificationAlgorithm("NONE")
.build());
} catch (SSLException e) {
throw new StreamException("Error while creating Netty SSL context", e);
diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java
index b449763702..4ea934e911 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java
@@ -20,6 +20,9 @@
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import java.net.UnknownHostException;
import java.security.cert.X509Certificate;
import java.time.Duration;
@@ -408,6 +411,10 @@ static Function defaultConnectionNamingStrategy(St
prefixes.get(clientConnectionType) + sequences.get(clientConnectionType).getAndIncrement();
}
+ static EventLoopGroup eventLoopGroup() {
+ return new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
+ }
+
/*
class to help testing SAC on super streams
*/
diff --git a/src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java
index 4c45d6ae44..0441f5e6a9 100644
--- a/src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java
+++ b/src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java
@@ -19,7 +19,8 @@
import com.rabbitmq.stream.impl.Client;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
+import io.netty.channel.nio.NioIoHandler;
import java.util.UUID;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -31,7 +32,7 @@ public class DefaultEnvironmentTest {
@BeforeAll
static void initAll() {
- eventLoopGroup = new NioEventLoopGroup();
+ eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
}
@AfterAll
diff --git a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java
index 4f4c001982..bd7a9c6bc2 100644
--- a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java
+++ b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java
@@ -19,7 +19,9 @@
import com.rabbitmq.stream.observation.micrometer.MicrometerObservationCollectorBuilder;
import io.micrometer.observation.ObservationRegistry;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
@@ -140,7 +142,9 @@ void deleteStream() {
void nativeEpoll() {
// tag::native-epoll[]
- EventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(); // <1>
+ EventLoopGroup epollEventLoopGroup = new MultiThreadIoEventLoopGroup( // <1>
+ EpollIoHandler.newFactory() // <1>
+ ); // <1>
Environment environment = Environment.builder()
.netty() // <2>
.eventLoopGroup(epollEventLoopGroup) // <3>
diff --git a/src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java b/src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java
index 2ee819b058..78141e8130 100644
--- a/src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java
@@ -32,7 +32,6 @@
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamException;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@@ -56,7 +55,7 @@ public class AlarmsTest {
@BeforeAll
static void initAll() {
- eventLoopGroup = new NioEventLoopGroup();
+ eventLoopGroup = Utils.eventLoopGroup();
}
@AfterAll
diff --git a/src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java b/src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java
index 1cd14bdabd..9ee3e5b30c 100644
--- a/src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java
@@ -27,7 +27,6 @@
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.amqp.UnsignedByte;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -57,7 +56,7 @@ public class MqttInteroperabilityTest {
@BeforeAll
static void initAll() {
- eventLoopGroup = new NioEventLoopGroup();
+ eventLoopGroup = Utils.eventLoopGroup();
}
@AfterAll
diff --git a/src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java b/src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java
index adb6153a27..4faf6cd848 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java
@@ -22,7 +22,6 @@
import com.rabbitmq.stream.*;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -66,7 +65,7 @@ public class StompInteroperabilityTest {
@BeforeAll
static void initAll() {
- eventLoopGroup = new NioEventLoopGroup();
+ eventLoopGroup = Utils.eventLoopGroup();
}
@AfterAll
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
index e080275f1f..f076a78fde 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
@@ -51,7 +51,8 @@
import com.rabbitmq.stream.impl.TestUtils.DisabledIfTlsNotEnabled;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
+import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.handler.ssl.SslHandler;
import java.net.ConnectException;
@@ -723,7 +724,8 @@ void nettyInitializersAreCalled() {
@EnabledIfSystemProperty(named = "os.arch", matches = "amd64")
void nativeEpollWorksOnLinux() {
int messageCount = 10_000;
- EventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup();
+ EventLoopGroup epollEventLoopGroup =
+ new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory());
try {
Set channels = ConcurrentHashMap.newKeySet();
try (Environment env =
diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
index 417842be85..59d88cf6aa 100644
--- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
+++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
@@ -42,7 +42,6 @@
import com.rabbitmq.stream.impl.Client.Response;
import com.rabbitmq.stream.impl.Client.StreamMetadata;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import io.vavr.Tuple2;
import java.io.IOException;
import java.lang.annotation.Documented;
@@ -627,7 +626,7 @@ static EventLoopGroup eventLoopGroup(ExtensionContext context) {
@Override
public void beforeAll(ExtensionContext context) {
- store(context).put("nettyEventLoopGroup", new NioEventLoopGroup());
+ store(context).put("nettyEventLoopGroup", Utils.eventLoopGroup());
}
@Override
diff --git a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java
index 3959789cda..249034555b 100644
--- a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java
@@ -295,12 +295,12 @@ void hostnameVerificationShouldFailWhenSettingHostToLoopbackInterface() throws E
@Test
void shouldConnectWhenSettingHostToLoopbackInterfaceAndDisablingHostnameVerification()
throws Exception {
- SslContext context = SslContextBuilder.forClient().trustManager(caCertificate()).build();
- cf.get(
- new ClientParameters()
- .sslContext(context)
- .host("127.0.0.1")
- .tlsHostnameVerification(false));
+ SslContext context =
+ SslContextBuilder.forClient()
+ .endpointIdentificationAlgorithm(null)
+ .trustManager(caCertificate())
+ .build();
+ cf.get(new ClientParameters().sslContext(context).host("127.0.0.1"));
}
@Test