From d05f93ec0616889679b99d0be2084aa0cc9f52c5 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Tue, 13 Jun 2023 15:49:10 +1200 Subject: [PATCH 1/4] Handle early disconnects before SSL handshake --- .../postgresql/client/ReactorNettyClient.java | 43 +++++++------- .../client/SSLSessionHandlerAdapter.java | 16 ++++- .../client/DowntimeIntegrationTests.java | 58 +++++++++++++++++++ 3 files changed, 94 insertions(+), 23 deletions(-) create mode 100644 src/test/java/io/r2dbc/postgresql/client/DowntimeIntegrationTests.java diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java index c7abfb40a..fdd7d96d4 100644 --- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java @@ -25,6 +25,7 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -101,6 +102,8 @@ public final class ReactorNettyClient implements Client { private static final Supplier EXPECTED = () -> new PostgresConnectionClosedException("Connection closed"); + private static final AttributeKey> SSL_HANDSHAKE_KEY = AttributeKey.valueOf("ssl-handshake"); + private final ByteBufAllocator byteBufAllocator; private final ConnectionSettings settings; @@ -144,7 +147,7 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) { Assert.requireNonNull(connection, "Connection must not be null"); this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null"); - connection.addHandlerFirst(new EnsureSubscribersCompleteChannelHandler(this.requestSink)); + connection.addHandlerLast(new EnsureSubscribersCompleteChannelHandler(this.requestSink)); connection.addHandlerLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0)); this.connection = connection; this.byteBufAllocator = connection.outbound().alloc(); @@ -392,9 +395,8 @@ public static Mono connect(SocketAddress socketAddress, Conn tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.getConnectTimeoutMs()); } - return tcpClient.connect().flatMap(it -> { - - ChannelPipeline pipeline = it.channel().pipeline(); + return tcpClient.doOnChannelInit((observer, channel, remoteAddress) -> { + ChannelPipeline pipeline = channel.pipeline(); InternalLogger logger = InternalLoggerFactory.getInstance(ReactorNettyClient.class); if (logger.isTraceEnabled()) { @@ -402,33 +404,34 @@ public static Mono connect(SocketAddress socketAddress, Conn new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE)); } - return registerSslHandler(settings.getSslConfig(), it).thenReturn(new ReactorNettyClient(it, settings)); - }); + registerSslHandler(settings.getSslConfig(), channel); + }).connect().flatMap(it -> + getSslHandshake(it.channel()).thenReturn(new ReactorNettyClient(it, settings)) + ); } - private static Mono registerSslHandler(SSLConfig sslConfig, Connection it) { - + private static void registerSslHandler(SSLConfig sslConfig, Channel channel) { try { if (sslConfig.getSslMode().startSsl()) { - return Mono.defer(() -> { - AbstractPostgresSSLHandlerAdapter sslAdapter; - if (sslConfig.getSslMode() == SSLMode.TUNNEL) { - sslAdapter = new SSLTunnelHandlerAdapter(it.outbound().alloc(), sslConfig); - } else { - sslAdapter = new SSLSessionHandlerAdapter(it.outbound().alloc(), sslConfig); - } - - it.addHandlerFirst(sslAdapter); - return sslAdapter.getHandshake(); + AbstractPostgresSSLHandlerAdapter sslAdapter; + if (sslConfig.getSslMode() == SSLMode.TUNNEL) { + sslAdapter = new SSLTunnelHandlerAdapter(channel.alloc(), sslConfig); + } else { + sslAdapter = new SSLSessionHandlerAdapter(channel.alloc(), sslConfig); + } - }).subscribeOn(Schedulers.boundedElastic()); + channel.pipeline().addFirst(sslAdapter); + channel.attr(SSL_HANDSHAKE_KEY).set(sslAdapter.getHandshake()); } } catch (Throwable e) { throw new RuntimeException(e); } + } - return Mono.empty(); + private static Mono getSslHandshake(Channel channel) { + Mono sslHandshake = channel.attr(SSL_HANDSHAKE_KEY).getAndSet(null); + return (sslHandshake == null) ? Mono.empty() : sslHandshake; } @Override diff --git a/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java b/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java index da2422262..0ec43926d 100644 --- a/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java +++ b/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java @@ -40,8 +40,17 @@ final class SSLSessionHandlerAdapter extends AbstractPostgresSSLHandlerAdapter { } @Override - public void handlerAdded(ChannelHandlerContext ctx) { + public void channelActive(ChannelHandlerContext ctx) { Mono.from(SSLRequest.INSTANCE.encode(this.alloc)).subscribe(ctx::writeAndFlush); + ctx.fireChannelActive(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + // If we receive channel inactive before removing this handler, then the inbound has closed early. + PostgresqlSslException e = new PostgresqlSslException("Connection closed during SSL negotiation"); + completeHandshakeExceptionally(e); + ctx.fireChannelInactive(); } @Override @@ -54,7 +63,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { processSslEnabled(ctx, buf); break; case 'N': - processSslDisabled(); + processSslDisabled(ctx); break; default: buf.release(); @@ -65,13 +74,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } } - private void processSslDisabled() { + private void processSslDisabled(ChannelHandlerContext ctx) { if (this.sslConfig.getSslMode().requireSsl()) { PostgresqlSslException e = new PostgresqlSslException("Server support for SSL connection is disabled, but client was configured with SSL mode " + this.sslConfig.getSslMode()); completeHandshakeExceptionally(e); } else { completeHandshake(); + ctx.channel().pipeline().remove(this); } } diff --git a/src/test/java/io/r2dbc/postgresql/client/DowntimeIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/client/DowntimeIntegrationTests.java new file mode 100644 index 000000000..564b9176c --- /dev/null +++ b/src/test/java/io/r2dbc/postgresql/client/DowntimeIntegrationTests.java @@ -0,0 +1,58 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.r2dbc.postgresql.client; + +import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; +import io.r2dbc.postgresql.PostgresqlConnectionFactory; +import org.junit.jupiter.api.Test; +import reactor.netty.DisposableChannel; +import reactor.netty.DisposableServer; +import reactor.netty.tcp.TcpServer; +import reactor.test.StepVerifier; + +import static org.assertj.core.api.Assertions.assertThat; + +public class DowntimeIntegrationTests { + + @Test + void failSslHandshakeIfInboundClosed() { + // Simulate server downtime, where connections are accepted and then closed immediately + DisposableServer server = + TcpServer.create() + .doOnConnection(DisposableChannel::dispose) + .bindNow(); + + PostgresqlConnectionFactory connectionFactory = + new PostgresqlConnectionFactory( + PostgresqlConnectionConfiguration.builder() + .host(server.host()) + .port(server.port()) + .username("test") + .sslMode(SSLMode.REQUIRE) + .build()); + + connectionFactory.create() + .as(StepVerifier::create) + .verifyErrorSatisfies(error -> + assertThat(error) + .isInstanceOf(AbstractPostgresSSLHandlerAdapter.PostgresqlSslException.class) + .hasMessage("Connection closed during SSL negotiation")); + + server.disposeNow(); + } + +} From d0bbd9d44165e3b6b89fce5b5aedf5d65901ce79 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Sat, 17 Jun 2023 13:20:38 +1200 Subject: [PATCH 2/4] update: super calls rather than ctx fire directly --- .../r2dbc/postgresql/client/SSLSessionHandlerAdapter.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java b/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java index 0ec43926d..34ec84c2c 100644 --- a/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java +++ b/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java @@ -40,17 +40,17 @@ final class SSLSessionHandlerAdapter extends AbstractPostgresSSLHandlerAdapter { } @Override - public void channelActive(ChannelHandlerContext ctx) { + public void channelActive(ChannelHandlerContext ctx) throws Exception { Mono.from(SSLRequest.INSTANCE.encode(this.alloc)).subscribe(ctx::writeAndFlush); - ctx.fireChannelActive(); + super.channelActive(ctx); } @Override - public void channelInactive(ChannelHandlerContext ctx) { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { // If we receive channel inactive before removing this handler, then the inbound has closed early. PostgresqlSslException e = new PostgresqlSslException("Connection closed during SSL negotiation"); completeHandshakeExceptionally(e); - ctx.fireChannelInactive(); + super.channelInactive(ctx); } @Override From 6de187d066f6fc41b51e4fa8e0072901a2276793 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Sat, 17 Jun 2023 13:21:03 +1200 Subject: [PATCH 3/4] update: add downtime integration test for ssl tunnel mode --- .../client/DowntimeIntegrationTests.java | 68 +++++++++++++------ 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/src/test/java/io/r2dbc/postgresql/client/DowntimeIntegrationTests.java b/src/test/java/io/r2dbc/postgresql/client/DowntimeIntegrationTests.java index 564b9176c..d00b52a1d 100644 --- a/src/test/java/io/r2dbc/postgresql/client/DowntimeIntegrationTests.java +++ b/src/test/java/io/r2dbc/postgresql/client/DowntimeIntegrationTests.java @@ -18,41 +18,65 @@ import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; import io.r2dbc.postgresql.PostgresqlConnectionFactory; +import io.r2dbc.postgresql.api.PostgresqlException; import org.junit.jupiter.api.Test; import reactor.netty.DisposableChannel; import reactor.netty.DisposableServer; import reactor.netty.tcp.TcpServer; import reactor.test.StepVerifier; +import java.nio.channels.ClosedChannelException; +import java.util.function.Consumer; + import static org.assertj.core.api.Assertions.assertThat; public class DowntimeIntegrationTests { + // Simulate server downtime, where connections are accepted and then closed immediately + static DisposableServer newServer() { + return TcpServer.create() + .doOnConnection(DisposableChannel::dispose) + .bindNow(); + } + + static PostgresqlConnectionFactory newConnectionFactory(DisposableServer server, SSLMode sslMode) { + return new PostgresqlConnectionFactory( + PostgresqlConnectionConfiguration.builder() + .host(server.host()) + .port(server.port()) + .username("test") + .sslMode(sslMode) + .build()); + } + + static void verifyError(SSLMode sslMode, Consumer assertions) { + DisposableServer server = newServer(); + PostgresqlConnectionFactory connectionFactory = newConnectionFactory(server, sslMode); + connectionFactory.create().as(StepVerifier::create).verifyErrorSatisfies(assertions); + server.disposeNow(); + } + @Test void failSslHandshakeIfInboundClosed() { - // Simulate server downtime, where connections are accepted and then closed immediately - DisposableServer server = - TcpServer.create() - .doOnConnection(DisposableChannel::dispose) - .bindNow(); - - PostgresqlConnectionFactory connectionFactory = - new PostgresqlConnectionFactory( - PostgresqlConnectionConfiguration.builder() - .host(server.host()) - .port(server.port()) - .username("test") - .sslMode(SSLMode.REQUIRE) - .build()); - - connectionFactory.create() - .as(StepVerifier::create) - .verifyErrorSatisfies(error -> - assertThat(error) - .isInstanceOf(AbstractPostgresSSLHandlerAdapter.PostgresqlSslException.class) - .hasMessage("Connection closed during SSL negotiation")); + verifyError(SSLMode.REQUIRE, error -> + assertThat(error) + .isInstanceOf(AbstractPostgresSSLHandlerAdapter.PostgresqlSslException.class) + .hasMessage("Connection closed during SSL negotiation")); + } - server.disposeNow(); + @Test + void failSslTunnelIfInboundClosed() { + verifyError(SSLMode.TUNNEL, error -> { + assertThat(error) + .isInstanceOf(PostgresqlException.class) + .cause() + .isInstanceOf(ClosedChannelException.class); + + assertThat(error.getCause().getSuppressed().length).isOne(); + + assertThat(error.getCause().getSuppressed()[0]) + .hasMessage("Connection closed while SSL/TLS handshake was in progress"); + }); } } From d40226fc41a5afaef1224e44f48826d227c8ecf8 Mon Sep 17 00:00:00 2001 From: Peter Vlugter <59895+pvlugter@users.noreply.github.com> Date: Mon, 26 Jun 2023 17:38:13 +1200 Subject: [PATCH 4/4] update: keep SSL handler adapters in the pipeline --- .../postgresql/client/ReactorNettyClient.java | 8 +-- .../client/SSLSessionHandlerAdapter.java | 57 +++++++++++-------- .../client/SSLTunnelHandlerAdapter.java | 4 +- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java index fdd7d96d4..cb40ede43 100644 --- a/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java +++ b/src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java @@ -25,7 +25,6 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -102,8 +101,6 @@ public final class ReactorNettyClient implements Client { private static final Supplier EXPECTED = () -> new PostgresConnectionClosedException("Connection closed"); - private static final AttributeKey> SSL_HANDSHAKE_KEY = AttributeKey.valueOf("ssl-handshake"); - private final ByteBufAllocator byteBufAllocator; private final ConnectionSettings settings; @@ -422,7 +419,6 @@ private static void registerSslHandler(SSLConfig sslConfig, Channel channel) { } channel.pipeline().addFirst(sslAdapter); - channel.attr(SSL_HANDSHAKE_KEY).set(sslAdapter.getHandshake()); } } catch (Throwable e) { throw new RuntimeException(e); @@ -430,8 +426,8 @@ private static void registerSslHandler(SSLConfig sslConfig, Channel channel) { } private static Mono getSslHandshake(Channel channel) { - Mono sslHandshake = channel.attr(SSL_HANDSHAKE_KEY).getAndSet(null); - return (sslHandshake == null) ? Mono.empty() : sslHandshake; + AbstractPostgresSSLHandlerAdapter sslAdapter = channel.pipeline().get(AbstractPostgresSSLHandlerAdapter.class); + return (sslAdapter != null) ? sslAdapter.getHandshake() : Mono.empty(); } @Override diff --git a/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java b/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java index 34ec84c2c..4e64df8fa 100644 --- a/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java +++ b/src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java @@ -33,6 +33,8 @@ final class SSLSessionHandlerAdapter extends AbstractPostgresSSLHandlerAdapter { private final SSLConfig sslConfig; + private boolean negotiating = true; + SSLSessionHandlerAdapter(ByteBufAllocator alloc, SSLConfig sslConfig) { super(alloc, sslConfig); this.alloc = alloc; @@ -41,47 +43,54 @@ final class SSLSessionHandlerAdapter extends AbstractPostgresSSLHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - Mono.from(SSLRequest.INSTANCE.encode(this.alloc)).subscribe(ctx::writeAndFlush); + if (negotiating) { + Mono.from(SSLRequest.INSTANCE.encode(this.alloc)).subscribe(ctx::writeAndFlush); + } super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - // If we receive channel inactive before removing this handler, then the inbound has closed early. - PostgresqlSslException e = new PostgresqlSslException("Connection closed during SSL negotiation"); - completeHandshakeExceptionally(e); + if (negotiating) { + // If we receive channel inactive before negotiated, then the inbound has closed early. + PostgresqlSslException e = new PostgresqlSslException("Connection closed during SSL negotiation"); + completeHandshakeExceptionally(e); + } super.channelInactive(ctx); } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - ByteBuf buf = (ByteBuf) msg; - char response = (char) buf.readByte(); - try { - switch (response) { - case 'S': - processSslEnabled(ctx, buf); - break; - case 'N': - processSslDisabled(ctx); - break; - default: - buf.release(); - throw new IllegalStateException("Unknown SSLResponse from server: '" + response + "'"); + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (negotiating) { + ByteBuf buf = (ByteBuf) msg; + char response = (char) buf.readByte(); + try { + switch (response) { + case 'S': + processSslEnabled(ctx, buf); + break; + case 'N': + processSslDisabled(); + break; + default: + throw new IllegalStateException("Unknown SSLResponse from server: '" + response + "'"); + } + } finally { + buf.release(); + negotiating = false; } - } finally { - buf.release(); + } else { + super.channelRead(ctx, msg); } } - private void processSslDisabled(ChannelHandlerContext ctx) { + private void processSslDisabled() { if (this.sslConfig.getSslMode().requireSsl()) { PostgresqlSslException e = new PostgresqlSslException("Server support for SSL connection is disabled, but client was configured with SSL mode " + this.sslConfig.getSslMode()); completeHandshakeExceptionally(e); } else { completeHandshake(); - ctx.channel().pipeline().remove(this); } } @@ -92,9 +101,7 @@ private void processSslEnabled(ChannelHandlerContext ctx, ByteBuf msg) { completeHandshakeExceptionally(e); return; } - ctx.channel().pipeline() - .addFirst(this.getSslHandler()) - .remove(this); + ctx.channel().pipeline().addFirst(this.getSslHandler()); ctx.fireChannelRead(msg.retain()); } diff --git a/src/main/java/io/r2dbc/postgresql/client/SSLTunnelHandlerAdapter.java b/src/main/java/io/r2dbc/postgresql/client/SSLTunnelHandlerAdapter.java index 3301c7630..059d66b35 100644 --- a/src/main/java/io/r2dbc/postgresql/client/SSLTunnelHandlerAdapter.java +++ b/src/main/java/io/r2dbc/postgresql/client/SSLTunnelHandlerAdapter.java @@ -40,9 +40,7 @@ public void handlerAdded(ChannelHandlerContext ctx) { completeHandshakeExceptionally(e); return; } - ctx.channel().pipeline() - .addFirst(this.getSslHandler()) - .remove(this); + ctx.channel().pipeline().addFirst(this.getSslHandler()); } }