Skip to content

Commit 233f16e

Browse files
committed
Add config options for TCP nodelay and keepalive
[resolves #296]
1 parent d878858 commit 233f16e

7 files changed

+173
-15
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
8888
| `sslCert` | Path to SSL certificate for TLS authentication in PEM format. _(Optional)_
8989
| `sslPassword` | Key password to decrypt SSL key. _(Optional)_
9090
| `sslHostnameVerifier` | `javax.net.ssl.HostnameVerifier` implementation. _(Optional)_
91+
| `tcpNoDelay` | Enabled/disable TCP NoDelay. Disabled by default. _(Optional)_
92+
| `tcpKeepAlive` | Enabled/disable TCP KeepAlive. Disabled by default _(Optional)_
9193

9294
**Programmatic Configuration**
9395

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionConfiguration.java

+55-8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import javax.net.ssl.HostnameVerifier;
3232
import java.io.File;
33+
import java.net.Socket;
3334
import java.time.Duration;
3435
import java.util.ArrayList;
3536
import java.util.Collections;
@@ -82,14 +83,16 @@ public final class PostgresqlConnectionConfiguration {
8283

8384
private final SSLConfig sslConfig;
8485

86+
private final boolean tcpKeepAlive;
87+
88+
private final boolean tcpNoDelay;
89+
8590
private final int preparedStatementCacheQueries;
8691

87-
private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions,
88-
@Nullable Duration connectTimeout, @Nullable String database, List<Extension> extensions,
89-
ToIntFunction<String> fetchSize, boolean forceBinary,
90-
@Nullable String host,
91-
@Nullable Map<String, String> options, @Nullable CharSequence password, int port, @Nullable String schema, @Nullable String socket, String username,
92-
SSLConfig sslConfig, int preparedStatementCacheQueries) {
92+
private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions, @Nullable Duration connectTimeout, @Nullable String database, List<Extension> extensions,
93+
ToIntFunction<String> fetchSize, boolean forceBinary, @Nullable String host, @Nullable Map<String, String> options, @Nullable CharSequence password,
94+
int port, @Nullable String schema, @Nullable String socket, boolean tcpKeepAlive, boolean tcpNoDelay, String username, SSLConfig sslConfig,
95+
int preparedStatementCacheQueries) {
9396
this.applicationName = Assert.requireNonNull(applicationName, "applicationName must not be null");
9497
this.autodetectExtensions = autodetectExtensions;
9598
this.connectTimeout = connectTimeout;
@@ -109,6 +112,8 @@ private PostgresqlConnectionConfiguration(String applicationName, boolean autode
109112
this.socket = socket;
110113
this.username = Assert.requireNonNull(username, "username must not be null");
111114
this.sslConfig = sslConfig;
115+
this.tcpKeepAlive = tcpKeepAlive;
116+
this.tcpNoDelay = tcpNoDelay;
112117
this.preparedStatementCacheQueries = preparedStatementCacheQueries;
113118
}
114119

@@ -135,6 +140,8 @@ public String toString() {
135140
", options='" + this.options + '\'' +
136141
", password='" + obfuscate(this.password != null ? this.password.length() : 0) + '\'' +
137142
", port=" + this.port +
143+
", tcpKeepAlive=" + this.tcpKeepAlive +
144+
", tcpNoDelay=" + this.tcpNoDelay +
138145
", username='" + this.username + '\'' +
139146
'}';
140147
}
@@ -222,6 +229,14 @@ boolean isForceBinary() {
222229
return this.forceBinary;
223230
}
224231

232+
boolean isTcpKeepAlive() {
233+
return this.tcpKeepAlive;
234+
}
235+
236+
boolean isTcpNoDelay() {
237+
return this.tcpNoDelay;
238+
}
239+
225240
boolean isUseSocket() {
226241
return getSocket() != null;
227242
}
@@ -302,6 +317,10 @@ public static final class Builder {
302317

303318
private Function<SslContextBuilder, SslContextBuilder> sslContextBuilderCustomizer = Function.identity();
304319

320+
private boolean tcpKeepAlive;
321+
322+
private boolean tcpNoDelay;
323+
305324
@Nullable
306325
private String username;
307326

@@ -353,8 +372,8 @@ public PostgresqlConnectionConfiguration build() {
353372
}
354373

355374
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.extensions, this.fetchSize, this.forceBinary,
356-
this.host,
357-
this.options, this.password, this.port, this.schema, this.socket, this.username, this.createSslConfig(), this.preparedStatementCacheQueries);
375+
this.host, this.options, this.password, this.port, this.schema, this.socket, this.tcpKeepAlive, this.tcpNoDelay, this.username, this.createSslConfig(),
376+
this.preparedStatementCacheQueries);
358377
}
359378

