Skip to content

Commit 11f5e66

Browse files
committed
Replace client factories with composable connection strategies
1 parent a41c42a commit 11f5e66

11 files changed

+316
-240
lines changed

src/main/java/io/r2dbc/postgresql/ClientFactory.java

-22
This file was deleted.

src/main/java/io/r2dbc/postgresql/ClientFactoryBase.java

-77
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.client.Client;
4+
import io.r2dbc.postgresql.client.ConnectionSettings;
5+
import reactor.core.publisher.Mono;
6+
7+
import java.net.SocketAddress;
8+
import java.util.Map;
9+
import java.util.function.Function;
10+
11+
public interface ConnectionStrategy {
12+
13+
default ConnectionStrategy andThen(boolean guard, Function<ConnectionStrategy, ConnectionStrategy> nextStrategyProvider) {
14+
return guard ? nextStrategyProvider.apply(this) : this;
15+
}
16+
17+
Mono<Client> connect();
18+
ConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings);
19+
ConnectionStrategy withAddress(SocketAddress address);
20+
ConnectionStrategy withOptions(Map<String, String> options);
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.netty.channel.unix.DomainSocketAddress;
4+
import io.r2dbc.postgresql.client.MultiHostConfiguration;
5+
import io.r2dbc.postgresql.client.SSLConfig;
6+
import io.r2dbc.postgresql.client.SSLMode;
7+
import io.r2dbc.postgresql.client.SingleHostConfiguration;
8+
9+
import java.net.InetSocketAddress;
10+
import java.net.SocketAddress;
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
public class ConnectionStrategyFactory {
15+
16+
public static ConnectionStrategy getConnectionStrategy(ClientSupplier clientSupplier, PostgresqlConnectionConfiguration configuration) {
17+
SingleHostConfiguration singleHostConfiguration = configuration.getSingleHostConfiguration();
18+
MultiHostConfiguration multiHostConfiguration = configuration.getMultiHostConfiguration();
19+
SSLConfig sslConfig = configuration.getSslConfig();
20+
SocketAddress address = singleHostConfiguration != null ? createSocketAddress(singleHostConfiguration) : null;
21+
return new DefaultConnectionStrategy(clientSupplier, address, configuration, configuration.getConnectionSettings(), configuration.getOptions())
22+
.andThen(!SSLMode.DISABLE.equals(sslConfig.getSslMode()), strategy -> new SslFallbackConnectionStrategy(strategy, configuration, configuration.getOptions()))
23+
.andThen(multiHostConfiguration != null, strategy -> new MultiHostConnectionStrategy(strategy, createSocketAddress(multiHostConfiguration), configuration));
24+
}
25+
26+
private static SocketAddress createSocketAddress(SingleHostConfiguration configuration) {
27+
if (!configuration.isUseSocket()) {
28+
return InetSocketAddress.createUnresolved(configuration.getRequiredHost(), configuration.getPort());
29+
}
30+
return DomainSocketFactory.getDomainSocketAddress(configuration);
31+
}
32+
33+
static class DomainSocketFactory {
34+
private static SocketAddress getDomainSocketAddress(SingleHostConfiguration configuration) {
35+
return new DomainSocketAddress(configuration.getRequiredSocket());
36+
}
37+
}
38+
39+
private static List<SocketAddress> createSocketAddress(MultiHostConfiguration configuration) {
40+
List<SocketAddress> addressList = new ArrayList<>(configuration.getHosts().size());
41+
for (MultiHostConfiguration.ServerHost host : configuration.getHosts()) {
42+
addressList.add(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
43+
}
44+
return addressList;
45+
}
46+
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.authentication.AuthenticationHandler;
4+
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
5+
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
6+
import io.r2dbc.postgresql.client.Client;
7+
import io.r2dbc.postgresql.client.ConnectionSettings;
8+
import io.r2dbc.postgresql.client.StartupMessageFlow;
9+
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
10+
import io.r2dbc.postgresql.util.Assert;
11+
import reactor.core.publisher.Mono;
12+
13+
import javax.annotation.Nullable;
14+
import java.net.SocketAddress;
15+
import java.util.Map;
16+
17+
public class DefaultConnectionStrategy implements ConnectionStrategy {
18+
19+
private final ClientSupplier clientSupplier;
20+
private final SocketAddress address;
21+
private final PostgresqlConnectionConfiguration configuration;
22+
private final ConnectionSettings connectionSettings;
23+
private final Map<String, String> options;
24+
25+
DefaultConnectionStrategy(
26+
ClientSupplier clientSupplier,
27+
@Nullable SocketAddress address,
28+
PostgresqlConnectionConfiguration configuration,
29+
ConnectionSettings connectionSettings,
30+
@Nullable Map<String, String> options
31+
) {
32+
this.clientSupplier = clientSupplier;
33+
this.address = address;
34+
this.configuration = configuration;
35+
this.connectionSettings = connectionSettings;
36+
this.options = options;
37+
}
38+
39+
@Override
40+
public Mono<Client> connect() {
41+
Assert.requireNonNull(this.address, "address must not be null");
42+
return this.clientSupplier.connect(this.address, this.connectionSettings)
43+
.delayUntil(client -> StartupMessageFlow
44+
.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase(), this.configuration.getUsername(), this.options)
45+
.handle(ExceptionFactory.INSTANCE::handleErrorResponse));
46+
}
47+
48+
@Override
49+
public ConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings) {
50+
return new DefaultConnectionStrategy(this.clientSupplier, this.address, this.configuration, connectionSettings, this.options);
51+
}
52+
53+
@Override
54+
public ConnectionStrategy withAddress(SocketAddress address) {
55+
return new DefaultConnectionStrategy(this.clientSupplier, address, this.configuration, this.connectionSettings, this.options);
56+
}
57+
58+
@Override
59+
public ConnectionStrategy withOptions(Map<String, String> options) {
60+
return new DefaultConnectionStrategy(this.clientSupplier, this.address, this.configuration, this.connectionSettings, options);
61+
}
62+
63+
protected AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
64+
if (PasswordAuthenticationHandler.supports(message)) {
65+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
66+
return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
67+
} else if (SASLAuthenticationHandler.supports(message)) {
68+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
69+
return new SASLAuthenticationHandler(password, this.configuration.getUsername());
70+
} else {
71+
throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
72+
}
73+
}
74+
75+
}

0 commit comments

Comments
 (0)