Skip to content

Bump Netty to 4.2.0 #709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<spotless.check.skip>true</spotless.check.skip>
<slf4j.version>1.7.36</slf4j.version>
<logback.version>1.2.13</logback.version>
<netty.version>4.1.119.Final</netty.version>
<netty.version>4.2.0.Final</netty.version>
<proton-j.version>0.34.1</proton-j.version>
<metrics.version>4.2.30</metrics.version>
<micrometer.version>1.14.5</micrometer.version>
Expand Down
14 changes: 4 additions & 10 deletions src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,9 @@ TLS can be enabled by using the `rabbitmq-stream+tls` scheme in the URI.
The default TLS port is 5551.

Use the `EnvironmentBuilder#tls` method to configure TLS.
The most important setting is a `io.netty.handler.ssl.SslContext` instance,
which is created and configured with the
`io.netty.handler.ssl.SslContext#forClient` method. Note hostname verification
is enabled by default.
The most important setting is a `io.netty.handler.ssl.SslContext` instance, which is created and configured with the
`io.netty.handler.ssl.SslContext#forClient` method.
Note hostname verification is enabled by default.

The following snippet shows a common configuration, whereby
the client is instructed to trust servers with certificates
Expand Down Expand Up @@ -242,15 +241,10 @@ Used as a prefix for connection names.
|Configuration helper for TLS.
|TLS is enabled if a `rabbitmq-stream+tls` URI is provided.

|`tls#hostnameVerification`
|Enable or disable hostname verification.
|Enabled by default.

|`tls#sslContext`
|Set the `io.netty.handler.ssl.SslContext` used for the TLS connection.
Use `io.netty.handler.ssl.SslContextBuilder#forClient` to configure it.
The server certificate chain and the client private key are the typical
elements that need to be configured.
The server certificate chain, the client private key, and hostname verification are the usual elements that need to be configured.
|The JDK trust manager and no client private key.