360379
/**
@@ -610,6 +629,32 @@ public Builder sslRootCert(String sslRootCert) {
610629
return this;
611630
}
612631

632+
/**
633+
* Configure TCP KeepAlive.
634+
*
635+
* @param enabled whether to enable TCP KeepAlive
636+
* @return this {@link Builder}
637+
* @see Socket#setKeepAlive(boolean)
638+
* @since 0.8.4
639+
*/
640+
public Builder tcpKeepAlive(boolean enabled) {
641+
this.tcpKeepAlive = enabled;
642+
return this;
643+
}
644+
645+
/**
646+
* Configure TCP NoDelay.
647+
*
648+
* @param enabled whether to enable TCP NoDelay
649+
* @return this {@link Builder}
650+
* @see Socket#setTcpNoDelay(boolean)
651+
* @since 0.8.4
652+
*/
653+
public Builder tcpNoDelay(boolean enabled) {
654+
this.tcpNoDelay = enabled;
655+
return this;
656+
}
657+
613658
/**
614659
* Configure the username.
615660
*
@@ -658,6 +703,8 @@ public String toString() {
658703
", sslCert='" + this.sslCert + '\'' +
659704
", sslKey='" + this.sslKey + '\'' +
660705
", sslHostnameVerifier='" + this.sslHostnameVerifier + '\'' +
706+
", tcpKeepAlive='" + this.tcpKeepAlive + '\'' +
707+
", tcpNoDelay='" + this.tcpNoDelay + '\'' +
661708
", preparedStatementCacheQueries='" + this.preparedStatementCacheQueries + '\'' +
662709
'}';
663710
}

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.r2dbc.postgresql.authentication.AuthenticationHandler;
2222
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
2323
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
24+
import io.r2dbc.postgresql.client.ConnectionSettings;
2425
import io.r2dbc.postgresql.client.Client;
2526
import io.r2dbc.postgresql.client.ReactorNettyClient;
2627
import io.r2dbc.postgresql.client.SSLConfig;
@@ -77,7 +78,9 @@ public final class PostgresqlConnectionFactory implements ConnectionFactory {
7778
public PostgresqlConnectionFactory(PostgresqlConnectionConfiguration configuration) {
7879
this.configuration = Assert.requireNonNull(configuration, "configuration must not be null");
7980
this.endpoint = createSocketAddress(configuration);
80-
this.clientFactory = sslConfig -> ReactorNettyClient.connect(ConnectionProvider.newConnection(), this.endpoint, configuration.getConnectTimeout(), sslConfig).cast(Client.class);
81+
82+
ConnectionSettings options = new ConnectionSettings(configuration.getConnectTimeout(), configuration.isTcpKeepAlive(), configuration.isTcpNoDelay());
83+
this.clientFactory = sslConfig -> ReactorNettyClient.connect(ConnectionProvider.newConnection(), this.endpoint, options, sslConfig).cast(Client.class);
8184
this.extensions = getExtensions(configuration);
8285
}
8386

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryProvider.java

+16
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ public final class PostgresqlConnectionFactoryProvider implements ConnectionFact
125125
*/
126126
public static final Option<String> SSL_ROOT_CERT = Option.valueOf("sslRootCert");
127127

128+
/**
129+
* Enable TCP KeepAlive.
130+
*
131+
* @since 0.8.4
132+
*/
133+
public static final Option<Boolean> TCP_KEEPALIVE = Option.valueOf("tcpKeepAlive");
134+
135+
/**
136+
* Enable TCP NoDelay.
137+
*
138+
* @since 0.8.4
139+
*/
140+
public static final Option<Boolean> TCP_NODELAY = Option.valueOf("tcpNoDelay");
141+
128142
/**
129143
* Determine the number of queries that are cached in each connection.
130144
* The default is {@code -1}, meaning there's no limit. The value of {@code 0} disables the cache. Any other value specifies the cache size.
@@ -195,6 +209,8 @@ private static PostgresqlConnectionConfiguration.Builder fromConnectionFactoryOp
195209
builder.host(options.getRequiredValue(HOST));
196210
setupSsl(builder, mapper);
197211
});
212+
mapper.from(TCP_KEEPALIVE).map(OptionMapper::toBoolean).to(builder::tcpKeepAlive);
213+
mapper.from(TCP_NODELAY).map(OptionMapper::toBoolean).to(builder::tcpNoDelay);
198214
builder.username(options.getRequiredValue(USER));
199215

200216
return builder;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql.client;
18+
19+
import reactor.util.annotation.Nullable;
20+
21+
import java.time.Duration;
22+
23+
/**
24+
* Value object capturing common connection settings.
25+
*
26+
* @since 0.8.4
27+
*/
28+
public final class ConnectionSettings {
29+
30+
private final @Nullable
31+
Duration connectTimeout;
32+
33+
private final boolean tcpKeepAlive;
34+
35+
private final boolean tcpNoDelay;
36+
37+
public ConnectionSettings(@Nullable Duration connectTimeout, boolean tcpKeepAlive, boolean tcpNoDelay) {
38+
this.tcpKeepAlive = tcpKeepAlive;
39+
this.tcpNoDelay = tcpNoDelay;
40+
this.connectTimeout = connectTimeout;
41+
}
42+
43+
@Nullable
44+
Duration getConnectTimeout() {
45+
return this.connectTimeout;
46+
}
47+
48+
boolean isTcpKeepAlive() {
49+
return this.tcpKeepAlive;
50+
}
51+
52+
boolean isTcpNoDelay() {
53+
return this.tcpNoDelay;
54+
}
55+
56+
}

