Skip to content

Commit 4bcc697

Browse files
committed
split configuration for single/multiple hosts
1 parent c43f980 commit 4bcc697

13 files changed

+553
-437
lines changed
Lines changed: 9 additions & 253 deletions
Original file line numberDiff line numberDiff line change
@@ -1,266 +1,22 @@
11
package io.r2dbc.postgresql;
22

3-
import io.netty.channel.unix.DomainSocketAddress;
4-
import io.r2dbc.postgresql.authentication.AuthenticationHandler;
5-
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
6-
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
73
import io.r2dbc.postgresql.client.Client;
8-
import io.r2dbc.postgresql.client.SSLConfig;
9-
import io.r2dbc.postgresql.client.SSLMode;
10-
import io.r2dbc.postgresql.client.StartupMessageFlow;
11-
import io.r2dbc.postgresql.codec.DefaultCodecs;
12-
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
13-
import io.r2dbc.postgresql.util.Assert;
14-
import reactor.core.publisher.Flux;
154
import reactor.core.publisher.Mono;
5+
import reactor.util.annotation.Nullable;
166

17-
import javax.annotation.Nullable;
18-
import java.net.InetSocketAddress;
19-
import java.net.SocketAddress;
20-
import java.time.Duration;
21-
import java.util.ArrayList;
22-
import java.util.Collections;
23-
import java.util.List;
247
import java.util.Map;
25-
import java.util.concurrent.ConcurrentHashMap;
26-
import java.util.concurrent.atomic.AtomicReference;
27-
import java.util.function.Function;
28-
import java.util.function.Predicate;
298

