Skip to content

Commit 2e0969e

Browse files
committed
Polishing
Refactor connection creation from composeable ConnectionStrategy into composeable ConnectionFunctions and a parameter-less ConnectionStrategy that holds all connection target details. Refactor SSL fallback into ConnectionFunction as SSL is part of the initial handshake. Move startup options into ConnectionSettings. Simplify sink subscriptions into Flux composition for easier synchronization of closed connections. Add duration style parser. Add license headers and since tags, update documentation. [#120][resolves #474][#203] Signed-off-by: Mark Paluch <[email protected]>
1 parent 4390792 commit 2e0969e

33 files changed

+1271
-509
lines changed

README.md

+54-32
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ This driver provides the following features:
88
* Login with username/password (MD5, SASL/SCRAM) or implicit trust
99
* SCRAM authentication
1010
* Unix Domain Socket transport
11+
* Connection Fail-over supporting multiple hosts
1112
* TLS
1213
* Explicit transactions
1314
* Notifications
@@ -67,40 +68,44 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
6768

6869
**Supported ConnectionFactory Discovery Options**
6970

70-
| Option | Description
71-
| ----------------- | -----------
72-
| `ssl` | Enables SSL usage (`SSLMode.VERIFY_FULL`).
73-
| `driver` | Must be `postgresql`.
74-
| `host` | Server hostname to connect to.
75-
| `port` | Server port to connect to. Defaults to `5432`. _(Optional)_
76-
| `socket` | Unix Domain Socket path to connect to as alternative to TCP. _(Optional)_
77-
| `username` | Login username.
78-
| `password` | Login password. _(Optional when using TLS Certificate authentication)_
79-
| `database` | Database to select. _(Optional)_
80-
| `applicationName` | The name of the application connecting to the database. Defaults to `r2dbc-postgresql`. _(Optional)_
81-
| `autodetectExtensions` | Whether to auto-detect and register `Extension`s from the class path. Defaults to `true`. _(Optional)_
82-
| `compatibilityMode` | Enable compatibility mode for cursored fetching. Required when using newer pgpool versions. Defaults to `false`. _(Optional)_
83-
| `errorResponseLogLevel` | Log level for error responses. Any of `OFF`, `DEBUG`, `INFO`, `WARN` or `ERROR` Defaults to `DEBUG`. _(Optional)_
84-
| `extensions` | Collection of `Extension` to provide additional extensions when creating a connection factory. Defaults to empty. _(Optional)_
85-
| `fetchSize` | The default number of rows to return when fetching results. Defaults to `0` for unlimited. _(Optional)_
86-
| `forceBinary` | Whether to force binary transfer. Defaults to `false`. _(Optional)_
87-
| `loopResources` | TCP/Socket LoopResources (depends on the endpoint connection type). _(Optional)_
88-
| `lockWaitTimeout` | Lock wait timeout. _(Optional)_
89-
| `noticeLogLevel` | Log level for error responses. Any of `OFF`, `DEBUG`, `INFO`, `WARN` or `ERROR` Defaults to `DEBUG`. _(Optional)_
90-
| `preferAttachedBuffers` |Configure whether codecs should prefer attached data buffers. The default is `false`, meaning that codecs will copy data from the input buffer into a byte array. Enabling attached buffers requires consumption of values such as `Json` to avoid memory leaks.
71+
| Option | Description
72+
|---------------------------------| -----------
73+
| `ssl` | Enables SSL usage (`SSLMode.VERIFY_FULL`).
74+
| `driver` | Must be `postgresql`.
75+
| `protocol` | Protocol specifier. Empty to use single-host operations. Supported: `failover` for multi-server failover operations. _(Optional)_
76+
| `host` | Server hostname to connect to. May contain a comma-separated list of hosts with ports when using the `failover` protocol.
77+
| `port` | Server port to connect to. Defaults to `5432`. _(Optional)_
78+
| `socket` | Unix Domain Socket path to connect to as alternative to TCP. _(Optional)_
79+
| `username` | Login username.
80+
| `password` | Login password. _(Optional when using TLS Certificate authentication)_
81+
| `database` | Database to select. _(Optional)_
82+
| `applicationName` | The name of the application connecting to the database. Defaults to `r2dbc-postgresql`. _(Optional)_
83+
| `autodetectExtensions` | Whether to auto-detect and register `Extension`s from the class path. Defaults to `true`. _(Optional)_
84+
| `compatibilityMode` | Enable compatibility mode for cursored fetching. Required when using newer pgpool versions. Defaults to `false`. _(Optional)_
85+
| `errorResponseLogLevel` | Log level for error responses. Any of `OFF`, `DEBUG`, `INFO`, `WARN` or `ERROR` Defaults to `DEBUG`. _(Optional)_
86+
| `extensions` | Collection of `Extension` to provide additional extensions when creating a connection factory. Defaults to empty. _(Optional)_
87+
| `fetchSize` | The default number of rows to return when fetching results. Defaults to `0` for unlimited. _(Optional)_
88+
| `forceBinary` | Whether to force binary transfer. Defaults to `false`. _(Optional)_
89+
| `hostRecheckTime` | Host status recheck time when using multi-server operations. Defaults to `10 seconds`. _(Optional)_
90+
| `loadBalanceHosts` | Whether to shuffle the list of given hostnames before connect when using multi-server operations. Defaults to `true. _(Optional)_
91+
| `loopResources` | TCP/Socket LoopResources (depends on the endpoint connection type). _(Optional)_
92+
| `lockWaitTimeout` | Lock wait timeout. _(Optional)_
93+
| `noticeLogLevel` | Log level for error responses. Any of `OFF`, `DEBUG`, `INFO`, `WARN` or `ERROR` Defaults to `DEBUG`. _(Optional)_
94+
| `preferAttachedBuffers` |Configure whether codecs should prefer attached data buffers. The default is `false`, meaning that codecs will copy data from the input buffer into a byte array. Enabling attached buffers requires consumption of values such as `Json` to avoid memory leaks.
9195
| `preparedStatementCacheQueries` | Determine the number of queries that are cached in each connection. The default is `-1`, meaning there's no limit. The value of `0` disables the cache. Any other value specifies the cache size.
92-
| `options` | A `Map<String, String>` of connection parameters. These are applied to each database connection created by the `ConnectionFactory`. Useful for setting generic [PostgreSQL connection parameters][psql-runtime-config]. _(
96+
| `options` | A `Map<String, String>` of connection parameters. These are applied to each database connection created by the `ConnectionFactory`. Useful for setting generic [PostgreSQL connection parameters][psql-runtime-config]. _(
9397
Optional)_
94-
| `schema` | The search path to set. _(Optional)_
95-
| `sslMode` | SSL mode to use, see `SSLMode` enum. Supported values: `DISABLE`, `ALLOW`, `PREFER`, `REQUIRE`, `VERIFY_CA`, `VERIFY_FULL`, `TUNNEL`. _(Optional)_
96-
| `sslRootCert` | Path to SSL CA certificate in PEM format. Can be also a resource path. _(Optional)_
97-
| `sslKey` | Path to SSL key for TLS authentication in PEM format. Can be also a resource path. _(Optional)_
98-
| `sslCert` | Path to SSL certificate for TLS authentication in PEM format. Can be also a resource path. _(Optional)_
99-
| `sslPassword` | Key password to decrypt SSL key. _(Optional)_
100-
| `sslHostnameVerifier` | `javax.net.ssl.HostnameVerifier` implementation. _(Optional)_
101-
| `statementTimeout`| Statement timeout. _(Optional)_
102-
| `tcpNoDelay` | Enable/disable TCP NoDelay. Enabled by default. _(Optional)_
103-
| `tcpKeepAlive` | Enable/disable TCP KeepAlive. Disabled by default. _(Optional)_
98+
| `schema` | The search path to set. _(Optional)_
99+
| `sslMode` | SSL mode to use, see `SSLMode` enum. Supported values: `DISABLE`, `ALLOW`, `PREFER`, `REQUIRE`, `VERIFY_CA`, `VERIFY_FULL`, `TUNNEL`. _(Optional)_
100+
| `sslRootCert` | Path to SSL CA certificate in PEM format. Can be also a resource path. _(Optional)_
101+
| `sslKey` | Path to SSL key for TLS authentication in PEM format. Can be also a resource path. _(Optional)_
102+
| `sslCert` | Path to SSL certificate for TLS authentication in PEM format. Can be also a resource path. _(Optional)_
103+
| `sslPassword` | Key password to decrypt SSL key. _(Optional)_
104+
| `sslHostnameVerifier` | `javax.net.ssl.HostnameVerifier` implementation. _(Optional)_
105+
| `statementTimeout` | Statement timeout. _(Optional)_
106+
| `targetServerType` | Type of server to use when using multi-host operations. Supported values: `ANY`, `PRIMARY`, `SECONDARY`, `PREFER_SECONDARY`. Defaults to `ANY`. _(Optional)_
107+
| `tcpNoDelay` | Enable/disable TCP NoDelay. Enabled by default. _(Optional)_
108+
| `tcpKeepAlive` | Enable/disable TCP KeepAlive. Disabled by default. _(Optional)_
104109

105110
**Programmatic Configuration**
106111

@@ -167,6 +172,23 @@ If you'd rather like the latest snapshots of the upcoming major version, use our
167172
</repository>
168173
```
169174

175+
## Connection Fail-over
176+
177+
To support simple connection fail-over it is possible to define multiple endpoints (host and port pairs) in the connection url separated by commas. The driver will try once to connect to each of them
178+
in order until the connection succeeds. If none succeeds a normal connection exception is thrown. Make sure to specify the `failover` protocol.
179+
180+
The syntax for the connection url is:
181+
182+
```
183+
r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3
184+
```
185+
186+
For example an application can create two connection pools. One data source is for writes, another for reads. The write pool limits connections only to a primary node:
187+
188+
```
189+
r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3?targetServerType=primary.
190+
```
191+
170192
## Cursors
171193

172194
R2DBC Postgres supports both, the [simple](https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.4)

src/main/java/io/r2dbc/postgresql/ClientSupplier.java

-13
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.r2dbc.postgresql;
18+
19+
import io.r2dbc.postgresql.client.Client;
20+
import io.r2dbc.postgresql.client.ConnectionSettings;
21+
import reactor.core.publisher.Mono;
22+
23+
import java.net.SocketAddress;
24+
25+
/**
26+
* Interface defining a function how to connect to a single {@link SocketAddress endpoint} applying {@link ConnectionSettings}.
27+
* <p>A connection function is a low-level utility whose result is a valid {@link Client} object. Connection functions may perform multiple connection attempts (e.g. SSL handshake downgrading).
28+
* Topology discovery is a higher-level concept that is typically encapsulated as part of a {@link ConnectionStrategy}.
29+
*
30+
* @see ConnectionStrategy
31+
* @since 1.0
32+
*/
33+
@FunctionalInterface
34+
public interface ConnectionFunction {
35+
36+
/**
37+
* Establish a connection to the given {@link SocketAddress endpoint} applying {@link ConnectionSettings}.
38+
*
39+
* @param endpoint the endpoint to connect to
40+
* @param settings the settings to apply
41+
* @return a mono that connects to the given endpoint upon subscription
42+
* @throws IllegalArgumentException if {@code socketAddress} or {@code settings} is {@code null}
43+
*/
44+
Mono<Client> connect(SocketAddress endpoint, ConnectionSettings settings);
45+
46+
}
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,41 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package io.r2dbc.postgresql;
218

319
import io.r2dbc.postgresql.client.Client;
4-
import io.r2dbc.postgresql.client.ConnectionSettings;
520
import reactor.core.publisher.Mono;
621

7-
import java.net.SocketAddress;
8-
import java.util.Map;
9-
import java.util.function.Function;
10-
22+
/**
23+
* Interface defining a connection strategy on how to obtain a Postgres {@link Client} object.
24+
* <p>
25+
* Typically, connection strategies use a {@link ConnectionFunction} and are configured with a connection endpoint to establish a client connection to the target server as the {@link #connect()}
26+
* method does not take any parameters.
27+
*
28+
* @see ConnectionFunction
29+
* @since 1.0
30+
*/
31+
@FunctionalInterface
1132
public interface ConnectionStrategy {
1233

34+
/**
35+
* Establish a connection to a target server that is determined by this connection strategy.
36+
*
37+
* @return a mono that initiates the connection upon subscription.
38+
*/
1339
Mono<Client> connect();
1440

15-
ConnectionStrategy withOptions(Map<String, String> options);
16-
17-
interface ComposableConnectionStrategy extends ConnectionStrategy {
18-
19-
default <T extends ConnectionStrategy> T chainIf(boolean guard, Function<ComposableConnectionStrategy, T> nextStrategyProvider, Class<T> klass) {
20-
return guard ? nextStrategyProvider.apply(this) : klass.cast(this);
21-
}
22-
23-
ComposableConnectionStrategy withAddress(SocketAddress address);
24-
25-
ComposableConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings);
26-
27-
ComposableConnectionStrategy withOptions(Map<String, String> options);
28-
29-
}
30-
3141
}

