Skip to content

High availability cluster merge #250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ClientFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

import java.util.Map;

public interface ClientFactory {

static ClientFactory getFactory(PostgresqlConnectionConfiguration configuration, ClientSupplier clientSupplier) {
if (configuration.getSingleHostConfiguration() != null) {
return new SingleHostClientFactory(configuration, clientSupplier);
}
if (configuration.getMultipleHostsConfiguration() != null) {
return new MultipleHostsClientFactory(configuration, clientSupplier);
}
throw new IllegalArgumentException("Can't build client factory based on configuration " + configuration);
}

Mono<Client> create(@Nullable Map<String, String> options);
}
73 changes: 73 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ClientFactoryBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.authentication.AuthenticationHandler;
import io.r2dbc.postgresql.authentication.PasswordAuthenticationHandler;
import io.r2dbc.postgresql.authentication.SASLAuthenticationHandler;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.SSLConfig;
import io.r2dbc.postgresql.client.SSLMode;
import io.r2dbc.postgresql.client.StartupMessageFlow;
import io.r2dbc.postgresql.message.backend.AuthenticationMessage;
import io.r2dbc.postgresql.util.Assert;
import reactor.core.publisher.Mono;

import javax.annotation.Nullable;
import java.net.SocketAddress;
import java.util.Map;
import java.util.function.Predicate;

public abstract class ClientFactoryBase implements ClientFactory {

private final ClientSupplier clientSupplier;

private final PostgresqlConnectionConfiguration configuration;

protected ClientFactoryBase(PostgresqlConnectionConfiguration configuration, ClientSupplier clientSupplier) {
this.configuration = configuration;
this.clientSupplier = clientSupplier;
}

protected AuthenticationHandler getAuthenticationHandler(AuthenticationMessage message) {
if (PasswordAuthenticationHandler.supports(message)) {
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
return new PasswordAuthenticationHandler(password, this.configuration.getUsername());
} else if (SASLAuthenticationHandler.supports(message)) {
CharSequence password = Assert.requireNonNull(this.configuration.getPassword(), "Password must not be null");
return new SASLAuthenticationHandler(password, this.configuration.getUsername());
} else {
throw new IllegalStateException(String.format("Unable to provide AuthenticationHandler capable of handling %s", message));
}
}

protected Mono<Client> tryConnectToEndpoint(SocketAddress endpoint, @Nullable Map<String, String> options) {
SSLConfig sslConfig = this.configuration.getSslConfig();
Predicate<Throwable> isAuthSpecificationError = e -> e instanceof ExceptionFactory.PostgresqlAuthenticationFailure;
return this.tryConnectWithConfig(sslConfig, endpoint, options)
.onErrorResume(
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.ALLOW),
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.REQUIRE), endpoint, options)
.onErrorResume(sslAuthError -> {
e.addSuppressed(sslAuthError);
return Mono.error(e);
})
)
.onErrorResume(
isAuthSpecificationError.and(e -> sslConfig.getSslMode() == SSLMode.PREFER),
e -> this.tryConnectWithConfig(sslConfig.mutateMode(SSLMode.DISABLE), endpoint, options)
.onErrorResume(sslAuthError -> {
e.addSuppressed(sslAuthError);
return Mono.error(e);
})
);
}

protected Mono<Client> tryConnectWithConfig(SSLConfig sslConfig, SocketAddress endpoint, @Nullable Map<String, String> options) {
return this.clientSupplier.connect(endpoint, this.configuration.getConnectTimeout(), sslConfig)
.delayUntil(client -> StartupMessageFlow
.exchange(this.configuration.getApplicationName(), this::getAuthenticationHandler, client, this.configuration
.getDatabase(), this.configuration.getUsername(), options)
.handle(ExceptionFactory.INSTANCE::handleErrorResponse))
.cast(Client.class);
}

}
14 changes: 14 additions & 0 deletions src/main/java/io/r2dbc/postgresql/ClientSupplier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.SSLConfig;
import reactor.core.publisher.Mono;

import javax.annotation.Nullable;
import java.net.SocketAddress;
import java.time.Duration;

public interface ClientSupplier {

Mono<Client> connect(SocketAddress endpoint, @Nullable Duration connectTimeout, SSLConfig sslConfig);
}
210 changes: 210 additions & 0 deletions src/main/java/io/r2dbc/postgresql/MultipleHostsClientFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.MultipleHostsConfiguration;
import io.r2dbc.postgresql.codec.DefaultCodecs;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.spi.IsolationLevel;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import static io.r2dbc.postgresql.TargetServerType.ANY;
import static io.r2dbc.postgresql.TargetServerType.MASTER;
import static io.r2dbc.postgresql.TargetServerType.PREFER_SECONDARY;
import static io.r2dbc.postgresql.TargetServerType.SECONDARY;

class MultipleHostsClientFactory extends ClientFactoryBase {

private final List<SocketAddress> addresses;

private final MultipleHostsConfiguration configuration;

private final Map<SocketAddress, HostSpecStatus> statusMap = new ConcurrentHashMap<>();

public MultipleHostsClientFactory(PostgresqlConnectionConfiguration configuration, ClientSupplier clientSupplier) {
super(configuration, clientSupplier);
this.configuration = Assert.requireNonNull(configuration.getMultipleHostsConfiguration(), "MultipleHostsConfiguration must not be null");
this.addresses = MultipleHostsClientFactory.createSocketAddress(this.configuration);
}

@Override
public Mono<Client> create(@Nullable Map<String, String> options) {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
TargetServerType targetServerType = this.configuration.getTargetServerType();
return this.tryConnect(targetServerType, options)
.onErrorResume(e -> {
if (!exceptionRef.compareAndSet(null, e)) {
exceptionRef.get().addSuppressed(e);
}
return Mono.empty();
})
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY
? this.tryConnect(MASTER, options)
: Mono.empty()))
.switchIfEmpty(Mono.error(() -> {
Throwable error = exceptionRef.get();
if (error == null) {
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type %s", targetServerType
.getValue()), null);
} else {
return error;
}
}));
}