30-
import static io.r2dbc.postgresql.TargetServerType.ANY;
31-
import static io.r2dbc.postgresql.TargetServerType.MASTER;
32-
import static io.r2dbc.postgresql.TargetServerType.PREFER_SECONDARY;
33-
import static io.r2dbc.postgresql.TargetServerType.SECONDARY;
9+
public interface ClientFactory {
3410

35-
class ClientFactory implements Function<Map<String, String>, Mono<? extends Client>> {
36-
37-
private final List<SocketAddress> addresses;
38-
39-
private final PostgresqlConnectionConfiguration configuration;
40-
41-
private final Map<SocketAddress, HostSpecStatus> statusMap = new ConcurrentHashMap<>();
42-
43-
private final ConnectionSupplier connectionSupplier;
44-
45-
public ClientFactory(PostgresqlConnectionConfiguration configuration, ConnectionSupplier connectionSupplier) {
46-
this.configuration = configuration;
47-
this.addresses = ClientFactory.createSocketAddress(this.configuration);
48-
this.connectionSupplier = connectionSupplier;
49-
}
50-
51-
@Override
52-
public Mono<Client> apply(Map<String, String> options) {
53-
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
54-
TargetServerType targetServerType = this.configuration.getTargetServerType();
55-
return this.tryConnect(targetServerType, options)
56-
.onErrorResume(e -> this.addresses.size() > 1, e -> {
57-
if (!exceptionRef.compareAndSet(null, e)) {
58-
exceptionRef.get().addSuppressed(e);
59-
}
60-
return Mono.empty();
61-
})
62-
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY
63-
? this.tryConnect(MASTER, options)
64-
: Mono.empty()))
65-
.switchIfEmpty(Mono.error(() -> {
66-
Throwable error = exceptionRef.get();
67-
if (error == null) {
68-
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type %s", targetServerType.getValue()), null);
69-
} else {
70-
return error;
71-
}
72-
}));
73-
}
74-
75-
public Mono<Client> tryConnect(TargetServerType targetServerType, @Nullable Map<String, String> options) {
76-
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
77-
return this.getCandidates(targetServerType).concatMap(candidate -> this.tryConnectToCandidate(targetServerType, candidate, options)
78-
.onErrorResume(e -> this.addresses.size() > 1, e -> {
79-
if (!exceptionRef.compareAndSet(null, e)) {
80-
exceptionRef.get().addSuppressed(e);
81-
}
82-
this.statusMap.put(candidate, HostSpecStatus.fail(candidate));
83-
return Mono.empty();
84-
}))
85-
.next()
86-
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
87-
? Mono.error(exceptionRef.get())
88-
: Mono.empty()));
89-
}
90-
91-
private static List<SocketAddress> createSocketAddress(PostgresqlConnectionConfiguration configuration) {
92-
if (!configuration.isUseSocket()) {
93-
if (configuration.getTmpHosts() != null) {
94-
String[] hosts = configuration.getTmpHosts();
95-
int[] ports = configuration.getTmpPorts();
96-
List<SocketAddress> addressList = new ArrayList<>(hosts.length);
97-
for (int i = 0; i < hosts.length; i++) {
98-
String host = hosts[i];
99-
int port = ports[i];
100-
addressList.add(InetSocketAddress.createUnresolved(host, port));
101-
}
102-
return addressList;
103-
} else {
104-
return Collections.singletonList(InetSocketAddress.createUnresolved(configuration.getRequiredHost(), configuration.getPort()));
105-
}
106-
}
107-
108-
if (configuration.isUseSocket()) {
109-
return Collections.singletonList(new DomainSocketAddress(configuration.getRequiredSocket()));
11+
static ClientFactory getFactory(PostgresqlConnectionConfiguration configuration, ClientSupplier clientSupplier) {
12+
if (configuration.getSingleHostConfiguration() != null) {
13+
return new SingleHostClientFactory(configuration, clientSupplier);
11014
}
111-
112-
throw new IllegalArgumentException("Cannot create SocketAddress for " + configuration);
113-
}
114-
115-
private static HostSpecStatus evaluateStatus(SocketAddress candidate, @Nullable HostSpecStatus oldStatus) {
116-
return oldStatus == null || oldStatus.hostStatus == HostStatus.CONNECT_FAIL
117-
? HostSpecStatus.ok(candidate)
118-
: oldStatus;
119-
}
120-
121-
private static Mono<Boolean> isPrimaryServer(Client client) {
122-
return new SimpleQueryPostgresqlStatement(client, new DefaultCodecs(client.getByteBufAllocator()), "show transaction_read_only")
123-
.execute()
124-
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)))
125-
.map(s -> s.equalsIgnoreCase("off"))
126-
.next();
127-
}
128-
129-
private AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
130-
if (PasswordAuthenticationHandler.supports(message)) {
131-
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
132-
return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
133-
} else if (SASLAuthenticationHandler.supports(message)) {
134-
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
135-
return new SASLAuthenticationHandler(password, this.configuration.getUsername());
136-
} else {
137-
throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
15+
if (configuration.getMultipleHostsConfiguration() != null) {
16+
return new MultipleHostsClientFactory(configuration, clientSupplier);
13817
}
18+
throw new IllegalArgumentException("Can't build client factory based on configuration " + configuration);
13919
}
14020

