Skip to content

Better verify conn #657

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions driver/src/main/java/org/neo4j/driver/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,17 @@ public interface Driver extends AutoCloseable
* @return a {@link CompletionStage completion stage} that represents the asynchronous verification.
*/
CompletionStage<Void> verifyConnectivityAsync();

/**
* Returns true if the server or cluster the driver connects to supports multi-databases, otherwise false.
* @return true if the server or cluster the driver connects to supports multi-databases, otherwise false.
*/
boolean supportsMultiDb();

/**
* Asynchronous check if the server or cluster the driver connects to supports multi-databases.
* @return a {@link CompletionStage completion stage} that returns true if the server or cluster
* the driver connects to supports multi-databases, otherwise false.
*/
CompletionStage<Boolean> supportsMultiDbAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;

import static org.neo4j.driver.internal.async.ImmutableConnectionContext.simple;
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase;

/**
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
Expand All @@ -46,15 +46,13 @@ public class DirectConnectionProvider implements ConnectionProvider
@Override
public CompletionStage<Connection> acquireConnection( ConnectionContext context )
{
return connectionPool.acquire( address ).thenApply( connection -> new DirectConnection( connection, context.databaseName(), context.mode() ) );
return acquireConnection().thenApply( connection -> new DirectConnection( connection, context.databaseName(), context.mode() ) );
}

@Override
public CompletionStage<Void> verifyConnectivity()
{
// We verify the connection by establishing a connection with the remote server specified by the address.
// Connection context will be ignored as no query is run in this connection and the connection is released immediately.
return acquireConnection( simple() ).thenCompose( Connection::release );
return acquireConnection().thenCompose( Connection::release );
}

@Override
Expand All @@ -63,8 +61,26 @@ public CompletionStage<Void> close()
return connectionPool.close();
}

@Override
public CompletionStage<Boolean> supportsMultiDb()
{
return acquireConnection().thenCompose( conn -> {
boolean supportsMultiDatabase = supportsMultiDatabase( conn );
return conn.release().thenApply( ignored -> supportsMultiDatabase );
} );
}

public BoltServerAddress getAddress()
{
return address;
}

/**
* Used only for grabbing a connection with the server after hello message.
* This connection cannot be directly used for running any queries as it is missing necessary connection context
*/
private CompletionStage<Connection> acquireConnection()
{
return connectionPool.acquire( address );
}
}
12 changes: 12 additions & 0 deletions driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,18 @@ public CompletionStage<Void> verifyConnectivityAsync()
return sessionFactory.verifyConnectivity();
}

@Override
public boolean supportsMultiDb()
{
return Futures.blockingGet( supportsMultiDbAsync() );
}

@Override
public CompletionStage<Boolean> supportsMultiDbAsync()
{
return sessionFactory.supportsMultiDb();
}

@Override
public void verifyConnectivity()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ public interface SessionFactory
CompletionStage<Void> verifyConnectivity();

CompletionStage<Void> close();

CompletionStage<Boolean> supportsMultiDb();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public CompletionStage<Void> close()
return connectionProvider.close();
}

@Override
public CompletionStage<Boolean> supportsMultiDb()
{
return connectionProvider.supportsMultiDb();
}

