Skip to content

Commit 4390792

Browse files
kressimp911de
authored andcommitted
Implement high availability cluster functionality
We now provide a failover connectivity model supporting host selection from an array of endpoints. [resolves #120][#474][#203]
1 parent 6d2c1a8 commit 4390792

20 files changed

+1456
-171
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.client.Client;
4+
import io.r2dbc.postgresql.client.ConnectionSettings;
5+
import reactor.core.publisher.Mono;
6+
7+
import java.net.SocketAddress;
8+
9+
public interface ClientSupplier {
10+
11+
Mono<Client> connect(SocketAddress endpoint, ConnectionSettings settings);
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.client.Client;
4+
import io.r2dbc.postgresql.client.ConnectionSettings;
5+
import reactor.core.publisher.Mono;
6+
7+
import java.net.SocketAddress;
8+
import java.util.Map;
9+
import java.util.function.Function;
10+
11+
public interface ConnectionStrategy {
12+
13+
Mono<Client> connect();
14+
15+
ConnectionStrategy withOptions(Map<String, String> options);
16+
17+
interface ComposableConnectionStrategy extends ConnectionStrategy {
18+
19+
default <T extends ConnectionStrategy> T chainIf(boolean guard, Function<ComposableConnectionStrategy, T> nextStrategyProvider, Class<T> klass) {
20+
return guard ? nextStrategyProvider.apply(this) : klass.cast(this);
21+
}
22+
23+
ComposableConnectionStrategy withAddress(SocketAddress address);
24+
25+
ComposableConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings);
26+
27+
ComposableConnectionStrategy withOptions(Map<String, String> options);
28+
29+
}
30+
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.netty.channel.unix.DomainSocketAddress;
4+
import io.r2dbc.postgresql.client.MultiHostConfiguration;
5+
import io.r2dbc.postgresql.client.SSLConfig;
6+
import io.r2dbc.postgresql.client.SSLMode;
7+
import io.r2dbc.postgresql.client.SingleHostConfiguration;
8+
9+
import java.net.InetSocketAddress;
10+
import java.net.SocketAddress;
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
public class ConnectionStrategyFactory {
15+
16+
public static ConnectionStrategy getConnectionStrategy(ClientSupplier clientSupplier, PostgresqlConnectionConfiguration configuration) {
17+
SingleHostConfiguration singleHostConfiguration = configuration.getSingleHostConfiguration();
18+
MultiHostConfiguration multiHostConfiguration = configuration.getMultiHostConfiguration();
19+
SSLConfig sslConfig = configuration.getSslConfig();
20+
SocketAddress address = singleHostConfiguration != null ? createSocketAddress(singleHostConfiguration) : null;
21+
return new DefaultConnectionStrategy(address, clientSupplier, configuration, configuration.getConnectionSettings(), configuration.getOptions())
22+
.chainIf(!SSLMode.DISABLE.equals(sslConfig.getSslMode()), strategy -> new SslFallbackConnectionStrategy(configuration, strategy), ConnectionStrategy.ComposableConnectionStrategy.class)
23+
.chainIf(multiHostConfiguration != null, strategy -> new MultiHostConnectionStrategy(createSocketAddress(multiHostConfiguration), configuration, strategy), ConnectionStrategy.class);
24+
}
25+
26+
private static SocketAddress createSocketAddress(SingleHostConfiguration configuration) {
27+
if (!configuration.isUseSocket()) {
28+
return InetSocketAddress.createUnresolved(configuration.getRequiredHost(), configuration.getPort());
29+
}
30+
return DomainSocketFactory.getDomainSocketAddress(configuration);
31+
}
32+
33+
static class DomainSocketFactory {
34+
private static SocketAddress getDomainSocketAddress(SingleHostConfiguration configuration) {
35+
return new DomainSocketAddress(configuration.getRequiredSocket());
36+
}
37+
}
38+
39+
private static List<SocketAddress> createSocketAddress(MultiHostConfiguration configuration) {
40+
List<SocketAddress> addressList = new ArrayList<>(configuration.getHosts().size());
41+
for (MultiHostConfiguration.ServerHost host : configuration.getHosts()) {
42+
addressList.add(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
43+
}
44+
return addressList;
45+
}
46+
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.authentication.AuthenticationHandler;
4+
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
5+
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
6+
import io.r2dbc.postgresql.client.Client;
7+
import io.r2dbc.postgresql.client.ConnectionSettings;
8+
import io.r2dbc.postgresql.client.StartupMessageFlow;
9+
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
10+
import io.r2dbc.postgresql.util.Assert;
11+
import reactor.core.publisher.Mono;
12+
13+
import javax.annotation.Nullable;
14+
import java.net.SocketAddress;
15+
import java.util.Map;
16+
17+
public class DefaultConnectionStrategy implements ConnectionStrategy.ComposableConnectionStrategy {
18+
19+
private final SocketAddress address;
20+
21+
private final ClientSupplier clientSupplier;
22+
23+
private final PostgresqlConnectionConfiguration configuration;
24+
25+
private final ConnectionSettings connectionSettings;
26+
27+
private final Map<String, String> options;
28+
29+
DefaultConnectionStrategy(
30+
@Nullable SocketAddress address,
31+
ClientSupplier clientSupplier,
32+
PostgresqlConnectionConfiguration configuration,
33+
ConnectionSettings connectionSettings,
34+
@Nullable Map<String, String> options
35+
) {
36+
this.address = address;
37+
this.clientSupplier = clientSupplier;
38+
this.configuration = configuration;
39+
this.connectionSettings = connectionSettings;
40+
this.options = options;
41+
}
42+
43+
@Override
44+
public Mono<Client> connect() {
45+
Assert.requireNonNull(this.address, "address must not be null");
46+
return this.clientSupplier.connect(this.address, this.connectionSettings)
47+
.delayUntil(client -> StartupMessageFlow
48+
.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase(), this.configuration.getUsername(), this.options)
49+
.handle(ExceptionFactory.INSTANCE::handleErrorResponse));
50+
}
51+
52+
@Override
53+
public ComposableConnectionStrategy withAddress(SocketAddress address) {
54+
return new DefaultConnectionStrategy(address, this.clientSupplier, this.configuration, this.connectionSettings, this.options);
55+
}
56+
57+
@Override
58+
public ComposableConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings) {
59+
return new DefaultConnectionStrategy(this.address, this.clientSupplier, this.configuration, connectionSettings, this.options);
60+
}
61+
62+
@Override
63+
public ComposableConnectionStrategy withOptions(Map<String, String> options) {
64+
return new DefaultConnectionStrategy(this.address, this.clientSupplier, this.configuration, this.connectionSettings, options);
65+
}
66+
67+
protected AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
68+
if (PasswordAuthenticationHandler.supports(message)) {
69+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
70+
return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
71+
} else if (SASLAuthenticationHandler.supports(message)) {
72+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
73+
return new SASLAuthenticationHandler(password, this.configuration.getUsername());
74+
} else {
75+
throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
76+
}
77+
}
78+
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.client.Client;
4+
import io.r2dbc.postgresql.client.MultiHostConfiguration;
5+
import io.r2dbc.postgresql.codec.DefaultCodecs;
6+
import io.r2dbc.spi.IsolationLevel;
7+
import reactor.core.publisher.Flux;
8+
import reactor.core.publisher.Mono;
9+
import reactor.util.context.Context;
10+
11+
import javax.annotation.Nullable;
12+
import java.net.SocketAddress;
13+
import java.util.ArrayList;
14+
import java.util.Collections;
15+
import java.util.List;
16+
import java.util.Map;
17+
import java.util.concurrent.ConcurrentHashMap;
18+
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.function.Predicate;
20+
21+
import static io.r2dbc.postgresql.TargetServerType.ANY;
22+
import static io.r2dbc.postgresql.TargetServerType.MASTER;
23+
import static io.r2dbc.postgresql.TargetServerType.PREFER_SECONDARY;
24+
import static io.r2dbc.postgresql.TargetServerType.SECONDARY;
25+
26+
public class MultiHostConnectionStrategy implements ConnectionStrategy {
27+
28+
private final List<SocketAddress> addresses;
29+
30+
private final PostgresqlConnectionConfiguration configuration;
31+
32+
private final ComposableConnectionStrategy connectionStrategy;
33+
34+
private final MultiHostConfiguration multiHostConfiguration;
35+
36+
private final Map<SocketAddress, HostSpecStatus> statusMap;
37+
38+
MultiHostConnectionStrategy(List<SocketAddress> addresses, PostgresqlConnectionConfiguration configuration, ComposableConnectionStrategy connectionStrategy) {
39+
this.addresses = addresses;
40+
this.configuration = configuration;
41+
this.connectionStrategy = connectionStrategy;
42+
this.multiHostConfiguration = this.configuration.getMultiHostConfiguration();
43+
this.statusMap = new ConcurrentHashMap<>();
44+
}
45+
46+
@Override
47+
public Mono<Client> connect() {
48+
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
49+
TargetServerType targetServerType = this.multiHostConfiguration.getTargetServerType();
50+
return this.tryConnect(targetServerType)
51+
.onErrorResume(e -> {
52+
if (!exceptionRef.compareAndSet(null, e)) {
53+
exceptionRef.get().addSuppressed(e);
54+
}
55+
return Mono.empty();
56+
})
57+
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY
58+
? this.tryConnect(MASTER)
59+
: Mono.empty()))
60+
.switchIfEmpty(Mono.error(() -> {
61+
Throwable error = exceptionRef.get();
62+
if (error == null) {
63+
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type %s", targetServerType.getValue()), null);
64+
} else {
65+
return error;
66+
}
67+
}));
68+
}
69+
70+
@Override
71+
public ConnectionStrategy withOptions(Map<String, String> options) {
72+
return new MultiHostConnectionStrategy(this.addresses, this.configuration, this.connectionStrategy.withOptions(options));
73+
}
74+
75+
private Mono<Client> tryConnect(TargetServerType targetServerType) {
76+
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
77+
return this.getCandidates(targetServerType).concatMap(candidate -> this.tryConnectToCandidate(targetServerType, candidate)
78+
.onErrorResume(e -> {
79+
if (!exceptionRef.compareAndSet(null, e)) {
80+
exceptionRef.get().addSuppressed(e);
81+
}
82+
this.statusMap.put(candidate, HostSpecStatus.fail(candidate));
83+
return Mono.empty();
84+
}))
85+
.next()
86+
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
87+
? Mono.error(exceptionRef.get())
88+
: Mono.empty()));
89+
}
90+
91+
private static HostSpecStatus evaluateStatus(SocketAddress candidate, @Nullable HostSpecStatus oldStatus) {
92+
return oldStatus == null || oldStatus.hostStatus == HostStatus.CONNECT_FAIL
93+
? HostSpecStatus.ok(candidate)
94+
: oldStatus;
95+
}
96+
97+
private static Mono<Boolean> isPrimaryServer(Client client, PostgresqlConnectionConfiguration configuration) {
98+
PostgresqlConnection connection = new PostgresqlConnection(client, new DefaultCodecs(client.getByteBufAllocator()), DefaultPortalNameSupplier.INSTANCE,
99+
StatementCache.fromPreparedStatementCacheQueries(client, configuration.getPreparedStatementCacheQueries()), IsolationLevel.READ_UNCOMMITTED, configuration);
100+
return connection.createStatement("show transaction_read_only")
101+
.execute()
102+
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)))
103+
.map(s -> s.equalsIgnoreCase("off"))
104+
.next();
105+
}
106+
107+
private Flux<SocketAddress> getCandidates(TargetServerType targetServerType) {
108+
return Flux.create(sink -> {
109+
Predicate<Long> needsRecheck = updated -> System.currentTimeMillis() > updated + this.multiHostConfiguration.getHostRecheckTime().toMillis();
110+
List<SocketAddress> addresses = new ArrayList<>(this.addresses);
111+
if (this.multiHostConfiguration.isLoadBalanceHosts()) {
112+
Collections.shuffle(addresses);
113+
}
114+
boolean addressEmitted = false;
115+
for (SocketAddress address : addresses) {
116+
HostSpecStatus currentStatus = this.statusMap.get(address);
117+
if (currentStatus == null || needsRecheck.test(currentStatus.updated) || targetServerType.allowStatus(currentStatus.hostStatus)) {
118+
sink.next(address);
119+
addressEmitted = true;
120+
}
121+
}
122+
if (!addressEmitted) {
123+
// if no candidate matches the requirement or all of them are in unavailable status, try all the hosts
124+
for (SocketAddress address : addresses) {
125+
sink.next(address);
126+
}
127+
}
128+
sink.complete();
129+
});
130+
}
131+
132+
private Mono<Client> tryConnectToCandidate(TargetServerType targetServerType, SocketAddress candidate) {
133+
return Mono.create(sink -> this.connectionStrategy.withAddress(candidate).connect().subscribe(client -> {
134+
this.statusMap.compute(candidate, (a, oldStatus) -> evaluateStatus(candidate, oldStatus));
135+
if (targetServerType == ANY) {
136+
sink.success(client);
137+
return;
138+
}
139+
isPrimaryServer(client, this.configuration).subscribe(
140+
isPrimary -> {
141+
if (isPrimary) {
142+
this.statusMap.put(candidate, HostSpecStatus.primary(candidate));
143+
} else {
144+
this.statusMap.put(candidate, HostSpecStatus.standby(candidate));
145+
}
146+
if (isPrimary && targetServerType == MASTER) {
147+
sink.success(client);
148+
} else if (!isPrimary && (targetServerType == SECONDARY || targetServerType == PREFER_SECONDARY)) {
149+
sink.success(client);
150+
} else {
151+
client.close().subscribe(v -> sink.success(), sink::error, sink::success, Context.of(sink.contextView()));
152+
}
153+
},
154+
sink::error, () -> {}, Context.of(sink.contextView()));
155+
}, sink::error, () -> {}, Context.of(sink.contextView())));
156+
}
157+
158+
enum HostStatus {
159+
CONNECT_FAIL,
160+
CONNECT_OK,
161+
PRIMARY,
162+
STANDBY
163+
}
164+
165+
private static class HostSpecStatus {
166+
167+
public final SocketAddress address;
168+
169+
public final HostStatus hostStatus;
170+
171+
public final long updated;
172+
173+
private HostSpecStatus(SocketAddress address, HostStatus hostStatus) {
174+
this.address = address;
175+
this.hostStatus = hostStatus;
176+
this.updated = System.currentTimeMillis();
177+
}
178+
179+
public static HostSpecStatus fail(SocketAddress host) {
180+
return new HostSpecStatus(host, HostStatus.CONNECT_FAIL);
181+
}
182+
183+
public static HostSpecStatus ok(SocketAddress host) {
184+
return new HostSpecStatus(host, HostStatus.CONNECT_OK);
185+
}
186+
187+
public static HostSpecStatus primary(SocketAddress host) {
188+
return new HostSpecStatus(host, HostStatus.PRIMARY);
189+
}
190+
191+
public static HostSpecStatus standby(SocketAddress host) {
192+
return new HostSpecStatus(host, HostStatus.STANDBY);
193+
}
194+
}
195+
}

0 commit comments

Comments
 (0)