141-
private Flux<SocketAddress> getCandidates(TargetServerType targetServerType) {
142-
return Flux.create(sink -> {
143-
if (this.addresses.size() == 1) {
144-
sink.next(this.addresses.get(0));
145-
sink.complete();
146-
return;
147-
}
148-
long now = System.currentTimeMillis();
149-
List<SocketAddress> addresses = new ArrayList<>(this.addresses);
150-
if (this.configuration.isLoadBalance()) {
151-
Collections.shuffle(addresses);
152-
}
153-
for (SocketAddress address : addresses) {
154-
HostSpecStatus currentStatus = this.statusMap.get(address);
155-
if (currentStatus == null || now > currentStatus.updated + this.configuration.getHostRecheckTime()) {
156-
sink.next(address);
157-
} else if (targetServerType.allowStatus(currentStatus.hostStatus)) {
158-
sink.next(address);
159-
}
160-
}
161-
sink.complete();
162-
});
163-
}
164-
165-
private Mono<Client> tryConnectWithConfig(SSLConfig sslConfig, SocketAddress endpoint, @Nullable Map<String, String> options) {
166-
return this.connectionSupplier.connect(endpoint, this.configuration.getConnectTimeout(), sslConfig)
167-
.delayUntil(client -> StartupMessageFlow
168-
.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase(), this.configuration.getUsername(), options)
169-
.handle(ExceptionFactory.INSTANCE::handleErrorResponse));
170-
}
171-
172-
private Mono<Client> tryConnectToCandidate(TargetServerType targetServerType, SocketAddress candidate, @Nullable Map<String, String> options) {
173-
return Mono.create(sink -> this.tryConnectToEndpoint(candidate, options).subscribe(client -> {
174-
this.statusMap.compute(candidate, (a, oldStatus) -> ClientFactory.evaluateStatus(candidate, oldStatus));
175-
if (targetServerType == ANY) {
176-
sink.success(client);
177-
return;
178-
}
179-
ClientFactory.isPrimaryServer(client).subscribe(
180-
isPrimary -> {
181-
if (isPrimary) {
182-
this.statusMap.put(candidate, HostSpecStatus.primary(candidate));
183-
} else {
184-
this.statusMap.put(candidate, HostSpecStatus.standby(candidate));
185-
}
186-
if (isPrimary && targetServerType == MASTER) {
187-
sink.success(client);
188-
} else if (!isPrimary && (targetServerType == SECONDARY || targetServerType == PREFER_SECONDARY)) {
189-
sink.success(client);
190-
} else {
191-
client.close().subscribe(v -> sink.success(), sink::error, sink::success, sink.currentContext());
192-
}
193-
},
194-
sink::error,
195-
() -> {
196-
},
197-
sink.currentContext()
198-
);
199-
}, sink::error, () -> {
200-
}, sink.currentContext()));
201-
}
202-
203-
private Mono<Client> tryConnectToEndpoint(SocketAddress endpoint, @Nullable Map<String, String> options) {
204-
SSLConfig sslConfig = this.configuration.getSslConfig();
205-
Predicate<Throwable> isAuthSpecificationError = e -> e instanceof ExceptionFactory.PostgresqlAuthenticationFailure;
206-
return this.tryConnectWithConfig(sslConfig, endpoint, options)
207-
.onErrorResume(
208-
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.ALLOW),
209-
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.REQUIRE), endpoint, options)
210-
.onErrorResume(sslAuthError -> {
211-
e.addSuppressed(sslAuthError);
212-
return Mono.error(e);
213-
})
214-
)
215-
.onErrorResume(
216-
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.PREFER),
217-
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.DISABLE), endpoint, options)
218-
.onErrorResume(sslAuthError -> {
219-
e.addSuppressed(sslAuthError);
220-
return Mono.error(e);
221-
})
222-
);
223-
}
224-
225-
public interface ConnectionSupplier {
226-
227-
Mono<Client> connect(SocketAddress endpoint, @Nullable Duration connectTimeout, SSLConfig sslConfig);
228-
}
229-
230-
enum HostStatus {
231-
CONNECT_FAIL,
232-
CONNECT_OK,
233-
PRIMARY,
234-
STANDBY
235-
}
236-
237-
private static class HostSpecStatus {
238-
239-
public final SocketAddress address;
240-
241-
public final HostStatus hostStatus;
242-
243-
public final long updated = System.currentTimeMillis();
244-
245-
private HostSpecStatus(SocketAddress address, HostStatus hostStatus) {
246-
this.address = address;
247-
this.hostStatus = hostStatus;
248-
}
249-
250-
public static HostSpecStatus fail(SocketAddress host) {
251-
return new HostSpecStatus(host, HostStatus.CONNECT_FAIL);
252-
}
253-
254-
public static HostSpecStatus ok(SocketAddress host) {
255-
return new HostSpecStatus(host, HostStatus.CONNECT_OK);
256-
}
257-
258-
public static HostSpecStatus primary(SocketAddress host) {
259-
return new HostSpecStatus(host, HostStatus.PRIMARY);
260-
}
261-
262-
public static HostSpecStatus standby(SocketAddress host) {
263-
return new HostSpecStatus(host, HostStatus.STANDBY);
264-
}
265-
}
21+
Mono<? extends Client> create(@Nullable Map<String, String> options);
26622
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.authentication.AuthenticationHandler;
4+
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
5+
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
6+
import io.r2dbc.postgresql.client.Client;
7+
import io.r2dbc.postgresql.client.SSLConfig;
8+
import io.r2dbc.postgresql.client.SSLMode;
9+
import io.r2dbc.postgresql.client.StartupMessageFlow;
10+
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
11+
import io.r2dbc.postgresql.util.Assert;
12+
import reactor.core.publisher.Mono;
13+
14+
import javax.annotation.Nullable;
15+
import java.net.SocketAddress;
16+
import java.util.Map;
17+
import java.util.function.Predicate;
18+
19+
public abstract class ClientFactoryBase implements ClientFactory {
20+
21+
private final ClientSupplier clientSupplier;
22+
23+
private final PostgresqlConnectionConfiguration configuration;
24+
25+
protected ClientFactoryBase(PostgresqlConnectionConfiguration configuration, ClientSupplier clientSupplier) {
26+
this.configuration = configuration;
27+
this.clientSupplier = clientSupplier;
28+
}
29+
30+
protected AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
31+
if (PasswordAuthenticationHandler.supports(message)) {
32+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
33+
return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
34+
} else if (SASLAuthenticationHandler.supports(message)) {
35+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
36+
return new SASLAuthenticationHandler(password, this.configuration.getUsername());
37+
} else {
38+
throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
39+
}
40+
}
41+
42+
protected Mono<? extends Client> tryConnectToEndpoint(SocketAddress endpoint, @Nullable Map<String, String> options) {
43+
SSLConfig sslConfig = this.configuration.getSslConfig();
44+
Predicate<Throwable> isAuthSpecificationError = e -> e instanceof ExceptionFactory.PostgresqlAuthenticationFailure;
45+
return this.tryConnectWithConfig(sslConfig, endpoint, options)
46+
.onErrorResume(
47+
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.ALLOW),
48+
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.REQUIRE), endpoint, options)
49+
.onErrorResume(sslAuthError -> {
50+
e.addSuppressed(sslAuthError);
51+
return Mono.error(e);
52+
})
53+
)
54+
.onErrorResume(
55+
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.PREFER),
56+
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.DISABLE), endpoint, options)
57+
.onErrorResume(sslAuthError -> {
58+
e.addSuppressed(sslAuthError);
59+
return Mono.error(e);
60+
})
61+
);
62+
}
63+
64+
protected Mono<Client> tryConnectWithConfig(SSLConfig sslConfig, SocketAddress endpoint, @Nullable Map<String, String> options) {
65+
return this.clientSupplier.connect(endpoint, this.configuration.getConnectTimeout(), sslConfig)
66+
.delayUntil(client -> StartupMessageFlow
67+
.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase(), this.configuration.getUsername(), options)
68+
.handle(ExceptionFactory.INSTANCE::handleErrorResponse));
69+
}
70+
71+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.client.Client;
4+
import io.r2dbc.postgresql.client.SSLConfig;
5+
import reactor.core.publisher.Mono;
6+
7+
import javax.annotation.Nullable;
8+
import java.net.SocketAddress;
9+
import java.time.Duration;
10+
11+
public interface ClientSupplier {
12+
13+
Mono<Client> connect(SocketAddress endpoint, @Nullable Duration connectTimeout, SSLConfig sslConfig);
14+
}

0 commit comments

Comments
 (0)