|`tls#trustEverything`
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,25 +442,31 @@ interface TlsConfiguration {
* <p>Hostname verification is enabled by default.
*
* @return the TLS configuration helper
* @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link
* #sslContext(SslContext)}
*/
@Deprecated(forRemoval = true)
TlsConfiguration hostnameVerification();

/**
* Enable or disable hostname verification.
*
* <p>Hostname verification is enabled by default.
*
* @param hostnameVerification
* @param hostnameVerification whether to enable hostname verification or not
* @return the TLS configuration helper
* @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link
* #sslContext(SslContext)}
*/
@Deprecated(forRemoval = true)
TlsConfiguration hostnameVerification(boolean hostnameVerification);

/**
* Netty {@link SslContext} for TLS connections.
*
* <p>Use {@link SslContextBuilder#forClient()} to configure and create an instance.
*
* @param sslContext
* @param sslContext the SSL context
* @return the TLS configuration helper
*/
TlsConfiguration sslContext(SslContext sslContext);
Expand Down
29 changes: 2 additions & 27 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DecoderException;
Expand Down Expand Up @@ -106,9 +96,7 @@
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -248,7 +236,7 @@ public Client(ClientParameters parameters) {
if (b.config().group() == null) {
EventLoopGroup eventLoopGroup;
if (parameters.eventLoopGroup == null) {
this.eventLoopGroup = new NioEventLoopGroup();
this.eventLoopGroup = Utils.eventLoopGroup();
eventLoopGroup = this.eventLoopGroup;
} else {
this.eventLoopGroup = null;
Expand Down Expand Up @@ -293,13 +281,6 @@ public void initChannel(SocketChannel ch) {
SslHandler sslHandler =
parameters.sslContext.newHandler(ch.alloc(), parameters.host, parameters.port);

if (parameters.tlsHostnameVerification) {
SSLEngine sslEngine = sslHandler.engine();
SSLParameters sslParameters = sslEngine.getSSLParameters();
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
sslEngine.setSSLParameters(sslParameters);
}

ch.pipeline().addFirst("ssl", sslHandler);
}
channelCustomizer.accept(ch);
Expand Down Expand Up @@ -2407,7 +2388,6 @@ public static class ClientParameters {
private ChunkChecksum chunkChecksum = JdkChunkChecksum.CRC32_SINGLETON;
private MetricsCollector metricsCollector = NoOpMetricsCollector.SINGLETON;
private SslContext sslContext;
private boolean tlsHostnameVerification = true;
private ByteBufAllocator byteBufAllocator;
private Duration rpcTimeout;
private Consumer<Channel> channelCustomizer = noOpConsumer();
Expand Down Expand Up @@ -2564,11 +2544,6 @@ public ClientParameters sslContext(SslContext sslContext) {
return this;
}

public ClientParameters tlsHostnameVerification(boolean tlsHostnameVerification) {
this.tlsHostnameVerification = tlsHostnameVerification;
return this;
}

public ClientParameters compressionCodecFactory(
CompressionCodecFactory compressionCodecFactory) {
this.compressionCodecFactory = compressionCodecFactory;
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import java.io.IOException;
Expand Down Expand Up @@ -130,12 +129,13 @@ class StreamEnvironment implements Environment {
try {
SslContext sslContext =
tlsConfiguration.sslContext() == null
? SslContextBuilder.forClient().build()
? SslContextBuilder.forClient()
.endpointIdentificationAlgorithm(
tlsConfiguration.hostnameVerificationEnabled() ? "HTTPS" : null)
.build()
: tlsConfiguration.sslContext();

clientParametersPrototype.sslContext(sslContext);
clientParametersPrototype.tlsHostnameVerification(
tlsConfiguration.hostnameVerificationEnabled());

} catch (SSLException e) {
throw new StreamException("Error while creating Netty SSL context", e);
Expand Down Expand Up @@ -212,7 +212,7 @@ class StreamEnvironment implements Environment {
this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");

if (clientParametersPrototype.eventLoopGroup == null) {
this.eventLoopGroup = new NioEventLoopGroup();
this.eventLoopGroup = Utils.eventLoopGroup();
this.clientParametersPrototype =
clientParametersPrototype.duplicate().eventLoopGroup(this.eventLoopGroup);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,14 @@ private DefaultTlsConfiguration(EnvironmentBuilder environmentBuilder) {
}

@Override
@SuppressWarnings("removal")
public TlsConfiguration hostnameVerification() {
this.hostnameVerification = true;
return this;
}

@Override
@SuppressWarnings("removal")
public TlsConfiguration hostnameVerification(boolean hostnameVerification) {
this.hostnameVerification = hostnameVerification;
return this;
Expand All @@ -400,6 +402,7 @@ public TlsConfiguration trustEverything() {
this.sslContext(
SslContextBuilder.forClient()
.trustManager(Utils.TRUST_EVERYTHING_TRUST_MANAGER)
.endpointIdentificationAlgorithm("NONE")
.build());
} catch (SSLException e) {
throw new StreamException("Error while creating Netty SSL context", e);
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.nio.NioIoHandler;
import java.net.UnknownHostException;
import java.security.cert.X509Certificate;
import java.time.Duration;
Expand Down Expand Up @@ -408,6 +411,10 @@ static Function<ClientConnectionType, String> defaultConnectionNamingStrategy(St
prefixes.get(clientConnectionType) + sequences.get(clientConnectionType).getAndIncrement();
}

static EventLoopGroup eventLoopGroup() {
return new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
}

/*
class to help testing SAC on super streams
*/
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/com/rabbitmq/stream/DefaultEnvironmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

import com.rabbitmq.stream.impl.Client;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.nio.NioIoHandler;
import java.util.UUID;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -31,7 +32,7 @@ public class DefaultEnvironmentTest {

@BeforeAll
static void initAll() {
eventLoopGroup = new NioEventLoopGroup();
eventLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
}

@AfterAll
Expand Down
6 changes: 5 additions & 1 deletion src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.rabbitmq.stream.observation.micrometer.MicrometerObservationCollectorBuilder;
import io.micrometer.observation.ObservationRegistry;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
Expand Down Expand Up @@ -140,7 +142,9 @@ void deleteStream() {

void nativeEpoll() {
// tag::native-epoll[]
EventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup(); // <1>
EventLoopGroup epollEventLoopGroup = new MultiThreadIoEventLoopGroup( // <1>
EpollIoHandler.newFactory() // <1>
); // <1>
Environment environment = Environment.builder()
.netty() // <2>
.eventLoopGroup(epollEventLoopGroup) // <3>
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -56,7 +55,7 @@ public class AlarmsTest {

@BeforeAll
static void initAll() {
eventLoopGroup = new NioEventLoopGroup();
eventLoopGroup = Utils.eventLoopGroup();
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.amqp.UnsignedByte;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -57,7 +56,7 @@ public class MqttInteroperabilityTest {

@BeforeAll
static void initAll() {
eventLoopGroup = new NioEventLoopGroup();
eventLoopGroup = Utils.eventLoopGroup();
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.rabbitmq.stream.*;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -66,7 +65,7 @@ public class StompInteroperabilityTest {

@BeforeAll
static void initAll() {
eventLoopGroup = new NioEventLoopGroup();
eventLoopGroup = Utils.eventLoopGroup();
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
import com.rabbitmq.stream.impl.TestUtils.DisabledIfTlsNotEnabled;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
import io.netty.channel.epoll.EpollIoHandler;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.handler.ssl.SslHandler;
import java.net.ConnectException;
Expand Down Expand Up @@ -723,7 +724,8 @@ void nettyInitializersAreCalled() {
@EnabledIfSystemProperty(named = "os.arch", matches = "amd64")
void nativeEpollWorksOnLinux() {
int messageCount = 10_000;
EventLoopGroup epollEventLoopGroup = new EpollEventLoopGroup();
EventLoopGroup epollEventLoopGroup =
new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory());
try {
Set<Channel> channels = ConcurrentHashMap.newKeySet();
try (Environment env =
Expand Down
3 changes: 1 addition & 2 deletions src/test/java/com/rabbitmq/stream/impl/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.rabbitmq.stream.impl.Client.Response;
import com.rabbitmq.stream.impl.Client.StreamMetadata;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.vavr.Tuple2;
import java.io.IOException;
import java.lang.annotation.Documented;
Expand Down Expand Up @@ -627,7 +626,7 @@ static EventLoopGroup eventLoopGroup(ExtensionContext context) {

@Override
public void beforeAll(ExtensionContext context) {
store(context).put("nettyEventLoopGroup", new NioEventLoopGroup());
store(context).put("nettyEventLoopGroup", Utils.eventLoopGroup());
}

@Override
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/com/rabbitmq/stream/impl/TlsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,12 @@ void hostnameVerificationShouldFailWhenSettingHostToLoopbackInterface() throws E
@Test
void shouldConnectWhenSettingHostToLoopbackInterfaceAndDisablingHostnameVerification()
throws Exception {
SslContext context = SslContextBuilder.forClient().trustManager(caCertificate()).build();
cf.get(
new ClientParameters()
.sslContext(context)
.host("127.0.0.1")
.tlsHostnameVerification(false));
SslContext context =
SslContextBuilder.forClient()
.endpointIdentificationAlgorithm(null)
.trustManager(caCertificate())
.build();
cf.get(new ClientParameters().sslContext(context).host("127.0.0.1"));
}

@Test
Expand Down
Loading