Skip to content

High availability cluster functionality #474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ClientSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import reactor.core.publisher.Mono;

import java.net.SocketAddress;

public interface ClientSupplier {

Mono<Client> connect(SocketAddress endpoint, ConnectionSettings settings);

}
31 changes: 31 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ConnectionStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import reactor.core.publisher.Mono;

import java.net.SocketAddress;
import java.util.Map;
import java.util.function.Function;

public interface ConnectionStrategy {

Mono<Client> connect();

ConnectionStrategy withOptions(Map<String, String> options);

interface ComposableConnectionStrategy extends ConnectionStrategy {

default <T extends ConnectionStrategy> T chainIf(boolean guard, Function<ComposableConnectionStrategy, T> nextStrategyProvider, Class<T> klass) {
return guard ? nextStrategyProvider.apply(this) : klass.cast(this);
}

ComposableConnectionStrategy withAddress(SocketAddress address);

ComposableConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings);

ComposableConnectionStrategy withOptions(Map<String, String> options);

}

}
47 changes: 47 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ConnectionStrategyFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.r2dbc.postgresql;

import io.netty.channel.unix.DomainSocketAddress;
import io.r2dbc.postgresql.client.MultiHostConfiguration;
import io.r2dbc.postgresql.client.SSLConfig;
import io.r2dbc.postgresql.client.SSLMode;
import io.r2dbc.postgresql.client.SingleHostConfiguration;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;

public class ConnectionStrategyFactory {

public static ConnectionStrategy getConnectionStrategy(ClientSupplier clientSupplier, PostgresqlConnectionConfiguration configuration) {
SingleHostConfiguration singleHostConfiguration = configuration.getSingleHostConfiguration();
MultiHostConfiguration multiHostConfiguration = configuration.getMultiHostConfiguration();
SSLConfig sslConfig = configuration.getSslConfig();
SocketAddress address = singleHostConfiguration != null ? createSocketAddress(singleHostConfiguration) : null;
return new DefaultConnectionStrategy(address, clientSupplier, configuration, configuration.getConnectionSettings(), configuration.getOptions())
.chainIf(!SSLMode.DISABLE.equals(sslConfig.getSslMode()), strategy -> new SslFallbackConnectionStrategy(configuration, strategy), ConnectionStrategy.ComposableConnectionStrategy.class)
.chainIf(multiHostConfiguration != null, strategy -> new MultiHostConnectionStrategy(createSocketAddress(multiHostConfiguration), configuration, strategy), ConnectionStrategy.class);
}

private static SocketAddress createSocketAddress(SingleHostConfiguration configuration) {
if (!configuration.isUseSocket()) {
return InetSocketAddress.createUnresolved(configuration.getRequiredHost(), configuration.getPort());
}
return DomainSocketFactory.getDomainSocketAddress(configuration);
}

static class DomainSocketFactory {
private static SocketAddress getDomainSocketAddress(SingleHostConfiguration configuration) {
return new DomainSocketAddress(configuration.getRequiredSocket());
}
}

private static List<SocketAddress> createSocketAddress(MultiHostConfiguration configuration) {
List<SocketAddress> addressList = new ArrayList<>(configuration.getHosts().size());
for (MultiHostConfiguration.ServerHost host : configuration.getHosts()) {
addressList.add(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
}
return addressList;
}

}
79 changes: 79 additions & 0 deletions src/main/java/io/r2dbc/postgresql/DefaultConnectionStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.authentication.AuthenticationHandler;
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import io.r2dbc.postgresql.client.StartupMessageFlow;
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
import io.r2dbc.postgresql.util.Assert;
import reactor.core.publisher.Mono;

import javax.annotation.Nullable;
import java.net.SocketAddress;
import java.util.Map;

public class DefaultConnectionStrategy implements ConnectionStrategy.ComposableConnectionStrategy {

private final SocketAddress address;

private final ClientSupplier clientSupplier;

private final PostgresqlConnectionConfiguration configuration;

private final ConnectionSettings connectionSettings;

private final Map<String, String> options;

DefaultConnectionStrategy(
@Nullable SocketAddress address,
ClientSupplier clientSupplier,
PostgresqlConnectionConfiguration configuration,
ConnectionSettings connectionSettings,
@Nullable Map<String, String> options
) {
this.address = address;
this.clientSupplier = clientSupplier;
this.configuration = configuration;
this.connectionSettings = connectionSettings;
this.options = options;
}

@Override
public Mono<Client> connect() {
Assert.requireNonNull(this.address, "address must not be null");
return this.clientSupplier.connect(this.address, this.connectionSettings)
.delayUntil(client -> StartupMessageFlow
.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration.getDatabase(), this.configuration.getUsername(), this.options)
.handle(ExceptionFactory.INSTANCE::handleErrorResponse));
}

@Override
public ComposableConnectionStrategy withAddress(SocketAddress address) {
return new DefaultConnectionStrategy(address, this.clientSupplier, this.configuration, this.connectionSettings, this.options);
}

@Override
public ComposableConnectionStrategy withConnectionSettings(ConnectionSettings connectionSettings) {
return new DefaultConnectionStrategy(this.address, this.clientSupplier, this.configuration, connectionSettings, this.options);
}

@Override
public ComposableConnectionStrategy withOptions(Map<String, String> options) {
return new DefaultConnectionStrategy(this.address, this.clientSupplier, this.configuration, this.connectionSettings, options);
}

protected AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
if (PasswordAuthenticationHandler.supports(message)) {
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
} else if (SASLAuthenticationHandler.supports(message)) {
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
return new SASLAuthenticationHandler(password, this.configuration.getUsername());
} else {
throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
}
}

}
195 changes: 195 additions & 0 deletions src/main/java/io/r2dbc/postgresql/MultiHostConnectionStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.MultiHostConfiguration;
import io.r2dbc.postgresql.codec.DefaultCodecs;
import io.r2dbc.spi.IsolationLevel;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import javax.annotation.Nullable;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import static io.r2dbc.postgresql.TargetServerType.ANY;
import static io.r2dbc.postgresql.TargetServerType.MASTER;
import static io.r2dbc.postgresql.TargetServerType.PREFER_SECONDARY;
import static io.r2dbc.postgresql.TargetServerType.SECONDARY;

