Skip to content

Commit 90ff1de

Browse files
author
Viktors Baltauss
committed
Failover functionality merge conflicts fixed
2 parents edd4748 + 20be3d3 commit 90ff1de

18 files changed

+1579
-352
lines changed
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.client.Client;
4+
import reactor.core.publisher.Mono;
5+
import reactor.util.annotation.Nullable;
6+
7+
import java.util.Map;
8+
9+
public interface ClientFactory {
10+
11+
static ClientFactory getFactory(PostgresqlConnectionConfiguration configuration, ClientSupplier clientSupplier) {
12+
if (configuration.getSingleHostConfiguration() != null) {
13+
return new SingleHostClientFactory(configuration, clientSupplier);
14+
}
15+
if (configuration.getMultipleHostsConfiguration() != null) {
16+
return new MultipleHostsClientFactory(configuration, clientSupplier);
17+
}
18+
throw new IllegalArgumentException("Can't build client factory based on configuration " + configuration);
19+
}
20+
21+
Mono<Client> create(@Nullable Map<String, String> options);
22+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.authentication.AuthenticationHandler;
4+
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
5+
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
6+
import io.r2dbc.postgresql.client.Client;
7+
import io.r2dbc.postgresql.client.SSLConfig;
8+
import io.r2dbc.postgresql.client.SSLMode;
9+
import io.r2dbc.postgresql.client.StartupMessageFlow;
10+
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
11+
import io.r2dbc.postgresql.util.Assert;
12+
import reactor.core.publisher.Mono;
13+
14+
import javax.annotation.Nullable;
15+
import java.net.SocketAddress;
16+
import java.util.Map;
17+
import java.util.function.Predicate;
18+
19+
public abstract class ClientFactoryBase implements ClientFactory {
20+
21+
private final ClientSupplier clientSupplier;
22+
23+
private final PostgresqlConnectionConfiguration configuration;
24+
25+
protected ClientFactoryBase(PostgresqlConnectionConfiguration configuration, ClientSupplier clientSupplier) {
26+
this.configuration = configuration;
27+
this.clientSupplier = clientSupplier;
28+
}
29+
30+
protected AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
31+
if (PasswordAuthenticationHandler.supports(message)) {
32+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
33+
return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
34+
} else if (SASLAuthenticationHandler.supports(message)) {
35+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
36+
return new SASLAuthenticationHandler(password, this.configuration.getUsername());
37+
} else {
38+
throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
39+
}
40+
}
41+
42+
protected Mono<Client> tryConnectToEndpoint(SocketAddress endpoint, @Nullable Map<String, String> options) {
43+
SSLConfig sslConfig = this.configuration.getSslConfig();
44+
Predicate<Throwable> isAuthSpecificationError = e -> e instanceof ExceptionFactory.PostgresqlAuthenticationFailure;
45+
return this.tryConnectWithConfig(sslConfig, endpoint, options)
46+
.onErrorResume(
47+
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.ALLOW),
48+
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.REQUIRE), endpoint, options)
49+
.onErrorResume(sslAuthError -> {
50+
e.addSuppressed(sslAuthError);
51+
return Mono.error(e);
52+
})
53+
)
54+
.onErrorResume(
55+
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.PREFER),
56+
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.DISABLE), endpoint, options)
57+
.onErrorResume(sslAuthError -> {
58+
e.addSuppressed(sslAuthError);
59+
return Mono.error(e);
60+
})
61+
);
62+
}
63+
64+
protected Mono<Client> tryConnectWithConfig(SSLConfig sslConfig, SocketAddress endpoint, @Nullable Map<String, String> options) {
65+
return this.clientSupplier.connect(endpoint, this.configuration.getConnectTimeout(), sslConfig)
66+
.delayUntil(client -> StartupMessageFlow
67+
.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration
68+
.getDatabase(), this.configuration.getUsername(), options)
69+
.handle(ExceptionFactory.INSTANCE::handleErrorResponse))
70+
.cast(Client.class);
71+
}
72+
73+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.client.Client;
4+
import io.r2dbc.postgresql.client.SSLConfig;
5+
import reactor.core.publisher.Mono;
6+
7+
import javax.annotation.Nullable;
8+
import java.net.SocketAddress;
9+
import java.time.Duration;
10+
11+
public interface ClientSupplier {
12+
13+
Mono<Client> connect(SocketAddress endpoint, @Nullable Duration connectTimeout, SSLConfig sslConfig);
14+
}
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.r2dbc.postgresql.client.Client;
4+
import io.r2dbc.postgresql.client.MultipleHostsConfiguration;
5+
import io.r2dbc.postgresql.codec.DefaultCodecs;
6+
import io.r2dbc.postgresql.util.Assert;
7+
import io.r2dbc.spi.IsolationLevel;
8+
import reactor.core.publisher.Flux;
9+
import reactor.core.publisher.Mono;
10+
11+
import javax.annotation.Nullable;
12+
import java.net.InetSocketAddress;
13+
import java.net.SocketAddress;
14+
import java.util.ArrayList;
15+
import java.util.Collections;
16+
import java.util.List;
17+
import java.util.Map;
18+
import java.util.concurrent.ConcurrentHashMap;
19+
import java.util.concurrent.atomic.AtomicReference;
20+
21+
import static io.r2dbc.postgresql.TargetServerType.ANY;
22+
import static io.r2dbc.postgresql.TargetServerType.MASTER;
23+
import static io.r2dbc.postgresql.TargetServerType.PREFER_SECONDARY;
24+
import static io.r2dbc.postgresql.TargetServerType.SECONDARY;
25+
26+
class MultipleHostsClientFactory extends ClientFactoryBase {
27+
28+
private final List<SocketAddress> addresses;
29+
30+
private final MultipleHostsConfiguration configuration;
31+
32+
private final Map<SocketAddress, HostSpecStatus> statusMap = new ConcurrentHashMap<>();
33+
34+
public MultipleHostsClientFactory(PostgresqlConnectionConfiguration configuration, ClientSupplier clientSupplier) {
35+
super(configuration, clientSupplier);
36+
this.configuration = Assert.requireNonNull(configuration.getMultipleHostsConfiguration(), "MultipleHostsConfiguration must not be null");
37+
this.addresses = MultipleHostsClientFactory.createSocketAddress(this.configuration);
38+
}
39+
40+
@Override
41+
public Mono<Client> create(@Nullable Map<String, String> options) {
42+
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
43+
TargetServerType targetServerType = this.configuration.getTargetServerType();
44+
return this.tryConnect(targetServerType, options)
45+
.onErrorResume(e -> {
46+
if (!exceptionRef.compareAndSet(null, e)) {
47+
exceptionRef.get().addSuppressed(e);
48+
}
49+
return Mono.empty();
50+
})
51+
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY
52+
? this.tryConnect(MASTER, options)
53+
: Mono.empty()))
54+
.switchIfEmpty(Mono.error(() -> {
55+
Throwable error = exceptionRef.get();
56+
if (error == null) {
57+
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type %s", targetServerType
58+
.getValue()), null);
59+
} else {
60+
return error;
61+
}
62+
}));
63+
}
64+
65+
public Mono<Client> tryConnect(TargetServerType targetServerType, @Nullable Map<String, String> options) {
66+
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
67+
return this.getCandidates(targetServerType).concatMap(candidate -> this.tryConnectToCandidate(targetServerType, candidate, options)
68+
.onErrorResume(e -> {
69+
if (!exceptionRef.compareAndSet(null, e)) {
70+
exceptionRef.get().addSuppressed(e);
71+
}
72+
this.statusMap.put(candidate, HostSpecStatus.fail(candidate));
73+
return Mono.empty();
74+
}))
75+
.next()
76+
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
77+
? Mono.error(exceptionRef.get())
78+
: Mono.empty()));
79+
}
80+
81+
private static List<SocketAddress> createSocketAddress(MultipleHostsConfiguration configuration) {
82+
List<SocketAddress> addressList = new ArrayList<>(configuration.getHosts().size());
83+
for (MultipleHostsConfiguration.ServerHost host : configuration.getHosts()) {
84+
addressList.add(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
85+
}
86+
return addressList;
87+
}
88+
89+
private static HostSpecStatus evaluateStatus(SocketAddress candidate, @Nullable HostSpecStatus oldStatus) {
90+
return oldStatus == null || oldStatus.hostStatus == HostStatus.CONNECT_FAIL
91+
? HostSpecStatus.ok(candidate)
92+
: oldStatus;
93+
}
94+
95+
private static Mono<Boolean> isPrimaryServer(Client client) {
96+
DefaultCodecs codecs = new DefaultCodecs(client.getByteBufAllocator());
97+
StatementCache disabledStatementCache = StatementCache.fromPreparedStatementCacheQueries(client, 0);
98+
PostgresqlConnection connection = new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, disabledStatementCache,
99+
IsolationLevel.READ_COMMITTED, false);
100+
ConnectionContext context = new ConnectionContext(client, codecs, connection);
101+
return new SimpleQueryPostgresqlStatement(context, "show transaction_read_only")
102+
.execute()
103+
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)))
104+
.map(s -> s.equalsIgnoreCase("off"))
105+
.next();
106+
}
107+
108+
private Flux<SocketAddress> getCandidates(TargetServerType targetServerType) {
109+
return Flux.create(sink -> {
110+
long now = System.currentTimeMillis();
111+
List<SocketAddress> addresses = new ArrayList<>(this.addresses);
112+
if (this.configuration.isLoadBalanceHosts()) {
113+
Collections.shuffle(addresses);
114+
}
115+
int counter = 0;
116+
for (SocketAddress address : addresses) {
117+
HostSpecStatus currentStatus = this.statusMap.get(address);
118+
if (currentStatus == null || now > currentStatus.updated + this.configuration.getHostRecheckTime()) {
119+
sink.next(address);
120+
counter++;
121+
} else if (targetServerType.allowStatus(currentStatus.hostStatus)) {
122+
sink.next(address);
123+
counter++;
124+
}
125+
}
126+
if (counter == 0) {
127+
// if no candidate match the requirement or all of them are in unavailable status try all the hosts
128+
addresses = new ArrayList<>(this.addresses);
129+
if (this.configuration.isLoadBalanceHosts()) {
130+
Collections.shuffle(addresses);
131+
}
132+
for (SocketAddress address : addresses) {
133+
sink.next(address);
134+
}
135+
}
136+
sink.complete();
137+
});
138+
}
139+
140+
private Mono<Client> tryConnectToCandidate(TargetServerType targetServerType, SocketAddress candidate, @Nullable Map<String, String> options) {
141+
return Mono.create(sink -> this.tryConnectToEndpoint(candidate, options).subscribe(client -> {
142+
this.statusMap.compute(candidate, (a, oldStatus) -> MultipleHostsClientFactory.evaluateStatus(candidate, oldStatus));
143+
if (targetServerType == ANY) {
144+
sink.success(client);
145+
return;
146+
}
147+
MultipleHostsClientFactory.isPrimaryServer(client).subscribe(
148+
isPrimary -> {
149+
if (isPrimary) {
150+
this.statusMap.put(candidate, HostSpecStatus.primary(candidate));
151+
} else {
152+
this.statusMap.put(candidate, HostSpecStatus.standby(candidate));
153+
}
154+
if (isPrimary && targetServerType == MASTER) {
155+
sink.success(client);
156+
} else if (!isPrimary && (targetServerType == SECONDARY || targetServerType == PREFER_SECONDARY)) {
157+
sink.success(client);
158+
} else {
159+
client.close().subscribe(v -> sink.success(), sink::error, sink::success, sink.currentContext());
160+
}
161+
},
162+
sink::error,
163+
() -> {
164+
},
165+
sink.currentContext()
166+
);
167+
}, sink::error, () -> {
168+
}, sink.currentContext()));
169+
}
170+
171+
enum HostStatus {
172+
CONNECT_FAIL,
173+
CONNECT_OK,
174+
PRIMARY,
175+
STANDBY
176+
}
177+
178+
private static class HostSpecStatus {
179+
180+
public final SocketAddress address;
181+
182+
public final HostStatus hostStatus;
183+
184+
public final long updated = System.currentTimeMillis();
185+
186+
private HostSpecStatus(SocketAddress address, HostStatus hostStatus) {
187+
this.address = address;
188+
this.hostStatus = hostStatus;
189+
}
190+
191+
public static HostSpecStatus fail(SocketAddress host) {
192+
return new HostSpecStatus(host, HostStatus.CONNECT_FAIL);
193+
}
194+
195+
public static HostSpecStatus ok(SocketAddress host) {
196+
return new HostSpecStatus(host, HostStatus.CONNECT_OK);
197+
}
198+
199+
public static HostSpecStatus primary(SocketAddress host) {
200+
return new HostSpecStatus(host, HostStatus.PRIMARY);
201+
}
202+
203+
public static HostSpecStatus standby(SocketAddress host) {
204+
return new HostSpecStatus(host, HostStatus.STANDBY);
205+
}
206+
}
207+
}

0 commit comments

Comments
 (0)