From fac54ce7208478fdf68b31a113290669cbc6e4f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 14 Feb 2025 11:32:12 +0100 Subject: [PATCH 1/4] Bump Netty to 4.2.0.RC3 --- pom.xml | 2 +- src/main/java/com/rabbitmq/stream/impl/Client.java | 14 ++------------ .../rabbitmq/stream/impl/StreamEnvironment.java | 3 +-- src/main/java/com/rabbitmq/stream/impl/Utils.java | 7 +++++++ .../rabbitmq/stream/DefaultEnvironmentTest.java | 5 +++-- .../com/rabbitmq/stream/docs/EnvironmentUsage.java | 6 +++++- .../java/com/rabbitmq/stream/impl/AlarmsTest.java | 3 +-- .../stream/impl/MqttInteroperabilityTest.java | 3 +-- .../stream/impl/StompInteroperabilityTest.java | 3 +-- .../stream/impl/StreamEnvironmentTest.java | 6 ++++-- .../java/com/rabbitmq/stream/impl/TestUtils.java | 3 +-- 11 files changed, 27 insertions(+), 28 deletions(-) diff --git a/pom.xml b/pom.xml index 725f644930..5bdc1d9ee6 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.1.119.Final + 4.2.0.RC3 0.34.1 4.2.30 1.14.5 diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 8c4f53afec..7777ff158b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -59,17 +59,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufOutputStream; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; -import io.netty.channel.ConnectTimeoutException; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.*; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DecoderException; @@ -248,7 +238,7 @@ public Client(ClientParameters parameters) { if (b.config().group() == null) { EventLoopGroup eventLoopGroup; if (parameters.eventLoopGroup == null) { - this.eventLoopGroup = new NioEventLoopGroup(); + this.eventLoopGroup = Utils.eventLoopGroup(); eventLoopGroup = this.eventLoopGroup; } else { this.eventLoopGroup = null; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 2bf4b233f1..57db09292e 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -38,7 +38,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import java.io.IOException; @@ -212,7 +211,7 @@ class StreamEnvironment implements Environment { this.addresses.size(), 1, "rabbitmq-stream-locator-connection-"); if (clientParametersPrototype.eventLoopGroup == null) { - this.eventLoopGroup = new NioEventLoopGroup(); + this.eventLoopGroup = Utils.eventLoopGroup(); this.clientParametersPrototype = clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup); } else { diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index b449763702..4ea934e911 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -20,6 +20,9 @@ import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.Client.ClientParameters; import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import java.net.UnknownHostException; import java.security.cert.X509Certificate; import java.time.Duration; @@ -408,6 +411,10 @@ static Function defaultConnectionNamingStrategy(St prefixes.get(clientConnectionType) + sequences.get(clientConnectionType).getAndIncrement(); } + static EventLoopGroup eventLoopGroup() { + return new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); + } + /* class to help testing SAC on super streams */ diff --git a/src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java index 4c45d6ae44..0441f5e6a9 100644 --- a/src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java @@ -19,7 +19,8 @@ import com.rabbitmq.stream.impl.Client; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.nio.NioIoHandler; import java.util.UUID; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -31,7 +32,7 @@ public class DefaultEnvironmentTest { @BeforeAll static void initAll() { - eventLoopGroup = new NioEventLoopGroup(); + eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); } @AfterAll diff --git a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java index 4f4c001982..bd7a9c6bc2 100644 --- a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java @@ -19,7 +19,9 @@ import com.rabbitmq.stream.observation.micrometer.MicrometerObservationCollectorBuilder; import io.micrometer.observation.ObservationRegistry; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollIoHandler; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; @@ -140,7 +142,9 @@ void deleteStream() { void nativeEpoll() { // tag::native-epoll[] - EventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(); // <1> + EventLoopGroup epollEventLoopGroup = new MultiThreadIoEventLoopGroup( // <1> + EpollIoHandler.newFactory() // <1> + ); // <1> Environment environment = Environment.builder() .netty() // <2> .eventLoopGroup(epollEventLoopGroup) // <3> diff --git a/src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java b/src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java index 2ee819b058..78141e8130 100644 --- a/src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java @@ -32,7 +32,6 @@ import com.rabbitmq.stream.Producer; import com.rabbitmq.stream.StreamException; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -56,7 +55,7 @@ public class AlarmsTest { @BeforeAll static void initAll() { - eventLoopGroup = new NioEventLoopGroup(); + eventLoopGroup = Utils.eventLoopGroup(); } @AfterAll diff --git a/src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java b/src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java index 1cd14bdabd..9ee3e5b30c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/MqttInteroperabilityTest.java @@ -27,7 +27,6 @@ import com.rabbitmq.stream.OffsetSpecification; import com.rabbitmq.stream.amqp.UnsignedByte; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import java.nio.charset.StandardCharsets; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -57,7 +56,7 @@ public class MqttInteroperabilityTest { @BeforeAll static void initAll() { - eventLoopGroup = new NioEventLoopGroup(); + eventLoopGroup = Utils.eventLoopGroup(); } @AfterAll diff --git a/src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java b/src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java index adb6153a27..4faf6cd848 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java @@ -22,7 +22,6 @@ import com.rabbitmq.stream.*; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -66,7 +65,7 @@ public class StompInteroperabilityTest { @BeforeAll static void initAll() { - eventLoopGroup = new NioEventLoopGroup(); + eventLoopGroup = Utils.eventLoopGroup(); } @AfterAll diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index e080275f1f..f076a78fde 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -51,7 +51,8 @@ import com.rabbitmq.stream.impl.TestUtils.DisabledIfTlsNotEnabled; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.MultiThreadIoEventLoopGroup; +import io.netty.channel.epoll.EpollIoHandler; import io.netty.channel.epoll.EpollSocketChannel; import io.netty.handler.ssl.SslHandler; import java.net.ConnectException; @@ -723,7 +724,8 @@ void nettyInitializersAreCalled() { @EnabledIfSystemProperty(named = "os.arch", matches = "amd64") void nativeEpollWorksOnLinux() { int messageCount = 10_000; - EventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(); + EventLoopGroup epollEventLoopGroup = + new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory()); try { Set channels = ConcurrentHashMap.newKeySet(); try (Environment env = diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 417842be85..59d88cf6aa 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -42,7 +42,6 @@ import com.rabbitmq.stream.impl.Client.Response; import com.rabbitmq.stream.impl.Client.StreamMetadata; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.vavr.Tuple2; import java.io.IOException; import java.lang.annotation.Documented; @@ -627,7 +626,7 @@ static EventLoopGroup eventLoopGroup(ExtensionContext context) { @Override public void beforeAll(ExtensionContext context) { - store(context).put("nettyEventLoopGroup", new NioEventLoopGroup()); + store(context).put("nettyEventLoopGroup", Utils.eventLoopGroup()); } @Override From 0fa29503a5738efca11c2980d7cd2b6e4962b84b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 14 Feb 2025 15:13:44 +0100 Subject: [PATCH 2/4] Deprecate tls#hostnameVerification Now set up with Netty's SslContextBuilder#endpointIdentificationAlgorithm(String). --- src/docs/asciidoc/api.adoc | 14 ++++---------- .../com/rabbitmq/stream/EnvironmentBuilder.java | 10 ++++++++-- .../java/com/rabbitmq/stream/impl/Client.java | 15 --------------- .../rabbitmq/stream/impl/StreamEnvironment.java | 7 ++++--- .../stream/impl/StreamEnvironmentBuilder.java | 3 +++ .../java/com/rabbitmq/stream/impl/TlsTest.java | 12 ++++++------ 6 files changed, 25 insertions(+), 36 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index d490a5d12b..f4c5d0be9a 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -88,10 +88,9 @@ TLS can be enabled by using the `rabbitmq-stream+tls` scheme in the URI. The default TLS port is 5551. Use the `EnvironmentBuilder#tls` method to configure TLS. -The most important setting is a `io.netty.handler.ssl.SslContext` instance, -which is created and configured with the -`io.netty.handler.ssl.SslContext#forClient` method. Note hostname verification -is enabled by default. +The most important setting is a `io.netty.handler.ssl.SslContext` instance, which is created and configured with the +`io.netty.handler.ssl.SslContext#forClient` method. +Note hostname verification is enabled by default. The following snippet shows a common configuration, whereby the client is instructed to trust servers with certificates @@ -242,15 +241,10 @@ Used as a prefix for connection names. |Configuration helper for TLS. |TLS is enabled if a `rabbitmq-stream+tls` URI is provided. -|`tls#hostnameVerification` -|Enable or disable hostname verification. -|Enabled by default. - |`tls#sslContext` |Set the `io.netty.handler.ssl.SslContext` used for the TLS connection. Use `io.netty.handler.ssl.SslContextBuilder#forClient` to configure it. -The server certificate chain and the client private key are the typical -elements that need to be configured. +The server certificate chain, the client private key, and hostname verification are the usual elements that need to be configured. |The JDK trust manager and no client private key. |`tls#trustEverything` diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index 12dd0643ed..33fac4dcbc 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -442,7 +442,10 @@ interface TlsConfiguration { *

Hostname verification is enabled by default. * * @return the TLS configuration helper + * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link + * #sslContext(SslContext)} */ + @Deprecated(forRemoval = true) TlsConfiguration hostnameVerification(); /** @@ -450,9 +453,12 @@ interface TlsConfiguration { * *

Hostname verification is enabled by default. * - * @param hostnameVerification + * @param hostnameVerification whether to enable hostname verification or not * @return the TLS configuration helper + * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link + * #sslContext(SslContext)} */ + @Deprecated(forRemoval = true) TlsConfiguration hostnameVerification(boolean hostnameVerification); /** @@ -460,7 +466,7 @@ interface TlsConfiguration { * *

Use {@link SslContextBuilder#forClient()} to configure and create an instance. * - * @param sslContext + * @param sslContext the SSL context * @return the TLS configuration helper */ TlsConfiguration sslContext(SslContext sslContext); diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 7777ff158b..8679844939 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -96,9 +96,7 @@ import java.util.function.Consumer; import java.util.function.Supplier; import java.util.function.ToLongFunction; -import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLHandshakeException; -import javax.net.ssl.SSLParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -283,13 +281,6 @@ public void initChannel(SocketChannel ch) { SslHandler sslHandler = parameters.sslContext.newHandler(ch.alloc(), parameters.host, parameters.port); - if (parameters.tlsHostnameVerification) { - SSLEngine sslEngine = sslHandler.engine(); - SSLParameters sslParameters = sslEngine.getSSLParameters(); - sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); - sslEngine.setSSLParameters(sslParameters); - } - ch.pipeline().addFirst("ssl", sslHandler); } channelCustomizer.accept(ch); @@ -2397,7 +2388,6 @@ public static class ClientParameters { private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON; private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON; private SslContext sslContext; - private boolean tlsHostnameVerification = true; private ByteBufAllocator byteBufAllocator; private Duration rpcTimeout; private Consumer channelCustomizer = noOpConsumer(); @@ -2554,11 +2544,6 @@ public ClientParameters sslContext(SslContext sslContext) { return this; } - public ClientParameters tlsHostnameVerification(boolean tlsHostnameVerification) { - this.tlsHostnameVerification = tlsHostnameVerification; - return this; - } - public ClientParameters compressionCodecFactory( CompressionCodecFactory compressionCodecFactory) { this.compressionCodecFactory = compressionCodecFactory; diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 57db09292e..55957434cf 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -129,12 +129,13 @@ class StreamEnvironment implements Environment { try { SslContext sslContext = tlsConfiguration.sslContext() == null - ? SslContextBuilder.forClient().build() + ? SslContextBuilder.forClient() + .endpointIdentificationAlgorithm( + tlsConfiguration.hostnameVerificationEnabled() ? "HTTPS" : null) + .build() : tlsConfiguration.sslContext(); clientParametersPrototype.sslContext(sslContext); - clientParametersPrototype.tlsHostnameVerification( - tlsConfiguration.hostnameVerificationEnabled()); } catch (SSLException e) { throw new StreamException("Error while creating Netty SSL context", e); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index 555200dee0..e23dc8d845 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -373,12 +373,14 @@ private DefaultTlsConfiguration(EnvironmentBuilder environmentBuilder) { } @Override + @SuppressWarnings("removal") public TlsConfiguration hostnameVerification() { this.hostnameVerification = true; return this; } @Override + @SuppressWarnings("removal") public TlsConfiguration hostnameVerification(boolean hostnameVerification) { this.hostnameVerification = hostnameVerification; return this; @@ -400,6 +402,7 @@ public TlsConfiguration trustEverything() { this.sslContext( SslContextBuilder.forClient() .trustManager(Utils.TRUST_EVERYTHING_TRUST_MANAGER) + .endpointIdentificationAlgorithm("NONE") .build()); } catch (SSLException e) { throw new StreamException("Error while creating Netty SSL context", e); diff --git a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java index 3959789cda..249034555b 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TlsTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/TlsTest.java @@ -295,12 +295,12 @@ void hostnameVerificationShouldFailWhenSettingHostToLoopbackInterface() throws E @Test void shouldConnectWhenSettingHostToLoopbackInterfaceAndDisablingHostnameVerification() throws Exception { - SslContext context = SslContextBuilder.forClient().trustManager(caCertificate()).build(); - cf.get( - new ClientParameters() - .sslContext(context) - .host("127.0.0.1") - .tlsHostnameVerification(false)); + SslContext context = + SslContextBuilder.forClient() + .endpointIdentificationAlgorithm(null) + .trustManager(caCertificate()) + .build(); + cf.get(new ClientParameters().sslContext(context).host("127.0.0.1")); } @Test From 9b8e9926337f6067cffacb4eea9c861a27838843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 10 Mar 2025 17:35:17 +0100 Subject: [PATCH 3/4] Bump Netty to 4.2.0.RC4 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 5bdc1d9ee6..2aa094eff7 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.RC3 + 4.2.0.RC4 0.34.1 4.2.30 1.14.5 From ee52e939bf176a6f23f662713f8f4ec50084c75c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 7 Apr 2025 09:15:47 +0200 Subject: [PATCH 4/4] Bump Netty to 4.2.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2aa094eff7..5c465ff2ea 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.RC4 + 4.2.0.Final 0.34.1 4.2.30 1.14.5