public Mono<Client> tryConnect(TargetServerType targetServerType, @Nullable Map<String, String> options) {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
return this.getCandidates(targetServerType).concatMap(candidate -> this.tryConnectToCandidate(targetServerType, candidate, options)
.onErrorResume(e -> {
if (!exceptionRef.compareAndSet(null, e)) {
exceptionRef.get().addSuppressed(e);
}
this.statusMap.put(candidate, HostSpecStatus.fail(candidate));
return Mono.empty();
}))
.next()
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
? Mono.error(exceptionRef.get())
: Mono.empty()));
}

private static List<SocketAddress> createSocketAddress(MultipleHostsConfiguration configuration) {
List<SocketAddress> addressList = new ArrayList<>(configuration.getHosts().size());
for (MultipleHostsConfiguration.ServerHost host : configuration.getHosts()) {
addressList.add(InetSocketAddress.createUnresolved(host.getHost(), host.getPort()));
}
return addressList;
}

private static HostSpecStatus evaluateStatus(SocketAddress candidate, @Nullable HostSpecStatus oldStatus) {
return oldStatus == null || oldStatus.hostStatus == HostStatus.CONNECT_FAIL
? HostSpecStatus.ok(candidate)
: oldStatus;
}

private static Mono<Boolean> isPrimaryServer(Client client) {
DefaultCodecs codecs = new DefaultCodecs(client.getByteBufAllocator());
StatementCache disabledStatementCache = StatementCache.fromPreparedStatementCacheQueries(client, 0);
PostgresqlConnection connection = new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, disabledStatementCache,
IsolationLevel.READ_COMMITTED, false);
ConnectionContext context = new ConnectionContext(client, codecs, connection);
return new SimpleQueryPostgresqlStatement(context, "show transaction_read_only")
.execute()
.flatMap(result -> result.map((row, rowMetadata) -> row.get(0, String.class)))
.map(s -> s.equalsIgnoreCase("off"))
.next();
}

private Flux<SocketAddress> getCandidates(TargetServerType targetServerType) {
return Flux.create(sink -> {
long now = System.currentTimeMillis();
List<SocketAddress> addresses = new ArrayList<>(this.addresses);
if (this.configuration.isLoadBalanceHosts()) {
Collections.shuffle(addresses);
}
int counter = 0;
for (SocketAddress address : addresses) {
HostSpecStatus currentStatus = this.statusMap.get(address);
Duration hostRecheckDuration = this.configuration.getHostRecheckTime();
boolean recheck = currentStatus == null || hostRecheckDuration.plusMillis(currentStatus.updated).toMillis() < now;
if (recheck) {
sink.next(address);
counter++;
} else if (targetServerType.allowStatus(currentStatus.hostStatus)) {
sink.next(address);
counter++;
}
}
if (counter == 0) {
// if no candidate match the requirement or all of them are in unavailable status try all the hosts
addresses = new ArrayList<>(this.addresses);
if (this.configuration.isLoadBalanceHosts()) {
Collections.shuffle(addresses);
}
for (SocketAddress address : addresses) {
sink.next(address);
}
}
sink.complete();
});
}

private Mono<Client> tryConnectToCandidate(TargetServerType targetServerType, SocketAddress candidate, @Nullable Map<String, String> options) {
return Mono.create(sink -> this.tryConnectToEndpoint(candidate, options).subscribe(client -> {
this.statusMap.compute(candidate, (a, oldStatus) -> MultipleHostsClientFactory.evaluateStatus(candidate, oldStatus));
if (targetServerType == ANY) {
sink.success(client);
return;
}
MultipleHostsClientFactory.isPrimaryServer(client).subscribe(
isPrimary -> {
if (isPrimary) {
this.statusMap.put(candidate, HostSpecStatus.primary(candidate));
} else {
this.statusMap.put(candidate, HostSpecStatus.standby(candidate));
}
if (isPrimary && targetServerType == MASTER) {
sink.success(client);
} else if (!isPrimary && (targetServerType == SECONDARY || targetServerType == PREFER_SECONDARY)) {
sink.success(client);
} else {
client.close().subscribe(v -> sink.success(), sink::error, sink::success, sink.currentContext());
}
},
sink::error,
() -> {
},
sink.currentContext()
);
}, sink::error, () -> {
}, sink.currentContext()));
}

enum HostStatus {
CONNECT_FAIL,
CONNECT_OK,
PRIMARY,
STANDBY
}

private static class HostSpecStatus {

public final SocketAddress address;

public final HostStatus hostStatus;

public final long updated = System.currentTimeMillis();

private HostSpecStatus(SocketAddress address, HostStatus hostStatus) {
this.address = address;
this.hostStatus = hostStatus;
}

public static HostSpecStatus fail(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.CONNECT_FAIL);
}

public static HostSpecStatus ok(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.CONNECT_OK);
}

public static HostSpecStatus primary(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.PRIMARY);
}

public static HostSpecStatus standby(SocketAddress host) {
return new HostSpecStatus(host, HostStatus.STANDBY);
}
}
}
Loading