src/main/java/io/r2dbc/postgresql/client/ReactorNettyClient.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -336,30 +336,34 @@ public static Mono<ReactorNettyClient> connect(String host, int port) {
336336
* @throws IllegalArgumentException if {@code host} is {@code null}
337337
*/
338338
public static Mono<ReactorNettyClient> connect(String host, int port, @Nullable Duration connectTimeout, SSLConfig sslConfig) {
339-
return connect(ConnectionProvider.newConnection(), InetSocketAddress.createUnresolved(host, port), connectTimeout, sslConfig);
339+
return connect(ConnectionProvider.newConnection(), InetSocketAddress.createUnresolved(host, port), new ConnectionSettings(connectTimeout, false, false), sslConfig);
340340
}
341341

342342
/**
343-
* Creates a new frame processor connected to a given host.
343+
* Creates a new frame processor connected to a given {@link SocketAddress}.
344344
*
345345
* @param connectionProvider the connection provider resources
346346
* @param socketAddress the socketAddress to connect to
347-
* @param connectTimeout connect timeout
347+
* @param connectionSettings channel options
348348
* @param sslConfig SSL configuration
349349
* @throws IllegalArgumentException if {@code host} is {@code null}
350350
*/
351-
public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProvider, SocketAddress socketAddress, @Nullable Duration connectTimeout, SSLConfig sslConfig) {
351+
public static Mono<ReactorNettyClient> connect(ConnectionProvider connectionProvider, SocketAddress socketAddress, ConnectionSettings connectionSettings, SSLConfig sslConfig) {
352352
Assert.requireNonNull(connectionProvider, "connectionProvider must not be null");
353353
Assert.requireNonNull(socketAddress, "socketAddress must not be null");
354+
Assert.requireNonNull(connectionSettings, "channelOptions must not be null");
354355

355356
TcpClient tcpClient = TcpClient.create(connectionProvider).remoteAddress(() -> socketAddress);
356357

357358
if (!(socketAddress instanceof InetSocketAddress)) {
358359
tcpClient = tcpClient.runOn(new SocketLoopResources(), true);
360+
} else {
361+
tcpClient = tcpClient.option(ChannelOption.SO_KEEPALIVE, connectionSettings.isTcpKeepAlive());
362+
tcpClient = tcpClient.option(ChannelOption.TCP_NODELAY, connectionSettings.isTcpNoDelay());
359363
}
360364

361-
if (connectTimeout != null) {
362-
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectTimeout.toMillis()));
365+
if (connectionSettings.getConnectTimeout() != null) {
366+
tcpClient = tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(connectionSettings.getConnectTimeout().toMillis()));
363367
}
364368

365369
return tcpClient.connect().flatMap(it -> {

src/test/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryProviderUnitTests.java

+30
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.SOCKET;
3636
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.SSL_CONTEXT_BUILDER_CUSTOMIZER;
3737
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.SSL_MODE;
38+
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.TCP_KEEPALIVE;
39+
import static io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider.TCP_NODELAY;
3840
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER;
3941
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
4042
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
@@ -308,6 +310,34 @@ void shouldApplySslContextBuilderCustomizer() {
308310
assertThatIllegalStateException().isThrownBy(() -> factory.getConfiguration().getSslConfig().getSslProvider().get()).withMessageContaining("Works!");
309311
}
310312

313+
@Test
314+
void shouldConfigureTcpKeepAlive() {
315+
316+
PostgresqlConnectionFactory factory = this.provider.create(builder()
317+
.option(DRIVER, POSTGRESQL_DRIVER)
318+
.option(HOST, "test-host")
319+
.option(PASSWORD, "test-password")
320+
.option(USER, "test-user")
321+
.option(TCP_KEEPALIVE, true)
322+
.build());
323+
324+
assertThat(factory.getConfiguration().isTcpKeepAlive()).isTrue();
325+
}
326+
327+
@Test
328+
void shouldConfigureTcpNoDelay() {
329+
330+
PostgresqlConnectionFactory factory = this.provider.create(builder()
331+
.option(DRIVER, POSTGRESQL_DRIVER)
332+
.option(HOST, "test-host")
333+
.option(PASSWORD, "test-password")
334+
.option(USER, "test-user")
335+
.option(TCP_NODELAY, true)
336+
.build());
337+
338+
assertThat(factory.getConfiguration().isTcpNoDelay()).isTrue();
339+
}
340+
311341
@Test
312342
void shouldConnectUsingUnixDomainSocket() {
313343

0 commit comments

Comments
 (0)