public class MultiHostConnectionStrategy implements ConnectionStrategy {

private final List<SocketAddress> addresses;

private final PostgresqlConnectionConfiguration configuration;

private final ComposableConnectionStrategy connectionStrategy;

private final MultiHostConfiguration multiHostConfiguration;

private final Map<SocketAddress, HostSpecStatus> statusMap;

MultiHostConnectionStrategy(List<SocketAddress> addresses, PostgresqlConnectionConfiguration configuration, ComposableConnectionStrategy connectionStrategy) {
this.addresses = addresses;
this.configuration = configuration;
this.connectionStrategy = connectionStrategy;
this.multiHostConfiguration = this.configuration.getMultiHostConfiguration();
this.statusMap = new ConcurrentHashMap<>();
}

@Override
public Mono<Client> connect() {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
TargetServerType targetServerType = this.multiHostConfiguration.getTargetServerType();
return this.tryConnect(targetServerType)
.onErrorResume(e -> {
if (!exceptionRef.compareAndSet(null, e)) {
exceptionRef.get().addSuppressed(e);
}
return Mono.empty();
})
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY
? this.tryConnect(MASTER)
: Mono.empty()))
.switchIfEmpty(Mono.error(() -> {
Throwable error = exceptionRef.get();
if (error == null) {
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type %s", targetServerType.getValue()), null);
} else {
return error;
}
}));
}

@Override
public ConnectionStrategy withOptions(Map<String, String> options) {
return new MultiHostConnectionStrategy(this.addresses, this.configuration, this.connectionStrategy.withOptions(options));
}

