Skip to content

Commit c43f980

Browse files
committed
primary/standby servers support implementation
1 parent 57618c7 commit c43f980

File tree

6 files changed

+635
-124
lines changed

6 files changed

+635
-124
lines changed
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
package io.r2dbc.postgresql;
2+
3+
import io.netty.channel.unix.DomainSocketAddress;
4+
import io.r2dbc.postgresql.authentication.AuthenticationHandler;
5+
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
6+
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
7+
import io.r2dbc.postgresql.client.Client;
8+
import io.r2dbc.postgresql.client.SSLConfig;
9+
import io.r2dbc.postgresql.client.SSLMode;
10+
import io.r2dbc.postgresql.client.StartupMessageFlow;
11+
import io.r2dbc.postgresql.codec.DefaultCodecs;
12+
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
13+
import io.r2dbc.postgresql.util.Assert;
14+
import reactor.core.publisher.Flux;
15+
import reactor.core.publisher.Mono;
16+
17+
import javax.annotation.Nullable;
18+
import java.net.InetSocketAddress;
19+
import java.net.SocketAddress;
20+
import java.time.Duration;
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
import java.util.function.Function;
28+
import java.util.function.Predicate;
29+
30+
import static io.r2dbc.postgresql.TargetServerType.ANY;
31+
import static io.r2dbc.postgresql.TargetServerType.MASTER;
32+
import static io.r2dbc.postgresql.TargetServerType.PREFER_SECONDARY;
33+
import static io.r2dbc.postgresql.TargetServerType.SECONDARY;
34+
35+
class ClientFactory implements Function<Map<String, String>, Mono<? extends Client>> {
36+
37+
private final List<SocketAddress> addresses;
38+
39+
private final PostgresqlConnectionConfiguration configuration;
40+
41+
private final Map<SocketAddress, HostSpecStatus> statusMap = new ConcurrentHashMap<>();
42+
43+
private final ConnectionSupplier connectionSupplier;
44+
45+
public ClientFactory(PostgresqlConnectionConfiguration configuration, ConnectionSupplier connectionSupplier) {
46+
this.configuration = configuration;
47+
this.addresses = ClientFactory.createSocketAddress(this.configuration);
48+
this.connectionSupplier = connectionSupplier;
49+
}
50+
51+
@Override
52+
public Mono<Client> apply(Map<String, String> options) {
53+
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
54+
TargetServerType targetServerType = this.configuration.getTargetServerType();
55+
return this.tryConnect(targetServerType, options)
56+
.onErrorResume(e -> this.addresses.size() > 1, e -> {
57+
if (!exceptionRef.compareAndSet(null, e)) {
58+
exceptionRef.get().addSuppressed(e);
59+
}
60+
return Mono.empty();
61+
})
62+
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY
63+
? this.tryConnect(MASTER, options)
64+
: Mono.empty()))
65+
.switchIfEmpty(Mono.error(() -> {
66+
Throwable error = exceptionRef.get();
67+
if (error == null) {
68+
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type %s", targetServerType.getValue()), null);
69+
} else {
70+
return error;
71+
}
72+
}));
73+
}
74+
75+
public Mono<Client> tryConnect(TargetServerType targetServerType, @Nullable Map<String, String> options) {
76+
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
77+
return this.getCandidates(targetServerType).concatMap(candidate -> this.tryConnectToCandidate(targetServerType, candidate, options)
78+
.onErrorResume(e -> this.addresses.size() > 1, e -> {
79+
if (!exceptionRef.compareAndSet(null, e)) {
80+
exceptionRef.get().addSuppressed(e);
81+
}
82+
this.statusMap.put(candidate, HostSpecStatus.fail(candidate));
83+
return Mono.empty();
84+
}))
85+
.next()
86+
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
87+
? Mono.error(exceptionRef.get())
88+
: Mono.empty()));
89+
}
90+
91+
private static List<SocketAddress> createSocketAddress(PostgresqlConnectionConfiguration configuration) {
92+
if (!configuration.isUseSocket()) {
93+
if (configuration.getTmpHosts() != null) {
94+
String[] hosts = configuration.getTmpHosts();
95+
int[] ports = configuration.getTmpPorts();
96+
List<SocketAddress> addressList = new ArrayList<>(hosts.length);
97+
for (int i = 0; i < hosts.length; i++) {
98+
String host = hosts[i];
99+
int port = ports[i];
100+
addressList.add(InetSocketAddress.createUnresolved(host, port));
101+
}
102+
return addressList;
103+
} else {
104+
return Collections.singletonList(InetSocketAddress.createUnresolved(configuration.getRequiredHost(), configuration.getPort()));
105+
}
106+
}
107+
108+
if (configuration.isUseSocket()) {
109+
return Collections.singletonList(new DomainSocketAddress(configuration.getRequiredSocket()));
110+
}
111+
112+
throw new IllegalArgumentException("Cannot create SocketAddress for " + configuration);
113+
}
114+
115+
private static HostSpecStatus evaluateStatus(SocketAddress candidate, @Nullable HostSpecStatus oldStatus) {
116+
return oldStatus == null || oldStatus.hostStatus == HostStatus.CONNECT_FAIL
117+
? HostSpecStatus.ok(candidate)
118+
: oldStatus;
119+
}
120+
121+
private static Mono<Boolean> isPrimaryServer(Client client) {
122+
return new SimpleQueryPostgresqlStatement(client, new DefaultCodecs(client.getByteBufAllocator()), "show transaction_read_only")
123+
.execute()
124+
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)))
125+
.map(s -> s.equalsIgnoreCase("off"))
126+
.next();
127+
}
128+
129+
private AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
130+
if (PasswordAuthenticationHandler.supports(message)) {
131+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
132+
return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
133+
} else if (SASLAuthenticationHandler.supports(message)) {
134+
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
135+
return new SASLAuthenticationHandler(password, this.configuration.getUsername());
136+
} else {
137+
throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
138+
}
139+
}
140+
141+
private Flux<SocketAddress> getCandidates(TargetServerType targetServerType) {
142+
return Flux.create(sink -> {
143+
if (this.addresses.size() == 1) {
144+
sink.next(this.addresses.get(0));
145+
sink.complete();
146+
return;
147+
}
148+
long now = System.currentTimeMillis();
149+
List<SocketAddress> addresses = new ArrayList<>(this.addresses);
150+
if (this.configuration.isLoadBalance()) {
151+
Collections.shuffle(addresses);
152+
}
153+
for (SocketAddress address : addresses) {
154+
HostSpecStatus currentStatus = this.statusMap.get(address);
155+
if (currentStatus == null || now > currentStatus.updated + this.configuration.getHostRecheckTime()) {
156+
sink.next(address);
157+
} else if (targetServerType.allowStatus(currentStatus.hostStatus)) {
158+
sink.next(address);
159+
}
160+
}
161+
sink.complete();
162+
});
163+
}
164+
165+
private Mono<Client> tryConnectWithConfig(SSLConfig sslConfig, SocketAddress endpoint, @Nullable Map<String, String> options) {
166+
return this.connectionSupplier.connect(endpoint, this.configuration.getConnectTimeout(), sslConfig)
167+
.delayUntil(client -> StartupMessageFlow
168+
.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase(), this.configuration.getUsername(), options)
169+
.handle(ExceptionFactory.INSTANCE::handleErrorResponse));
170+
}
171+
172+
private Mono<Client> tryConnectToCandidate(TargetServerType targetServerType, SocketAddress candidate, @Nullable Map<String, String> options) {
173+
return Mono.create(sink -> this.tryConnectToEndpoint(candidate, options).subscribe(client -> {
174+
this.statusMap.compute(candidate, (a, oldStatus) -> ClientFactory.evaluateStatus(candidate, oldStatus));
175+
if (targetServerType == ANY) {
176+
sink.success(client);
177+
return;
178+
}
179+
ClientFactory.isPrimaryServer(client).subscribe(
180+
isPrimary -> {
181+
if (isPrimary) {
182+
this.statusMap.put(candidate, HostSpecStatus.primary(candidate));
183+
} else {
184+
this.statusMap.put(candidate, HostSpecStatus.standby(candidate));
185+
}
186+
if (isPrimary && targetServerType == MASTER) {
187+
sink.success(client);
188+
} else if (!isPrimary && (targetServerType == SECONDARY || targetServerType == PREFER_SECONDARY)) {
189+
sink.success(client);
190+
} else {
191+
client.close().subscribe(v -> sink.success(), sink::error, sink::success, sink.currentContext());
192+
}
193+
},
194+
sink::error,
195+
() -> {
196+
},
197+
sink.currentContext()
198+
);
199+
}, sink::error, () -> {
200+
}, sink.currentContext()));
201+
}
202+
203+
private Mono<Client> tryConnectToEndpoint(SocketAddress endpoint, @Nullable Map<String, String> options) {
204+
SSLConfig sslConfig = this.configuration.getSslConfig();
205+
Predicate<Throwable> isAuthSpecificationError = e -> e instanceof ExceptionFactory.PostgresqlAuthenticationFailure;
206+
return this.tryConnectWithConfig(sslConfig, endpoint, options)
207+
.onErrorResume(
208+
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.ALLOW),
209+
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.REQUIRE), endpoint, options)
210+
.onErrorResume(sslAuthError -> {
211+
e.addSuppressed(sslAuthError);
212+
return Mono.error(e);
213+
})
214+
)
215+
.onErrorResume(
216+
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.PREFER),
217+
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.DISABLE), endpoint, options)
218+
.onErrorResume(sslAuthError -> {
219+
e.addSuppressed(sslAuthError);
220+
return Mono.error(e);
221+
})
222+
);
223+
}
224+
225+
public interface ConnectionSupplier {
226+
227+
Mono<Client> connect(SocketAddress endpoint, @Nullable Duration connectTimeout, SSLConfig sslConfig);
228+
}
229+
230+
enum HostStatus {
231+
CONNECT_FAIL,
232+
CONNECT_OK,
233+
PRIMARY,
234+
STANDBY
235+
}
236+
237+
private static class HostSpecStatus {
238+
239+
public final SocketAddress address;
240+
241+
public final HostStatus hostStatus;
242+
243+
public final long updated = System.currentTimeMillis();
244+
245+
private HostSpecStatus(SocketAddress address, HostStatus hostStatus) {
246+
this.address = address;
247+
this.hostStatus = hostStatus;
248+
}
249+
250+
public static HostSpecStatus fail(SocketAddress host) {
251+
return new HostSpecStatus(host, HostStatus.CONNECT_FAIL);
252+
}
253+
254+
public static HostSpecStatus ok(SocketAddress host) {
255+
return new HostSpecStatus(host, HostStatus.CONNECT_OK);
256+
}
257+
258+
public static HostSpecStatus primary(SocketAddress host) {
259+
return new HostSpecStatus(host, HostStatus.PRIMARY);
260+
}
261+
262+
public static HostSpecStatus standby(SocketAddress host) {
263+
return new HostSpecStatus(host, HostStatus.STANDBY);
264+
}
265+
}
266+
}

0 commit comments

Comments
 (0)