-
Notifications
You must be signed in to change notification settings - Fork 184
Handle early disconnects before SSL handshake #596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d05f93e
d0bbd9d
6de187d
d40226f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -144,7 +144,7 @@ private ReactorNettyClient(Connection connection, ConnectionSettings settings) { | |
Assert.requireNonNull(connection, "Connection must not be null"); | ||
this.settings = Assert.requireNonNull(settings, "ConnectionSettings must not be null"); | ||
|
||
connection.addHandlerFirst(new EnsureSubscribersCompleteChannelHandler(this.requestSink)); | ||
connection.addHandlerLast(new EnsureSubscribersCompleteChannelHandler(this.requestSink)); | ||
connection.addHandlerLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE - 5, 1, 4, -4, 0)); | ||
this.connection = connection; | ||
this.byteBufAllocator = connection.outbound().alloc(); | ||
|
@@ -392,43 +392,42 @@ public static Mono<ReactorNettyClient> connect(SocketAddress socketAddress, Conn | |
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.getConnectTimeoutMs()); | ||
} | ||
|
||
return tcpClient.connect().flatMap(it -> { | ||
|
||
ChannelPipeline pipeline = it.channel().pipeline(); | ||
return tcpClient.doOnChannelInit((observer, channel, remoteAddress) -> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using |
||
ChannelPipeline pipeline = channel.pipeline(); | ||
|
||
InternalLogger logger = InternalLoggerFactory.getInstance(ReactorNettyClient.class); | ||
if (logger.isTraceEnabled()) { | ||
pipeline.addFirst(LoggingHandler.class.getSimpleName(), | ||
new LoggingHandler(ReactorNettyClient.class, LogLevel.TRACE)); | ||
} | ||
Comment on lines
398
to
402
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trace logging handler also added on channel init, so it logs all lifecycle events. |
||
|
||
return registerSslHandler(settings.getSslConfig(), it).thenReturn(new ReactorNettyClient(it, settings)); | ||
}); | ||
registerSslHandler(settings.getSslConfig(), channel); | ||
}).connect().flatMap(it -> | ||
getSslHandshake(it.channel()).thenReturn(new ReactorNettyClient(it, settings)) | ||
); | ||
} | ||
|
||
private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Connection it) { | ||
|
||
private static void registerSslHandler(SSLConfig sslConfig, Channel channel) { | ||
try { | ||
if (sslConfig.getSslMode().startSsl()) { | ||
|
||
return Mono.defer(() -> { | ||
AbstractPostgresSSLHandlerAdapter sslAdapter; | ||
if (sslConfig.getSslMode() == SSLMode.TUNNEL) { | ||
sslAdapter = new SSLTunnelHandlerAdapter(it.outbound().alloc(), sslConfig); | ||
} else { | ||
sslAdapter = new SSLSessionHandlerAdapter(it.outbound().alloc(), sslConfig); | ||
} | ||
|
||
it.addHandlerFirst(sslAdapter); | ||
return sslAdapter.getHandshake(); | ||
AbstractPostgresSSLHandlerAdapter sslAdapter; | ||
if (sslConfig.getSslMode() == SSLMode.TUNNEL) { | ||
sslAdapter = new SSLTunnelHandlerAdapter(channel.alloc(), sslConfig); | ||
} else { | ||
sslAdapter = new SSLSessionHandlerAdapter(channel.alloc(), sslConfig); | ||
} | ||
|
||
}).subscribeOn(Schedulers.boundedElastic()); | ||
channel.pipeline().addFirst(sslAdapter); | ||
} | ||
} catch (Throwable e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
return Mono.empty(); | ||
private static Mono<Void> getSslHandshake(Channel channel) { | ||
AbstractPostgresSSLHandlerAdapter sslAdapter = channel.pipeline().get(AbstractPostgresSSLHandlerAdapter.class); | ||
return (sslAdapter != null) ? sslAdapter.getHandshake() : Mono.empty(); | ||
} | ||
|
||
@Override | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright 2022 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.r2dbc.postgresql.client; | ||
|
||
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration; | ||
import io.r2dbc.postgresql.PostgresqlConnectionFactory; | ||
import io.r2dbc.postgresql.api.PostgresqlException; | ||
import org.junit.jupiter.api.Test; | ||
import reactor.netty.DisposableChannel; | ||
import reactor.netty.DisposableServer; | ||
import reactor.netty.tcp.TcpServer; | ||
import reactor.test.StepVerifier; | ||
|
||
import java.nio.channels.ClosedChannelException; | ||
import java.util.function.Consumer; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
public class DowntimeIntegrationTests { | ||
|
||
// Simulate server downtime, where connections are accepted and then closed immediately | ||
static DisposableServer newServer() { | ||
return TcpServer.create() | ||
.doOnConnection(DisposableChannel::dispose) | ||
.bindNow(); | ||
} | ||
|
||
static PostgresqlConnectionFactory newConnectionFactory(DisposableServer server, SSLMode sslMode) { | ||
return new PostgresqlConnectionFactory( | ||
PostgresqlConnectionConfiguration.builder() | ||
.host(server.host()) | ||
.port(server.port()) | ||
.username("test") | ||
.sslMode(sslMode) | ||
.build()); | ||
} | ||
|
||
static void verifyError(SSLMode sslMode, Consumer<Throwable> assertions) { | ||
DisposableServer server = newServer(); | ||
PostgresqlConnectionFactory connectionFactory = newConnectionFactory(server, sslMode); | ||
connectionFactory.create().as(StepVerifier::create).verifyErrorSatisfies(assertions); | ||
server.disposeNow(); | ||
} | ||
|
||
@Test | ||
void failSslHandshakeIfInboundClosed() { | ||
verifyError(SSLMode.REQUIRE, error -> | ||
assertThat(error) | ||
.isInstanceOf(AbstractPostgresSSLHandlerAdapter.PostgresqlSslException.class) | ||
.hasMessage("Connection closed during SSL negotiation")); | ||
} | ||
|
||
@Test | ||
void failSslTunnelIfInboundClosed() { | ||
verifyError(SSLMode.TUNNEL, error -> { | ||
assertThat(error) | ||
.isInstanceOf(PostgresqlException.class) | ||
.cause() | ||
.isInstanceOf(ClosedChannelException.class); | ||
|
||
assertThat(error.getCause().getSuppressed().length).isOne(); | ||
|
||
assertThat(error.getCause().getSuppressed()[0]) | ||
.hasMessage("Connection closed while SSL/TLS handshake was in progress"); | ||
}); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeps the same handler order. The SSL adapter is already added first.