/**
* Get the underlying connection provider.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@
import org.neo4j.driver.internal.spi.Connection;

import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
import static org.neo4j.driver.internal.DatabaseNameUtil.systemDatabase;
import static org.neo4j.driver.internal.InternalBookmark.empty;

/**
* A {@link Connection} shall fulfil this {@link ImmutableConnectionContext} when acquired from a connection provider.
*/
public class ImmutableConnectionContext implements ConnectionContext
{
private static final ConnectionContext SIMPLE = new ImmutableConnectionContext( defaultDatabase(), empty(), AccessMode.READ );
private static final ConnectionContext SINGLE_DB_CONTEXT = new ImmutableConnectionContext( defaultDatabase(), empty(), AccessMode.READ );
private static final ConnectionContext MULTI_DB_CONTEXT = new ImmutableConnectionContext( systemDatabase(), empty(), AccessMode.READ );

private final DatabaseName databaseName;
private final AccessMode mode;
Expand Down Expand Up @@ -65,10 +67,10 @@ public Bookmark rediscoveryBookmark()
/**
* A simple context is used to test connectivity with a remote server/cluster.
* As long as there is a read only service, the connection shall be established successfully.
* This context should be applicable for both bolt v4 and bolt v3 routing table rediscovery.
* Depending on whether multidb is supported or not, this method returns different context for routing table discovery.
*/
public static ConnectionContext simple()
public static ConnectionContext simple( boolean supportsMultiDb )
{
return SIMPLE;
return supportsMultiDb ? MULTI_DB_CONTEXT : SINGLE_DB_CONTEXT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.List;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.Bookmark;
import org.neo4j.driver.internal.BoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionPool;

public interface Rediscovery
{
CompletionStage<ClusterComposition> lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark );

List<BoltServerAddress> resolve();
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private CompletionStage<ClusterComposition> lookupOnInitialRouter( RoutingTable
List<BoltServerAddress> addresses;
try
{
addresses = resolve( initialRouter );
addresses = resolve();
}
catch ( Throwable error )
{
Expand Down Expand Up @@ -256,9 +256,10 @@ private ClusterComposition handleRoutingProcedureError( Throwable error, Routing
return null;
}

private List<BoltServerAddress> resolve( BoltServerAddress address )
@Override
public List<BoltServerAddress> resolve()
{
return resolver.resolve( address )
return resolver.resolve( initialRouter )
.stream()
.flatMap( resolved -> resolveAll( BoltServerAddress.from( resolved ) ) )
.collect( toList() ); // collect to list to preserve the order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Record;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.ProtocolException;
import org.neo4j.driver.exceptions.value.ValueException;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.ServerVersion;

import static java.lang.String.format;
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase;

public class RoutingProcedureClusterCompositionProvider implements ClusterCompositionProvider
{
Expand All @@ -59,7 +59,7 @@ public RoutingProcedureClusterCompositionProvider( Clock clock, RoutingContext r
public CompletionStage<ClusterComposition> getClusterComposition( Connection connection, DatabaseName databaseName, Bookmark bookmark )
{
RoutingProcedureRunner runner;
if ( connection.serverVersion().greaterThanOrEqual( ServerVersion.v4_0_0 ) )
if ( supportsMultiDatabase( connection ) )
{
runner = multiDatabaseRoutingProcedureRunner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import io.netty.util.concurrent.EventExecutorGroup;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.exceptions.SecurityException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.internal.BoltServerAddress;
Expand All @@ -49,6 +51,11 @@

import static java.lang.String.format;
import static org.neo4j.driver.internal.async.ImmutableConnectionContext.simple;
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase;
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
import static org.neo4j.driver.internal.util.Futures.completionExceptionCause;
import static org.neo4j.driver.internal.util.Futures.failedFuture;
import static org.neo4j.driver.internal.util.Futures.onErrorContinue;

public class LoadBalancer implements ConnectionProvider
{
Expand All @@ -58,20 +65,29 @@ public class LoadBalancer implements ConnectionProvider
private final LoadBalancingStrategy loadBalancingStrategy;
private final EventExecutorGroup eventExecutorGroup;
private final Logger log;
private final Rediscovery rediscovery;

public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connectionPool,
EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging,
LoadBalancingStrategy loadBalancingStrategy, ServerAddressResolver resolver )
{
this( connectionPool, createRoutingTables( connectionPool, eventExecutorGroup, initialRouter, resolver, settings, clock, logging ),
loadBalancerLogger( logging ), loadBalancingStrategy, eventExecutorGroup );
this( connectionPool, createRediscovery( eventExecutorGroup, initialRouter, resolver, settings, clock, logging ), settings, loadBalancingStrategy,
eventExecutorGroup, clock, loadBalancerLogger( logging ) );
}

LoadBalancer( ConnectionPool connectionPool, RoutingTableRegistry routingTables, Logger log, LoadBalancingStrategy loadBalancingStrategy,
EventExecutorGroup eventExecutorGroup )
private LoadBalancer( ConnectionPool connectionPool, Rediscovery rediscovery, RoutingSettings settings, LoadBalancingStrategy loadBalancingStrategy,
EventExecutorGroup eventExecutorGroup, Clock clock, Logger log )
{
this( connectionPool, createRoutingTables( connectionPool, rediscovery, settings, clock, log ), rediscovery, loadBalancingStrategy, eventExecutorGroup,
log );
}

LoadBalancer( ConnectionPool connectionPool, RoutingTableRegistry routingTables, Rediscovery rediscovery, LoadBalancingStrategy loadBalancingStrategy,
EventExecutorGroup eventExecutorGroup, Logger log )
{
this.connectionPool = connectionPool;
this.routingTables = routingTables;
this.rediscovery = rediscovery;
this.loadBalancingStrategy = loadBalancingStrategy;
this.eventExecutorGroup = eventExecutorGroup;
this.log = log;
Expand All @@ -88,14 +104,15 @@ public CompletionStage<Connection> acquireConnection( ConnectionContext context
@Override
public CompletionStage<Void> verifyConnectivity()
{
return routingTables.refreshRoutingTable( simple() ).handle( ( ignored, error ) -> {
return this.supportsMultiDb().thenCompose( supports -> routingTables.refreshRoutingTable( simple( supports ) ) ).handle( ( ignored, error ) -> {
if ( error != null )
{
Throwable cause = Futures.completionExceptionCause( error );
Throwable cause = completionExceptionCause( error );
if ( cause instanceof ServiceUnavailableException )
{
throw Futures.asCompletionException( new ServiceUnavailableException(
"Unable to connect to database, ensure the database is running and that there is a working network connection to it.", cause ) );
"Unable to connect to database management service, ensure the database is running and that there is a working network connection to it.",
cause ) );
}
throw Futures.asCompletionException( cause );
}
Expand All @@ -109,6 +126,54 @@ public CompletionStage<Void> close()
return connectionPool.close();
}

@Override
public CompletionStage<Boolean> supportsMultiDb()
{
List<BoltServerAddress> addresses;

try
{
addresses = rediscovery.resolve();
}
catch ( Throwable error )
{
return failedFuture( error );
}

CompletableFuture<Boolean> result = completedWithNull();
Throwable baseError = new ServiceUnavailableException( "Failed to perform multi-databases feature detection with the following servers: " + addresses );

for ( BoltServerAddress address : addresses )
{
result = onErrorContinue( result, baseError, completionError -> {
// We fail fast on security errors
Throwable error = completionExceptionCause( completionError );
if ( error instanceof SecurityException )
{
return failedFuture( error );
}
return supportsMultiDb( address );
} );
}
return onErrorContinue( result, baseError, completionError -> {
// If we failed with security errors, then we rethrow the security error out, otherwise we throw the chained errors.
Throwable error = completionExceptionCause( completionError );
if ( error instanceof SecurityException )
{
return failedFuture( error );
}
return failedFuture( baseError );
} );
}

private CompletionStage<Boolean> supportsMultiDb( BoltServerAddress address )
{
return connectionPool.acquire( address ).thenCompose( conn -> {
boolean supportsMultiDatabase = supportsMultiDatabase( conn );
return conn.release().thenApply( ignored -> supportsMultiDatabase );
} );
}

private CompletionStage<Connection> acquire( AccessMode mode, RoutingTable routingTable )
{
AddressSet addresses = addressSet( mode, routingTable );
Expand All @@ -131,7 +196,7 @@ private void acquire( AccessMode mode, RoutingTable routingTable, AddressSet add

connectionPool.acquire( address ).whenComplete( ( connection, completionError ) ->
{
Throwable error = Futures.completionExceptionCause( completionError );
Throwable error = completionExceptionCause( completionError );
if ( error != null )
{
if ( error instanceof ServiceUnavailableException )
Expand Down Expand Up @@ -181,17 +246,16 @@ private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers )
}
}

private static RoutingTableRegistry createRoutingTables( ConnectionPool connectionPool, EventExecutorGroup eventExecutorGroup, BoltServerAddress initialRouter,
ServerAddressResolver resolver, RoutingSettings settings, Clock clock, Logging logging )
private static RoutingTableRegistry createRoutingTables( ConnectionPool connectionPool, Rediscovery rediscovery, RoutingSettings settings, Clock clock,
Logger log )
{
Logger log = loadBalancerLogger( logging );
Rediscovery rediscovery = createRediscovery( eventExecutorGroup, initialRouter, resolver, settings, clock, log );
return new RoutingTableRegistryImpl( connectionPool, rediscovery, clock, log, settings.routingTablePurgeDelayMs() );
}

private static Rediscovery createRediscovery( EventExecutorGroup eventExecutorGroup, BoltServerAddress initialRouter, ServerAddressResolver resolver,
RoutingSettings settings, Clock clock, Logger log )
RoutingSettings settings, Clock clock, Logging logging )
{
Logger log = loadBalancerLogger( logging );
ClusterCompositionProvider clusterCompositionProvider = new RoutingProcedureClusterCompositionProvider( clock, settings.routingContext() );
return new RediscoveryImpl( initialRouter, settings, clusterCompositionProvider, eventExecutorGroup, resolver, log );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.ServerVersion;

public final class MultiDatabaseUtil
{
Expand All @@ -31,4 +34,9 @@ public static void assertEmptyDatabaseName( DatabaseName databaseName, int boltV
"Database name: '%s'", boltVersion, databaseName.description() ) );
}
}

public static boolean supportsMultiDatabase( Connection connection )
{
return connection.serverVersion().greaterThanOrEqual( ServerVersion.v4_0_0 ) && connection.protocol().version() >= BoltProtocolV4.VERSION;
}
}
Loading