Skip to content

Handle early disconnects before SSL handshake #596

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -101,6 +102,8 @@ public final class ReactorNettyClient implements Client {

private static final Supplier<PostgresConnectionClosedException> EXPECTED = () -> new PostgresConnectionClosedException("Connection closed");

private static final AttributeKey<Mono<Void>> SSL_HANDSHAKE_KEY = AttributeKey.valueOf("ssl-handshake");

private final ByteBufAllocator byteBufAllocator;

private final ConnectionSettings settings;
Expand Down Expand Up @@ -144,7 +147,7 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) {
Assert.requireNonNull(connection, "Connection must not be null");
this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null");

connection.addHandlerFirst(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
connection.addHandlerLast(new EnsureSubscribersCompleteChannelHandler(this.requestSink));
connection.addHandlerLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0));
Comment on lines -147 to 148
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeps the same handler order. The SSL adapter is already added first.

this.connection = connection;
this.byteBufAllocator = connection.outbound().alloc();
Expand Down Expand Up @@ -392,43 +395,43 @@ public static Mono<ReactorNettyClient> connect(SocketAddress socketAddress, Conn
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.getConnectTimeoutMs());
}

return tcpClient.connect().flatMap(it -> {

ChannelPipeline pipeline = it.channel().pipeline();
return tcpClient.doOnChannelInit((observer, channel, remoteAddress) -> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using doOnChannelInit does make sense to avoid the gap of having a closed channel.

ChannelPipeline pipeline = channel.pipeline();

InternalLogger logger = InternalLoggerFactory.getInstance(ReactorNettyClient.class);
if (logger.isTraceEnabled()) {
pipeline.addFirst(LoggingHandler.class.getSimpleName(),
new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE));
}
Comment on lines 398 to 402
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trace logging handler also added on channel init, so it logs all lifecycle events.


return registerSslHandler(settings.getSslConfig(), it).thenReturn(new ReactorNettyClient(it, settings));
});
registerSslHandler(settings.getSslConfig(), channel);
}).connect().flatMap(it ->
getSslHandshake(it.channel()).thenReturn(new ReactorNettyClient(it, settings))
);
}

private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Connection it) {

private static void registerSslHandler(SSLConfig sslConfig, Channel channel) {
try {
if (sslConfig.getSslMode().startSsl()) {

return Mono.defer(() -> {
AbstractPostgresSSLHandlerAdapter sslAdapter;
if (sslConfig.getSslMode() == SSLMode.TUNNEL) {
sslAdapter = new SSLTunnelHandlerAdapter(it.outbound().alloc(), sslConfig);
} else {
sslAdapter = new SSLSessionHandlerAdapter(it.outbound().alloc(), sslConfig);
}

it.addHandlerFirst(sslAdapter);
return sslAdapter.getHandshake();
AbstractPostgresSSLHandlerAdapter sslAdapter;
if (sslConfig.getSslMode() == SSLMode.TUNNEL) {
sslAdapter = new SSLTunnelHandlerAdapter(channel.alloc(), sslConfig);
} else {
sslAdapter = new SSLSessionHandlerAdapter(channel.alloc(), sslConfig);
}

}).subscribeOn(Schedulers.boundedElastic());
channel.pipeline().addFirst(sslAdapter);
channel.attr(SSL_HANDSHAKE_KEY).set(sslAdapter.getHandshake());
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
}

return Mono.empty();
private static Mono<Void> getSslHandshake(Channel channel) {
Mono<Void> sslHandshake = channel.attr(SSL_HANDSHAKE_KEY).getAndSet(null);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of an attribute we could walk the handler pipeline and obtain the handshake mono from the AbstractPostgresSSLHandlerAdapter object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in original comment, since the handshake mono is under connect.flatMap and the SSL adapter swaps itself for the underlying SslHandler, the SSL adapter can already be removed from the pipeline at this point. Definitely for SSLTunnelHandlerAdapter, which swaps on handler added. Assuming it's possible for SSLSessionHandlerAdapter as well, based on timing. Or am I missing something? Maybe if the SSL tunnel adapter doesn't swap for the SSL handler until after connection, and it's clear that the adapters will still be in the pipeline.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an indication that the idea of removing the SSL handler isn't ideal. The SSLSessionHandlerAdapter performs one-time-only ops by sending the SSL request upon connect and reading the result afterwards. If we introduce state to the handler to remember that we've sent the request and that we've consumed its result, then we could skip the respective activity in channelActive and channelRead and keep the handler within the pipeline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, not removing the handlers is the alternative. So if leaving them in the pipeline, as noop handlers after the negotiation, is preferred, then I'll update to that.

Copy link
Contributor Author

@pvlugter pvlugter Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 0ba6b07 d40226f

return (sslHandshake == null) ? Mono.empty() : sslHandshake;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,17 @@ final class SSLSessionHandlerAdapter extends AbstractPostgresSSLHandlerAdapter {
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) {
public void channelActive(ChannelHandlerContext ctx) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we require the same changes in SSLTunnelHandlerAdapter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SSLTunnelHandlerAdapter immediately swaps itself for the underlying SslHandler, which already handles the disconnects, so shouldn't need any changes. Added a similar integration test for tunnel mode: 6de187d

Mono.from(SSLRequest.INSTANCE.encode(this.alloc)).subscribe(ctx::writeAndFlush);
ctx.fireChannelActive();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can resort to calling super.channelActive().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, wasn't sure what would be preferred. Updated in d0bbd9d.

}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
// If we receive channel inactive before removing this handler, then the inbound has closed early.
PostgresqlSslException e = new PostgresqlSslException("Connection closed during SSL negotiation");
completeHandshakeExceptionally(e);
ctx.fireChannelInactive();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's call super.channelInactive()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

}

@Override
Expand All @@ -54,7 +63,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
processSslEnabled(ctx, buf);
break;
case 'N':
processSslDisabled();
processSslDisabled(ctx);
break;
default:
buf.release();
Expand All @@ -65,13 +74,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
}

private void processSslDisabled() {
private void processSslDisabled(ChannelHandlerContext ctx) {
if (this.sslConfig.getSslMode().requireSsl()) {
PostgresqlSslException e =
new PostgresqlSslException("Server support for SSL connection is disabled, but client was configured with SSL mode " + this.sslConfig.getSslMode());
completeHandshakeExceptionally(e);
} else {
completeHandshake();
ctx.channel().pipeline().remove(this);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed this path as well, which could be reached if the server is ssl=off and the client is sslmode=prefer. Probably not tested currently, but the adapter would still be in the pipeline and fail subsequent reads.

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.r2dbc.postgresql.client;

import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import org.junit.jupiter.api.Test;
import reactor.netty.DisposableChannel;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import reactor.test.StepVerifier;

import static org.assertj.core.api.Assertions.assertThat;

public class DowntimeIntegrationTests {

@Test
void failSslHandshakeIfInboundClosed() {
// Simulate server downtime, where connections are accepted and then closed immediately
DisposableServer server =
TcpServer.create()
.doOnConnection(DisposableChannel::dispose)
.bindNow();

PostgresqlConnectionFactory connectionFactory =
new PostgresqlConnectionFactory(
PostgresqlConnectionConfiguration.builder()
.host(server.host())
.port(server.port())
.username("test")
.sslMode(SSLMode.REQUIRE)
.build());

connectionFactory.create()
.as(StepVerifier::create)
.verifyErrorSatisfies(error ->
assertThat(error)
.isInstanceOf(AbstractPostgresSSLHandlerAdapter.PostgresqlSslException.class)
.hasMessage("Connection closed during SSL negotiation"));

server.disposeNow();
}

}