private Mono<Client> tryConnect(TargetServerType targetServerType) {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
return this.getCandidates(targetServerType).concatMap(candidate -> this.tryConnectToCandidate(targetServerType, candidate)
.onErrorResume(e -> {
if (!exceptionRef.compareAndSet(null, e)) {
exceptionRef.get().addSuppressed(e);
}
this.statusMap.put(candidate, HostSpecStatus.fail(candidate));
return Mono.empty();
}))
.next()
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
? Mono.error(exceptionRef.get())
: Mono.empty()));
}

private static HostSpecStatus evaluateStatus(SocketAddress candidate, @Nullable HostSpecStatus oldStatus) {
return oldStatus == null || oldStatus.hostStatus == HostStatus.CONNECT_FAIL
? HostSpecStatus.ok(candidate)
: oldStatus;
}

private static Mono<Boolean> isPrimaryServer(Client client, PostgresqlConnectionConfiguration configuration) {
PostgresqlConnection connection = new PostgresqlConnection(client, new DefaultCodecs(client.getByteBufAllocator()), DefaultPortalNameSupplier.INSTANCE,
StatementCache.fromPreparedStatementCacheQueries(client, configuration.getPreparedStatementCacheQueries()), IsolationLevel.READ_UNCOMMITTED, configuration);
return connection.createStatement("show transaction_read_only")
.execute()
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)))
.map(s -> s.equalsIgnoreCase("off"))
.next();
}

private Flux<SocketAddress> getCandidates(TargetServerType targetServerType) {
return Flux.create(sink -> {
Predicate<Long> needsRecheck = updated -> System.currentTimeMillis() > updated + this.multiHostConfiguration.getHostRecheckTime().toMillis();
List<SocketAddress> addresses = new ArrayList<>(this.addresses);
if (this.multiHostConfiguration.isLoadBalanceHosts()) {
Collections.shuffle(addresses);
}
boolean addressEmitted = false;
for (SocketAddress address : addresses) {
HostSpecStatus currentStatus = this.statusMap.get(address);
if (currentStatus == null || needsRecheck.test(currentStatus.updated) || targetServerType.allowStatus(currentStatus.hostStatus)) {
sink.next(address);
addressEmitted = true;
}
}
if (!addressEmitted) {
// if no candidate matches the requirement or all of them are in unavailable status, try all the hosts
for (SocketAddress address : addresses) {
sink.next(address);
}
}
sink.complete();
});
}

private Mono<Client> tryConnectToCandidate(TargetServerType targetServerType, SocketAddress candidate) {
return Mono.create(sink -> this.connectionStrategy.withAddress(candidate).connect().subscribe(client -> {
this.statusMap.compute(candidate, (a, oldStatus) -> evaluateStatus(candidate, oldStatus));
if (targetServerType == ANY) {
sink.success(client);
return;
}
isPrimaryServer(client, this.configuration).subscribe(
isPrimary -> {
if (isPrimary) {
this.statusMap.put(candidate, HostSpecStatus.primary(candidate));
} else {
this.statusMap.put(candidate, HostSpecStatus.standby(candidate));
}
if (isPrimary && targetServerType == MASTER) {
sink.success(client);
} else if (!isPrimary && (targetServerType == SECONDARY || targetServerType == PREFER_SECONDARY)) {
sink.success(client);
} else {
client.close().subscribe(v -> sink.success(), sink::error, sink::success, Context.of(sink.contextView()));
}
},
sink::error, () -> {}, Context.of(sink.contextView()));
}, sink::error, () -> {}, Context.of(sink.contextView())));
}

enum HostStatus {
CONNECT_FAIL,
CONNECT_OK,
PRIMARY,
STANDBY
}

private static class HostSpecStatus {

public final SocketAddress address;

public final HostStatus hostStatus;

public final long updated;

private HostSpecStatus(SocketAddress address, HostStatus hostStatus) {
this.address = address;
this.hostStatus = hostStatus;
this.updated = System.currentTimeMillis();
}

public static HostSpecStatus fail(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.CONNECT_FAIL);
}

public static HostSpecStatus ok(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.CONNECT_OK);
}

public static HostSpecStatus primary(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.PRIMARY);
}

public static HostSpecStatus standby(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.STANDBY);
}
}
}
Loading