Skip to content

Commit d878858

Browse files
committed
Add support for SSL tunnels
[resolves #295]
1 parent b2866fe commit d878858

File tree

6 files changed

+178
-74
lines changed

6 files changed

+178
-74
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
8282
| `preparedStatementCacheQueries` | Determine the number of queries that are cached in each connection. The default is `-1`, meaning there's no limit. The value of `-1` disables the cache. Any other value specifies the cache size.
8383
| `options` | A `Map<String, String>` of connection parameters. These are applied to each database connection created by the `ConnectionFactory`. Useful for setting generic [PostgreSQL connection parameters][psql-runtime-config]. _(Optional)_
8484
| `schema` | The search path to set. _(Optional)_
85-
| `sslMode` | SSL mode to use, see `SSLMode` enum. Supported values: `DISABLE`, `ALLOW`, `PREFER`, `REQUIRE`, `VERIFY_CA`, `VERIFY_FULL`. _(Optional)_
85+
| `sslMode` | SSL mode to use, see `SSLMode` enum. Supported values: `DISABLE`, `ALLOW`, `PREFER`, `REQUIRE`, `VERIFY_CA`, `VERIFY_FULL`, `TUNNEL`. _(Optional)_
8686
| `sslRootCert` | Path to SSL CA certificate in PEM format. _(Optional)_
8787
| `sslKey` | Path to SSL key for TLS authentication in PEM format. _(Optional)_
8888
| `sslCert` | Path to SSL certificate for TLS authentication in PEM format. _(Optional)_
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.client;
18+
19+
import io.netty.buffer.ByteBufAllocator;
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.ChannelInboundHandlerAdapter;
22+
import io.netty.handler.ssl.SslHandler;
23+
import io.netty.util.concurrent.Future;
24+
import io.netty.util.concurrent.GenericFutureListener;
25+
import io.r2dbc.spi.R2dbcPermissionDeniedException;
26+
import reactor.core.publisher.Mono;
27+
28+
import javax.net.ssl.SSLEngine;
29+
import java.net.InetSocketAddress;
30+
import java.util.concurrent.CompletableFuture;
31+
32+
abstract class AbstractPostgresSSLHandlerAdapter extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<Channel>> {
33+
34+
private final SSLConfig sslConfig;
35+
36+
private final SSLEngine sslEngine;
37+
38+
private final SslHandler sslHandler;
39+
40+
private final CompletableFuture<Void> handshakeFuture;
41+
42+
AbstractPostgresSSLHandlerAdapter(ByteBufAllocator alloc, SSLConfig sslConfig) {
43+
this.sslConfig = sslConfig;
44+
this.sslEngine = sslConfig.getSslProvider().get()
45+
.getSslContext()
46+
.newEngine(alloc);
47+
this.handshakeFuture = new CompletableFuture<>();
48+
this.sslHandler = new SslHandler(this.sslEngine);
49+
this.sslHandler.handshakeFuture().addListener(this);
50+
}
51+
52+
@Override
53+
public void operationComplete(Future<Channel> future) throws Exception {
54+
if (!future.isSuccess()) {
55+
completeHandshakeExceptionally(future.cause());
56+
return;
57+
}
58+
if (this.sslConfig.getSslMode() != SSLMode.VERIFY_FULL) {
59+
completeHandshake();
60+
return;
61+
}
62+
Channel channel = future.get();
63+
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
64+
String hostName = socketAddress.getHostName();
65+
if (this.sslConfig.getHostnameVerifier().verify(hostName, this.sslEngine.getSession())) {
66+
completeHandshake();
67+
} else {
68+
completeHandshakeExceptionally(new PostgresqlSslException(String.format("The hostname '%s' could not be verified.", socketAddress.getAddress().toString())));
69+
}
70+
}
71+
72+
void completeHandshakeExceptionally(Throwable ex) {
73+
this.handshakeFuture.completeExceptionally(ex);
74+
}
75+
76+
boolean completeHandshake() {
77+
return this.handshakeFuture.complete(null);
78+
}
79+
80+
Mono<Void> getHandshake() {
81+
return Mono.fromFuture(this.handshakeFuture);
82+
}
83+
84+
SslHandler getSslHandler() {
85+
return this.sslHandler;
86+
}
87+
88+
/**
89+
* Postgres-specific {@link R2dbcPermissionDeniedException}.
90+
*/
91+
static final class PostgresqlSslException extends R2dbcPermissionDeniedException {
92+
93+
PostgresqlSslException(String msg) {
94+
super(msg);
95+
}
96+
97+
}
98+
99+
}

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -382,14 +382,19 @@ private static Mono<? extends Void> registerSslHandler(SSLConfig sslConfig, Conn
382382
if (sslConfig.getSslMode().startSsl()) {
383383

384384
return Mono.defer(() -> {
385-
SSLSessionHandlerAdapter sslSessionHandlerAdapter = new SSLSessionHandlerAdapter(it.outbound().alloc(), sslConfig);
386-
it.addHandlerFirst(sslSessionHandlerAdapter);
387-
return sslSessionHandlerAdapter.getHandshake();
385+
AbstractPostgresSSLHandlerAdapter sslAdapter;
386+
if (sslConfig.getSslMode() == SSLMode.TUNNEL) {
387+
sslAdapter = new SSLTunnelHandlerAdapter(it.outbound().alloc(), sslConfig);
388+
} else {
389+
sslAdapter = new SSLSessionHandlerAdapter(it.outbound().alloc(), sslConfig);
390+
}
391+
392+
it.addHandlerFirst(sslAdapter);
393+
return sslAdapter.getHandshake();
388394

389395
}).subscribeOn(Schedulers.boundedElastic());
390396
}
391397
} catch (Throwable e) {
392-
e.printStackTrace();
393398
throw new RuntimeException(e);
394399
}
395400

src/main/java/io/r2dbc/postgresql/client/SSLMode.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ public enum SSLMode {
4545
/**
4646
* I want my data encrypted, and I accept the overhead. I want to be sure that I connect to a server I trust, and that it's the one I specify.
4747
*/
48-
VERIFY_FULL("verify-full");
48+
VERIFY_FULL("verify-full"),
49+
50+
/**
51+
* I want to use a SSL tunnel instead of following Postgres SSL handshake protocol.
52+
*/
53+
TUNNEL("tunnel");
4954

5055
private final String value;
5156

src/main/java/io/r2dbc/postgresql/client/SSLSessionHandlerAdapter.java

+16-68
Original file line numberDiff line numberDiff line change
@@ -18,42 +18,30 @@
1818

1919
import io.netty.buffer.ByteBuf;
2020
import io.netty.buffer.ByteBufAllocator;
21-
import io.netty.channel.Channel;
2221
import io.netty.channel.ChannelHandlerContext;
23-
import io.netty.channel.ChannelInboundHandlerAdapter;
24-
import io.netty.handler.ssl.SslHandler;
25-
import io.netty.util.concurrent.Future;
26-
import io.netty.util.concurrent.GenericFutureListener;
2722
import io.r2dbc.postgresql.message.frontend.SSLRequest;
28-
import io.r2dbc.spi.R2dbcPermissionDeniedException;
2923
import reactor.core.publisher.Mono;
3024

31-
import javax.net.ssl.SSLEngine;
32-
import java.net.InetSocketAddress;
33-
import java.util.concurrent.CompletableFuture;
34-
35-
// https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11
36-
final class SSLSessionHandlerAdapter extends ChannelInboundHandlerAdapter implements GenericFutureListener<Future<Channel>> {
25+
/**
26+
* SSL handler assuming the endpoint a Postgres endpoint following the {@link SSLRequest} flow.
27+
*
28+
* @see <a href="https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11">https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11</a>
29+
*/
30+
final class SSLSessionHandlerAdapter extends AbstractPostgresSSLHandlerAdapter {
3731

3832
private final ByteBufAllocator alloc;
3933

4034
private final SSLConfig sslConfig;
4135

42-
private final SSLEngine sslEngine;
43-
44-
private final SslHandler sslHandler;
45-
46-
private final CompletableFuture<Void> handshakeFuture;
47-
4836
SSLSessionHandlerAdapter(ByteBufAllocator alloc, SSLConfig sslConfig) {
37+
super(alloc, sslConfig);
4938
this.alloc = alloc;
5039
this.sslConfig = sslConfig;
51-
this.sslEngine = sslConfig.getSslProvider().get()
52-
.getSslContext()
53-
.newEngine(alloc);
54-
this.handshakeFuture = new CompletableFuture<>();
55-
this.sslHandler = new SslHandler(this.sslEngine);
56-
this.sslHandler.handshakeFuture().addListener(this);
40+
}
41+
42+
@Override
43+
public void handlerAdded(ChannelHandlerContext ctx) {
44+
Mono.from(SSLRequest.INSTANCE.encode(this.alloc)).subscribe(ctx::writeAndFlush);
5745
}
5846

5947
@Override
@@ -72,67 +60,27 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
7260
}
7361
}
7462

75-
@Override
76-
public void operationComplete(Future<Channel> future) throws Exception {
77-
if (!future.isSuccess()) {
78-
this.handshakeFuture.completeExceptionally(future.cause());
79-
return;
80-
}
81-
if (this.sslConfig.getSslMode() != SSLMode.VERIFY_FULL) {
82-
this.handshakeFuture.complete(null);
83-
return;
84-
}
85-
Channel channel = future.get();
86-
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
87-
String hostName = socketAddress.getHostName();
88-
if (this.sslConfig.getHostnameVerifier().verify(hostName, this.sslEngine.getSession())) {
89-
this.handshakeFuture.complete(null);
90-
} else {
91-
this.handshakeFuture.completeExceptionally(new PostgresqlSslException(String.format("The hostname '%s' could not be verified.", socketAddress.getAddress().toString())));
92-
}
93-
}
94-
9563
private void processSslDisabled(ChannelHandlerContext ctx, Object msg) {
9664
if (this.sslConfig.getSslMode().requireSsl()) {
9765
PostgresqlSslException e =
9866
new PostgresqlSslException("Server support for SSL connection is disabled, but client was configured with SSL mode " + this.sslConfig.getSslMode());
99-
this.handshakeFuture.completeExceptionally(e);
67+
completeHandshakeExceptionally(e);
10068
} else {
101-
this.handshakeFuture.complete(null);
69+
completeHandshake();
10270
}
10371
}
10472

10573
private void processSslEnabled(ChannelHandlerContext ctx, Object msg) {
10674
if (this.sslConfig.getSslMode() == SSLMode.DISABLE) {
10775

10876
PostgresqlSslException e = new PostgresqlSslException("Server requires SSL handshake, but client was configured with SSL mode DISABLE");
109-
this.handshakeFuture.completeExceptionally(e);
77+
completeHandshakeExceptionally(e);
11078
return;
11179
}
11280
ctx.channel().pipeline()
113-
.addFirst(this.sslHandler)
81+
.addFirst(this.getSslHandler())
11482
.remove(this);
11583
ctx.fireChannelRead(msg);
11684
}
11785

118-
@Override
119-
public void handlerAdded(ChannelHandlerContext ctx) {
120-
Mono.from(SSLRequest.INSTANCE.encode(this.alloc)).subscribe(ctx::writeAndFlush);
121-
}
122-
123-
Mono<Void> getHandshake() {
124-
return Mono.fromFuture(this.handshakeFuture);
125-
}
126-
127-
/**
128-
* Postgres-specific {@link R2dbcPermissionDeniedException}.
129-
*/
130-
static final class PostgresqlSslException extends R2dbcPermissionDeniedException {
131-
132-
PostgresqlSslException(String msg) {
133-
super(msg);
134-
}
135-
136-
}
137-
13886
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2017-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.client;
18+
19+
import io.netty.buffer.ByteBufAllocator;
20+
import io.netty.channel.ChannelHandlerContext;
21+
22+
/**
23+
* SSL handler assuming the endpoint is a SSL tunnel and not a Postgres endpoint.
24+
*/
25+
final class SSLTunnelHandlerAdapter extends AbstractPostgresSSLHandlerAdapter {
26+
27+
private SSLConfig sslConfig;
28+
29+
SSLTunnelHandlerAdapter(ByteBufAllocator alloc, SSLConfig sslConfig) {
30+
super(alloc, sslConfig);
31+
}
32+
33+
@Override
34+
public void handlerAdded(ChannelHandlerContext ctx) {
35+
36+
if (this.sslConfig.getSslMode() == SSLMode.DISABLE) {
37+
38+
PostgresqlSslException e = new PostgresqlSslException("Server requires SSL handshake, but client was configured with SSL mode DISABLE");
39+
completeHandshakeExceptionally(e);
40+
return;
41+
}
42+
ctx.channel().pipeline()
43+
.addFirst(this.getSslHandler())
44+
.remove(this);
45+
}
46+
47+
}

0 commit comments

Comments
 (0)