Skip to content

Commit 20be3d3

Browse files
committed
single/multiple hosts configuration polishing
1 parent 4bcc697 commit 20be3d3

9 files changed

+467
-74
lines changed

src/main/java/io/r2dbc/postgresql/MultipleHostsClientFactory.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.r2dbc.postgresql.client.Client;
44
import io.r2dbc.postgresql.client.MultipleHostsConfiguration;
55
import io.r2dbc.postgresql.codec.DefaultCodecs;
6+
import io.r2dbc.postgresql.util.Assert;
67
import reactor.core.publisher.Flux;
78
import reactor.core.publisher.Mono;
89

@@ -31,7 +32,7 @@ class MultipleHostsClientFactory extends ClientFactoryBase {
3132

3233
public MultipleHostsClientFactory(PostgresqlConnectionConfiguration configuration, ClientSupplier clientSupplier) {
3334
super(configuration, clientSupplier);
34-
this.configuration = configuration.getMultipleHostsConfiguration();
35+
this.configuration = Assert.requireNonNull(configuration.getMultipleHostsConfiguration(), "MultipleHostsConfiguration must not be null");
3536
this.addresses = MultipleHostsClientFactory.createSocketAddress(this.configuration);
3637
}
3738

@@ -99,22 +100,30 @@ private static Mono<Boolean> isPrimaryServer(Client client) {
99100

100101
private Flux<SocketAddress> getCandidates(TargetServerType targetServerType) {
101102
return Flux.create(sink -> {
102-
if (this.addresses.size() == 1) {
103-
sink.next(this.addresses.get(0));
104-
sink.complete();
105-
return;
106-
}
107103
long now = System.currentTimeMillis();
108104
List<SocketAddress> addresses = new ArrayList<>(this.addresses);
109-
if (this.configuration.isLoadBalance()) {
105+
if (this.configuration.isLoadBalanceHosts()) {
110106
Collections.shuffle(addresses);
111107
}
108+
int counter = 0;
112109
for (SocketAddress address : addresses) {
113110
HostSpecStatus currentStatus = this.statusMap.get(address);
114111
if (currentStatus == null || now > currentStatus.updated + this.configuration.getHostRecheckTime()) {
115112
sink.next(address);
113+
counter++;
116114
} else if (targetServerType.allowStatus(currentStatus.hostStatus)) {
117115
sink.next(address);
116+
counter++;
117+
}
118+
}
119+
if (counter == 0) {
120+
// if no candidate match the requirement or all of them are in unavailable status try all the hosts
121+
addresses = new ArrayList<>(this.addresses);
122+
if (this.configuration.isLoadBalanceHosts()) {
123+
Collections.shuffle(addresses);
124+
}
125+
for (SocketAddress address : addresses) {
126+
sink.next(address);
118127
}
119128
}
120129
sink.complete();

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionConfiguration.java

Lines changed: 98 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ public final class PostgresqlConnectionConfiguration {
7171

7272
private final String schema;
7373

74-
7574
private final String username;
7675

7776
private final SSLConfig sslConfig;
@@ -221,10 +220,10 @@ public static final class Builder {
221220
private boolean forceBinary = false;
222221

223222
@Nullable
224-
private MultipleHostsConfiguration multipleHostsConfiguration;
223+
private MultipleHostsConfiguration.Builder multipleHostsConfiguration;
225224

226225
@Nullable
227-
private SingleHostConfiguration singleHostConfiguration;
226+
private SingleHostConfiguration.Builder singleHostConfiguration;
228227

229228
private Map<String, String> options;
230229

@@ -287,21 +286,25 @@ public Builder autodetectExtensions(boolean autodetectExtensions) {
287286
* @return a configured {@link PostgresqlConnectionConfiguration}
288287
*/
289288
public PostgresqlConnectionConfiguration build() {
290-
291-
if (this.singleHostConfiguration != null && this.singleHostConfiguration.getHost() == null && this.singleHostConfiguration.getSocket() == null) {
292-
throw new IllegalArgumentException("host or socket must not be null");
289+
SingleHostConfiguration singleHostConfiguration = this.singleHostConfiguration != null
290+
? this.singleHostConfiguration.build()
291+
: null;
292+
MultipleHostsConfiguration multipleHostsConfiguration = this.multipleHostsConfiguration != null
293+
? this.multipleHostsConfiguration.build()
294+
: null;
295+
if (singleHostConfiguration == null && multipleHostsConfiguration == null) {
296+
throw new IllegalArgumentException("Either multiple hosts configuration or single host configuration should be provided");
293297
}
294-
295-
if (this.singleHostConfiguration != null && this.singleHostConfiguration.getHost() != null && this.singleHostConfiguration.getSocket() != null) {
296-
throw new IllegalArgumentException("Connection must be configured for either host/port or socket usage but not both");
298+
if (singleHostConfiguration != null && multipleHostsConfiguration != null) {
299+
throw new IllegalArgumentException("Either multiple hosts configuration or single host configuration should be provided");
297300
}
298301

299302
if (this.username == null) {
300303
throw new IllegalArgumentException("username must not be null");
301304
}
302305

303306
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.extensions, this.forceBinary,
304-
this.options, this.password, this.schema, this.username, this.createSslConfig(), this.singleHostConfiguration, this.multipleHostsConfiguration);
307+
this.options, this.password, this.schema, this.username, this.createSslConfig(), singleHostConfiguration, multipleHostsConfiguration);
305308
}
306309

307310
/**
@@ -377,10 +380,9 @@ public Builder forceBinary(boolean forceBinary) {
377380
public Builder host(String host) {
378381
Assert.requireNonNull(host, "host must not be null");
379382
if (this.singleHostConfiguration == null) {
380-
this.singleHostConfiguration = new SingleHostConfiguration(host, DEFAULT_PORT, null);
381-
} else {
382-
this.singleHostConfiguration = new SingleHostConfiguration(host, this.singleHostConfiguration.getPort(), this.singleHostConfiguration.getSocket());
383+
this.singleHostConfiguration = SingleHostConfiguration.builder();
383384
}
385+
this.singleHostConfiguration.host(host);
384386
return this;
385387
}
386388

@@ -418,11 +420,6 @@ public Builder password(@Nullable CharSequence password) {
418420
return this;
419421
}
420422

421-
public Builder multipleHostsConfiguration(MultipleHostsConfiguration multipleHostsConfiguration) {
422-
this.multipleHostsConfiguration = Assert.requireNonNull(multipleHostsConfiguration, "multipleHostsConfiguration must not be null");
423-
return this;
424-
}
425-
426423
/**
427424
* Configure the schema.
428425
*
@@ -442,10 +439,9 @@ public Builder schema(@Nullable String schema) {
442439
*/
443440
public Builder port(int port) {
444441
if (this.singleHostConfiguration == null) {
445-
this.singleHostConfiguration = new SingleHostConfiguration(null, port, null);
446-
} else {
447-
this.singleHostConfiguration = new SingleHostConfiguration(this.singleHostConfiguration.getHost(), port, this.singleHostConfiguration.getSocket());
442+
this.singleHostConfiguration = SingleHostConfiguration.builder();
448443
}
444+
this.singleHostConfiguration.port(port);
449445
return this;
450446
}
451447

@@ -550,17 +546,94 @@ public Builder username(String username) {
550546
*/
551547
public Builder socket(String socket) {
552548
Assert.requireNonNull(socket, "host must not be null");
553-
554549
if (this.singleHostConfiguration == null) {
555-
this.singleHostConfiguration = new SingleHostConfiguration(null, DEFAULT_PORT, socket);
556-
} else {
557-
this.singleHostConfiguration = new SingleHostConfiguration(this.singleHostConfiguration.getHost(), this.singleHostConfiguration.getPort(), socket);
550+
this.singleHostConfiguration = SingleHostConfiguration.builder();
558551
}
552+
this.singleHostConfiguration.socket(socket);
559553

560554
sslMode(SSLMode.DISABLE);
561555
return this;
562556
}
563557

558+
/**
559+
* Allows opening connections to only servers with required state, the allowed values are any, master, slave, secondary, preferSlave and preferSecondary.
560+
* The master/secondary distinction is currently done by observing if the server allows writes.
561+
* The value preferSecondary tries to connect to secondary if any are available, otherwise allows falls back to connecting also to master.
562+
* Default value is any.
563+
*
564+
* @param targetServerType target server type
565+
* @return this {@link Builder}
566+
* @throws IllegalArgumentException if {@code targetServerType} is {@code null}
567+
*/
568+
public Builder targetServerType(TargetServerType targetServerType) {
569+
if (this.multipleHostsConfiguration == null) {
570+
this.multipleHostsConfiguration = MultipleHostsConfiguration.builder();
571+
}
572+
this.multipleHostsConfiguration.targetServerType(targetServerType);
573+
return this;
574+
}
575+
576+
/**
577+
* Controls how long in seconds the knowledge about a host state is cached connection factory. The default value is 10000 milliseconds.
578+
*
579+
* @param hostRecheckTime host recheck time in milliseconds
580+
* @return this {@link Builder}
581+
*/
582+
public Builder hostRecheckTime(int hostRecheckTime) {
583+
if (this.multipleHostsConfiguration == null) {
584+
this.multipleHostsConfiguration = MultipleHostsConfiguration.builder();
585+
}
586+
this.multipleHostsConfiguration.hostRecheckTime(hostRecheckTime);
587+
return this;
588+
}
589+
590+
/**
591+
* In default mode (disabled) hosts are connected in the given order. If enabled hosts are chosen randomly from the set of suitable candidates.
592+
*
593+
* @param loadBalanceHosts is load balance mode enabled
594+
* @return this {@link Builder}
595+
*/
596+
public Builder loadBalanceHosts(boolean loadBalanceHosts) {
597+
if (this.multipleHostsConfiguration == null) {
598+
this.multipleHostsConfiguration = MultipleHostsConfiguration.builder();
599+
}
600+
this.multipleHostsConfiguration.loadBalanceHosts(loadBalanceHosts);
601+
return this;
602+
}
603+
604+
/**
605+
* Add host with default port to hosts list.
606+
*
607+
* @param host the host
608+
* @return this {@link Builder}
609+
* @throws IllegalArgumentException if {@code host} is {@code null}
610+
*/
611+
public Builder addHost(String host) {
612+
Assert.requireNonNull(host, "host must not be null");
613+
if (this.multipleHostsConfiguration == null) {
614+
this.multipleHostsConfiguration = MultipleHostsConfiguration.builder();
615+
}
616+
this.multipleHostsConfiguration.addHost(host);
617+
return this;
618+
}
619+
620+
/**
621+
* Add host to hosts list.
622+
*
623+
* @param host the host
624+
* @param port the port
625+
* @return this {@link Builder}
626+
* @throws IllegalArgumentException if {@code host} is {@code null}
627+
*/
628+
public Builder addHost(String host, int port) {
629+
Assert.requireNonNull(host, "host must not be null");
630+
if (this.multipleHostsConfiguration == null) {
631+
this.multipleHostsConfiguration = MultipleHostsConfiguration.builder();
632+
}
633+
this.multipleHostsConfiguration.addHost(host, port);
634+
return this;
635+
}
636+
564637
@Override
565638
public String toString() {
566639
return "Builder{" +

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactoryProvider.java

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
3434
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
3535
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
36+
import static io.r2dbc.spi.ConnectionFactoryOptions.PROTOCOL;
3637
import static io.r2dbc.spi.ConnectionFactoryOptions.SSL;
3738
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
3839

@@ -56,11 +57,31 @@ public final class PostgresqlConnectionFactoryProvider implements ConnectionFact
5657
*/
5758
public static final Option<Boolean> FORCE_BINARY = Option.valueOf("forceBinary");
5859

60+
/**
61+
* Load balance hosts.
62+
*/
63+
public static final Option<Boolean> LOAD_BALANCE_HOSTS = Option.valueOf("loadBalanceHosts");
64+
65+
/**
66+
* Host status recheck time im ms.
67+
*/
68+
public static final Option<Integer> HOST_RECHECK_TIME = Option.valueOf("hostRecheckTime");
69+
70+
/**
71+
* Target server type. Allowed values: any, master, secondary, preferSecondary.
72+
*/
73+
public static final Option<TargetServerType> TARGET_SERVER_TYPE = Option.valueOf("targetServerType");
74+
5975
/**
6076
* Driver option value.
6177
*/
6278
public static final String POSTGRESQL_DRIVER = "postgresql";
6379

80+
/**
81+
* Failover driver protocol.
82+
*/
83+
public static final String FAILOVER_PROTOCOL = "failover";
84+
6485
/**
6586
* Legacy driver option value.
6687
*/
@@ -144,21 +165,57 @@ static PostgresqlConnectionConfiguration createConfiguration(ConnectionFactoryOp
144165
builder.connectTimeout(connectionFactoryOptions.getValue(CONNECT_TIMEOUT));
145166
builder.database(connectionFactoryOptions.getValue(DATABASE));
146167

147-
if (connectionFactoryOptions.hasOption(SOCKET)) {
148-
tcp = false;
149-
builder.socket(connectionFactoryOptions.getRequiredValue(SOCKET));
150-
} else {
168+
if (FAILOVER_PROTOCOL.equals(connectionFactoryOptions.getValue(PROTOCOL))) {
169+
if (connectionFactoryOptions.hasOption(HOST_RECHECK_TIME)) {
170+
builder.hostRecheckTime(connectionFactoryOptions.getRequiredValue(HOST_RECHECK_TIME));
171+
}
172+
if (connectionFactoryOptions.hasOption(LOAD_BALANCE_HOSTS)) {
173+
Object loadBalanceHosts = connectionFactoryOptions.getRequiredValue(LOAD_BALANCE_HOSTS);
174+
if (loadBalanceHosts instanceof Boolean) {
175+
builder.loadBalanceHosts((Boolean) loadBalanceHosts);
176+
} else {
177+
builder.loadBalanceHosts(Boolean.parseBoolean(loadBalanceHosts.toString()));
178+
}
179+
}
180+
if (connectionFactoryOptions.hasOption(TARGET_SERVER_TYPE)) {
181+
Object targetServerType = connectionFactoryOptions.getRequiredValue(TARGET_SERVER_TYPE);
182+
if (targetServerType instanceof TargetServerType) {
183+
builder.targetServerType((TargetServerType) targetServerType);
184+
} else {
185+
builder.targetServerType(TargetServerType.fromValue(targetServerType.toString()));
186+
}
187+
}
188+
String hosts = connectionFactoryOptions.getRequiredValue(HOST);
189+
String[] hostsArray = hosts.split(",");
190+
for (String host : hostsArray) {
191+
String[] hostParts = host.split(":");
192+
if (hostParts.length == 1) {
193+
builder.addHost(hostParts[0]);
194+
} else {
195+
int port = Integer.parseInt(hostParts[1]);
196+
builder.addHost(hostParts[0], port);
197+
}
198+
}
151199
tcp = true;
152-
builder.host(connectionFactoryOptions.getRequiredValue(HOST));
200+
} else {
201+
if (connectionFactoryOptions.hasOption(SOCKET)) {
202+
tcp = false;
203+
builder.socket(connectionFactoryOptions.getRequiredValue(SOCKET));
204+
} else {
205+
tcp = true;
206+
builder.host(connectionFactoryOptions.getRequiredValue(HOST));
207+
}
208+
Integer port = connectionFactoryOptions.getValue(PORT);
209+
if (port != null) {
210+
builder.port(port);
211+
}
153212
}
213+
214+
154215
builder.password(connectionFactoryOptions.getValue(PASSWORD));
155216
builder.schema(connectionFactoryOptions.getValue(SCHEMA));
156217
builder.username(connectionFactoryOptions.getRequiredValue(USER));
157218

158-
Integer port = connectionFactoryOptions.getValue(PORT);
159-
if (port != null) {
160-
builder.port(port);
161-
}
162219

163220
Boolean forceBinary = connectionFactoryOptions.getValue(FORCE_BINARY);
164221

@@ -242,10 +299,6 @@ public boolean supports(ConnectionFactoryOptions connectionFactoryOptions) {
242299
Assert.requireNonNull(connectionFactoryOptions, "connectionFactoryOptions must not be null");
243300

244301
String driver = connectionFactoryOptions.getValue(DRIVER);
245-
if (driver == null || !(driver.equals(POSTGRESQL_DRIVER) || driver.equals(LEGACY_POSTGRESQL_DRIVER))) {
246-
return false;
247-
}
248-
249-
return true;
302+
return driver != null && (driver.equals(POSTGRESQL_DRIVER) || driver.equals(LEGACY_POSTGRESQL_DRIVER));
250303
}
251304
}

0 commit comments

Comments
 (0)