src/main/java/io/r2dbc/postgresql/ConnectionStrategyFactory.java

+53-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,24 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package io.r2dbc.postgresql;
218

319
import io.netty.channel.unix.DomainSocketAddress;
20+
import io.r2dbc.postgresql.client.Client;
21+
import io.r2dbc.postgresql.client.ConnectionSettings;
422
import io.r2dbc.postgresql.client.MultiHostConfiguration;
523
import io.r2dbc.postgresql.client.SSLConfig;
624
import io.r2dbc.postgresql.client.SSLMode;
@@ -11,36 +29,63 @@
1129
import java.util.ArrayList;
1230
import java.util.List;
1331

14-
public class ConnectionStrategyFactory {
32+
/**
33+
* Factory methods to obtain a {@link ConnectionStrategy} object.
34+
*
35+
* @since 1.0
36+
*/
37+
final class ConnectionStrategyFactory {
38+
39+
/**
40+
* Create a {@link ConnectionStrategy} that is able to connect to the specified {@link PostgresqlConnectionConfiguration configuration}.
41+
*
42+
* @param connectionFunction the raw connection function to use to create a {@link Client}. The connection function is enhanced during the connect phase to perform a handshake with the database.
43+
* @param configuration the configuration object
44+
* @return the connection strategy to use.
45+
*/
46+
public static ConnectionStrategy getConnectionStrategy(ConnectionFunction connectionFunction, PostgresqlConnectionConfiguration configuration, ConnectionSettings connectionSettings) {
47+
return doGetConnectionStrategy(new SingleHostConnectionFunction(connectionFunction, configuration), configuration, connectionSettings);
48+
}
49+
50+
private static ConnectionStrategy doGetConnectionStrategy(ConnectionFunction connectionFunction, PostgresqlConnectionConfiguration configuration, ConnectionSettings connectionSettings) {
1551

16-
public static ConnectionStrategy getConnectionStrategy(ClientSupplier clientSupplier, PostgresqlConnectionConfiguration configuration) {
17-
SingleHostConfiguration singleHostConfiguration = configuration.getSingleHostConfiguration();
18-
MultiHostConfiguration multiHostConfiguration = configuration.getMultiHostConfiguration();
1952
SSLConfig sslConfig = configuration.getSslConfig();
20-
SocketAddress address = singleHostConfiguration != null ? createSocketAddress(singleHostConfiguration) : null;
21-
return new DefaultConnectionStrategy(address, clientSupplier, configuration, configuration.getConnectionSettings(), configuration.getOptions())
22-
.chainIf(!SSLMode.DISABLE.equals(sslConfig.getSslMode()), strategy -> new SslFallbackConnectionStrategy(configuration, strategy), ConnectionStrategy.ComposableConnectionStrategy.class)
23-
.chainIf(multiHostConfiguration != null, strategy -> new MultiHostConnectionStrategy(createSocketAddress(multiHostConfiguration), configuration, strategy), ConnectionStrategy.class);
53+
if (!SSLMode.DISABLE.equals(sslConfig.getSslMode())) {
54+
connectionFunction = new SslFallbackConnectionFunction(sslConfig, connectionFunction);
55+
}
56+
57+
MultiHostConfiguration multiHostConfiguration = configuration.getMultiHostConfiguration();
58+
if (multiHostConfiguration != null) {
59+
return new MultiHostConnectionStrategy(connectionFunction, createSocketAddress(multiHostConfiguration), configuration, connectionSettings);
60+
}
61+
62+
return new SingleHostConnectionStrategy(connectionFunction, createSocketAddress(configuration.getRequiredSingleHostConfiguration()), connectionSettings);
2463
}
2564

2665
private static SocketAddress createSocketAddress(SingleHostConfiguration configuration) {
2766
if (!configuration.isUseSocket()) {
2867
return InetSocketAddress.createUnresolved(configuration.getRequiredHost(), configuration.getPort());
2968
}
69+
3070
return DomainSocketFactory.getDomainSocketAddress(configuration);
3171
}
3272

3373
static class DomainSocketFactory {
74+
3475
private static SocketAddress getDomainSocketAddress(SingleHostConfiguration configuration) {
3576
return new DomainSocketAddress(configuration.getRequiredSocket());
3677
}
78+
3779
}
3880

3981
private static List<SocketAddress> createSocketAddress(MultiHostConfiguration configuration) {
82+
4083
List<SocketAddress> addressList = new ArrayList<>(configuration.getHosts().size());
84+
4185
for (MultiHostConfiguration.ServerHost host : configuration.getHosts()) {
4286
addressList.add(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
4387
}
88+
4489
return addressList;
4590
}
4691

src/main/java/io/r2dbc/postgresql/DisabledStatementCache.java

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
class DisabledStatementCache implements StatementCache {
2222

23+
static final DisabledStatementCache INSTANCE = new DisabledStatementCache();
24+
2325
private static final String UNNAMED_STATEMENT_NAME = "";
2426

2527
DisabledStatementCache() {

0 commit comments

Comments
 (0)