-
Notifications
You must be signed in to change notification settings - Fork 184
Handle early disconnects before SSL handshake #596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
d05f93e
d0bbd9d
6de187d
d40226f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
import io.netty.handler.codec.LengthFieldBasedFrameDecoder; | ||
import io.netty.handler.logging.LogLevel; | ||
import io.netty.handler.logging.LoggingHandler; | ||
import io.netty.util.AttributeKey; | ||
import io.netty.util.ReferenceCountUtil; | ||
import io.netty.util.internal.logging.InternalLogger; | ||
import io.netty.util.internal.logging.InternalLoggerFactory; | ||
|
@@ -101,6 +102,8 @@ public final class ReactorNettyClient implements Client { | |
|
||
private static final Supplier<PostgresConnectionClosedException> EXPECTED = () -> new PostgresConnectionClosedException("Connection closed"); | ||
|
||
private static final AttributeKey<Mono<Void>> SSL_HANDSHAKE_KEY = AttributeKey.valueOf("ssl-handshake"); | ||
|
||
private final ByteBufAllocator byteBufAllocator; | ||
|
||
private final ConnectionSettings settings; | ||
|
@@ -144,7 +147,7 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) { | |
Assert.requireNonNull(connection, "Connection must not be null"); | ||
this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null"); | ||
|
||
connection.addHandlerFirst(new EnsureSubscribersCompleteChannelHandler(this.requestSink)); | ||
connection.addHandlerLast(new EnsureSubscribersCompleteChannelHandler(this.requestSink)); | ||
connection.addHandlerLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0)); | ||
this.connection = connection; | ||
this.byteBufAllocator = connection.outbound().alloc(); | ||
|
@@ -392,43 +395,43 @@ public static Mono<ReactorNettyClient> connect(SocketAddress socketAddress, Conn | |
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.getConnectTimeoutMs()); | ||
} | ||
|
||
return tcpClient.connect().flatMap(it -> { | ||
|
||
ChannelPipeline pipeline = it.channel().pipeline(); | ||
return tcpClient.doOnChannelInit((observer, channel, remoteAddress) -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using |
||
ChannelPipeline pipeline = channel.pipeline(); | ||
|
||
InternalLogger logger = InternalLoggerFactory.getInstance(ReactorNettyClient.class); | ||
if (logger.isTraceEnabled()) { | ||
pipeline.addFirst(LoggingHandler.class.getSimpleName(), | ||
new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE)); | ||
} | ||
Comment on lines
398
to
402
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trace logging handler also added on channel init, so it logs all lifecycle events. |
||
|
||
return registerSslHandler(settings.getSslConfig(), it).thenReturn(new ReactorNettyClient(it, settings)); | ||
}); | ||
registerSslHandler(settings.getSslConfig(), channel); | ||
}).connect().flatMap(it -> | ||
getSslHandshake(it.channel()).thenReturn(new ReactorNettyClient(it, settings)) | ||
); | ||
} | ||
|
||
private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Connection it) { | ||
|
||
private static void registerSslHandler(SSLConfig sslConfig, Channel channel) { | ||
try { | ||
if (sslConfig.getSslMode().startSsl()) { | ||
|
||
return Mono.defer(() -> { | ||
AbstractPostgresSSLHandlerAdapter sslAdapter; | ||
if (sslConfig.getSslMode() == SSLMode.TUNNEL) { | ||
sslAdapter = new SSLTunnelHandlerAdapter(it.outbound().alloc(), sslConfig); | ||
} else { | ||
sslAdapter = new SSLSessionHandlerAdapter(it.outbound().alloc(), sslConfig); | ||
} | ||
|
||
it.addHandlerFirst(sslAdapter); | ||
return sslAdapter.getHandshake(); | ||
AbstractPostgresSSLHandlerAdapter sslAdapter; | ||
if (sslConfig.getSslMode() == SSLMode.TUNNEL) { | ||
sslAdapter = new SSLTunnelHandlerAdapter(channel.alloc(), sslConfig); | ||
} else { | ||
sslAdapter = new SSLSessionHandlerAdapter(channel.alloc(), sslConfig); | ||
} | ||
|
||
}).subscribeOn(Schedulers.boundedElastic()); | ||
channel.pipeline().addFirst(sslAdapter); | ||
channel.attr(SSL_HANDSHAKE_KEY).set(sslAdapter.getHandshake()); | ||
} | ||
} catch (Throwable e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
return Mono.empty(); | ||
private static Mono<Void> getSslHandshake(Channel channel) { | ||
Mono<Void> sslHandshake = channel.attr(SSL_HANDSHAKE_KEY).getAndSet(null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of an attribute we could walk the handler pipeline and obtain the handshake mono from the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As mentioned in original comment, since the handshake mono is under There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an indication that the idea of removing the SSL handler isn't ideal. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, not removing the handlers is the alternative. So if leaving them in the pipeline, as noop handlers after the negotiation, is preferred, then I'll update to that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
return (sslHandshake == null) ? Mono.empty() : sslHandshake; | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,8 +40,17 @@ final class SSLSessionHandlerAdapter extends AbstractPostgresSSLHandlerAdapter { | |
} | ||
|
||
@Override | ||
public void handlerAdded(ChannelHandlerContext ctx) { | ||
public void channelActive(ChannelHandlerContext ctx) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we require the same changes in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
Mono.from(SSLRequest.INSTANCE.encode(this.alloc)).subscribe(ctx::writeAndFlush); | ||
ctx.fireChannelActive(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can resort to calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, wasn't sure what would be preferred. Updated in d0bbd9d. |
||
} | ||
|
||
@Override | ||
public void channelInactive(ChannelHandlerContext ctx) { | ||
// If we receive channel inactive before removing this handler, then the inbound has closed early. | ||
PostgresqlSslException e = new PostgresqlSslException("Connection closed during SSL negotiation"); | ||
completeHandshakeExceptionally(e); | ||
ctx.fireChannelInactive(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated. |
||
} | ||
|
||
@Override | ||
|
@@ -54,7 +63,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { | |
processSslEnabled(ctx, buf); | ||
break; | ||
case 'N': | ||
processSslDisabled(); | ||
processSslDisabled(ctx); | ||
break; | ||
default: | ||
buf.release(); | ||
|
@@ -65,13 +74,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { | |
} | ||
} | ||
|
||
private void processSslDisabled() { | ||
private void processSslDisabled(ChannelHandlerContext ctx) { | ||
if (this.sslConfig.getSslMode().requireSsl()) { | ||
PostgresqlSslException e = | ||
new PostgresqlSslException("Server support for SSL connection is disabled, but client was configured with SSL mode " + this.sslConfig.getSslMode()); | ||
completeHandshakeExceptionally(e); | ||
} else { | ||
completeHandshake(); | ||
ctx.channel().pipeline().remove(this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Noticed this path as well, which could be reached if the server is |
||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
/* | ||
* Copyright 2022 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.r2dbc.postgresql.client; | ||
|
||
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; | ||
import io.r2dbc.postgresql.PostgresqlConnectionFactory; | ||
import org.junit.jupiter.api.Test; | ||
import reactor.netty.DisposableChannel; | ||
import reactor.netty.DisposableServer; | ||
import reactor.netty.tcp.TcpServer; | ||
import reactor.test.StepVerifier; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
public class DowntimeIntegrationTests { | ||
|
||
@Test | ||
void failSslHandshakeIfInboundClosed() { | ||
// Simulate server downtime, where connections are accepted and then closed immediately | ||
DisposableServer server = | ||
TcpServer.create() | ||
.doOnConnection(DisposableChannel::dispose) | ||
.bindNow(); | ||
|
||
PostgresqlConnectionFactory connectionFactory = | ||
new PostgresqlConnectionFactory( | ||
PostgresqlConnectionConfiguration.builder() | ||
.host(server.host()) | ||
.port(server.port()) | ||
.username("test") | ||
.sslMode(SSLMode.REQUIRE) | ||
.build()); | ||
|
||
connectionFactory.create() | ||
.as(StepVerifier::create) | ||
.verifyErrorSatisfies(error -> | ||
assertThat(error) | ||
.isInstanceOf(AbstractPostgresSSLHandlerAdapter.PostgresqlSslException.class) | ||
.hasMessage("Connection closed during SSL negotiation")); | ||
|
||
server.disposeNow(); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeps the same handler order. The SSL adapter is already added first.