Skip to content

Blocking API over async API #415

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Oct 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions driver/src/main/java/org/neo4j/driver/internal/Bookmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

import org.neo4j.driver.v1.Value;

Expand Down Expand Up @@ -98,6 +99,28 @@ public Map<String,Value> asBeginTransactionParameters()
return parameters;
}

@Override
public boolean equals( Object o )
{
if ( this == o )
{
return true;
}
if ( o == null || getClass() != o.getClass() )
{
return false;
}
Bookmark bookmark = (Bookmark) o;
return Objects.equals( values, bookmark.values ) &&
Objects.equals( maxValue, bookmark.maxValue );
}

@Override
public int hashCode()
{
return Objects.hash( values, maxValue );
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,71 +20,49 @@

import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.async.AsyncConnection;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.async.BoltServerAddress;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.spi.PooledConnection;
import org.neo4j.driver.v1.AccessMode;

import static org.neo4j.driver.v1.AccessMode.READ;

/**
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
* the given address.
*/
public class DirectConnectionProvider implements ConnectionProvider
{
private final BoltServerAddress address;
private final ConnectionPool pool;
private final AsyncConnectionPool asyncPool;
private final ConnectionPool connectionPool;

DirectConnectionProvider( BoltServerAddress address, ConnectionPool pool, AsyncConnectionPool asyncPool )
DirectConnectionProvider( BoltServerAddress address, ConnectionPool connectionPool )
{
this.address = address;
this.pool = pool;
this.asyncPool = asyncPool;

verifyConnectivity();
this.connectionPool = connectionPool;
}

@Override
public PooledConnection acquireConnection( AccessMode mode )
public CompletionStage<Connection> acquireConnection( AccessMode mode )
{
return pool.acquire( address );
return connectionPool.acquire( address );
}

@Override
public CompletionStage<AsyncConnection> acquireAsyncConnection( AccessMode mode )
public CompletionStage<Void> verifyConnectivity()
{
return asyncPool.acquire( address );
return acquireConnection( READ ).thenCompose( Connection::forceRelease );
}

@Override
public CompletionStage<Void> close()
{
// todo: remove this try-catch when blocking API works on top of async
try
{
pool.close();
}
catch ( Exception e )
{
throw new RuntimeException( e );
}
return asyncPool.close();
return connectionPool.close();
}

public BoltServerAddress getAddress()
{
return address;
}

/**
* Acquires and releases a connection to verify connectivity so this connection provider fails fast. This is
* especially valuable when driver was created with incorrect credentials.
*/
private void verifyConnectivity()
{
acquireConnection( AccessMode.READ ).close();
}
}
109 changes: 41 additions & 68 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,26 @@
import java.net.URI;
import java.security.GeneralSecurityException;

import org.neo4j.driver.internal.async.AsyncConnectorImpl;
import org.neo4j.driver.internal.async.BoltServerAddress;
import org.neo4j.driver.internal.async.BootstrapFactory;
import org.neo4j.driver.internal.async.Futures;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPool;
import org.neo4j.driver.internal.async.pool.AsyncConnectionPoolImpl;
import org.neo4j.driver.internal.async.ChannelConnector;
import org.neo4j.driver.internal.async.ChannelConnectorImpl;
import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl;
import org.neo4j.driver.internal.async.pool.PoolSettings;
import org.neo4j.driver.internal.cluster.RoutingContext;
import org.neo4j.driver.internal.cluster.RoutingSettings;
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.net.SocketConnector;
import org.neo4j.driver.internal.net.pooling.PoolSettings;
import org.neo4j.driver.internal.net.pooling.SocketConnectionPool;
import org.neo4j.driver.internal.retry.ExponentialBackoffRetryLogic;
import org.neo4j.driver.internal.retry.RetryLogic;
import org.neo4j.driver.internal.retry.RetrySettings;
import org.neo4j.driver.internal.security.SecurityPlan;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.spi.Connector;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.v1.AuthToken;
import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Config;
Expand All @@ -72,27 +69,26 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
BoltServerAddress address = new BoltServerAddress( uri );
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );

Bootstrap bootstrap = createBootstrap();
EventExecutorGroup eventExecutorGroup = bootstrap.config().group();
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );

AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, bootstrap, config );

try
{
return createDriver( uri, address, connectionPool, asyncConnectionPool, config, newRoutingSettings,
InternalDriver driver = createDriver( uri, address, connectionPool, config, newRoutingSettings,
eventExecutorGroup, securityPlan, retryLogic );
Futures.getBlocking( driver.verifyConnectivity() );
return driver;
}
catch ( Throwable driverError )
{
// we need to close the connection pool if driver creation threw exception
try
{
connectionPool.close();
Futures.getBlocking( asyncConnectionPool.close() );
Futures.getBlocking( connectionPool.close() );
}
catch ( Throwable closeError )
{
Expand All @@ -102,32 +98,38 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
}
}

private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan,
Bootstrap bootstrap, Config config )
{
Clock clock = createClock();
ConnectionSettings settings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
AsyncConnectorImpl connector = new AsyncConnectorImpl( settings, securityPlan, config.logging(), clock );
ChannelConnector connector = createConnector( settings, securityPlan, config, clock );
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
config.maxConnectionPoolSize(),
config.connectionAcquisitionTimeoutMillis() );
return new AsyncConnectionPoolImpl( connector, bootstrap, poolSettings, config.logging(), clock );
return new ConnectionPoolImpl( connector, bootstrap, poolSettings, config.logging(), clock );
}

private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
protected ChannelConnector createConnector( ConnectionSettings settings, SecurityPlan securityPlan,
Config config, Clock clock )
{
return new ChannelConnectorImpl( settings, securityPlan, config.logging(), clock );
}

private InternalDriver createDriver( URI uri, BoltServerAddress address,
ConnectionPool connectionPool, Config config, RoutingSettings routingSettings,
EventExecutorGroup eventExecutorGroup, SecurityPlan securityPlan, RetryLogic retryLogic )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
{
case BOLT_URI_SCHEME:
assertNoRoutingContext( uri, routingSettings );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
return createDirectDriver( address, config, securityPlan, retryLogic, connectionPool );
case BOLT_ROUTING_URI_SCHEME:
return createRoutingDriver( address, connectionPool, asyncConnectionPool, config, routingSettings,
securityPlan, retryLogic, eventExecutorGroup );
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic,
eventExecutorGroup );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
Expand All @@ -138,11 +140,11 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
* <p>
* <b>This method is protected only for testing</b>
*/
protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
SecurityPlan securityPlan, RetryLogic retryLogic, AsyncConnectionPool asyncConnectionPool )
protected InternalDriver createDirectDriver( BoltServerAddress address, Config config,
SecurityPlan securityPlan, RetryLogic retryLogic, ConnectionPool connectionPool )
{
ConnectionProvider connectionProvider =
new DirectConnectionProvider( address, connectionPool, asyncConnectionPool );
new DirectConnectionProvider( address, connectionPool );
SessionFactory sessionFactory =
createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
Expand All @@ -153,16 +155,16 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
* <p>
* <b>This method is protected only for testing</b>
*/
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
SecurityPlan securityPlan, RetryLogic retryLogic, EventExecutorGroup eventExecutorGroup )
protected InternalDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
EventExecutorGroup eventExecutorGroup )
{
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, asyncConnectionPool,
eventExecutorGroup, config, routingSettings );
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, eventExecutorGroup,
config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}
Expand All @@ -174,7 +176,7 @@ protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool
*/
protected InternalDriver createDriver( Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
{
return new InternalDriver( securityPlan, sessionFactory, config.logging() );
return new InternalDriver( securityPlan, sessionFactory );
}

/**
Expand All @@ -183,45 +185,27 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
* <b>This method is protected only for testing</b>
*/
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup,
Config config, RoutingSettings routingSettings )
EventExecutorGroup eventExecutorGroup, Config config, RoutingSettings routingSettings )
{
LoadBalancingStrategy loadBalancingStrategy =
createLoadBalancingStrategy( config, connectionPool, asyncConnectionPool );
return new LoadBalancer( address, routingSettings, connectionPool, asyncConnectionPool, eventExecutorGroup,
createClock(), config.logging(), loadBalancingStrategy );
LoadBalancingStrategy loadBalancingStrategy = createLoadBalancingStrategy( config, connectionPool );
return new LoadBalancer( address, routingSettings, connectionPool, eventExecutorGroup, createClock(),
config.logging(), loadBalancingStrategy );
}

private static LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool,
AsyncConnectionPool asyncConnectionPool )
private static LoadBalancingStrategy createLoadBalancingStrategy( Config config,
ConnectionPool connectionPool )
{
switch ( config.loadBalancingStrategy() )
{
case ROUND_ROBIN:
return new RoundRobinLoadBalancingStrategy( config.logging() );
case LEAST_CONNECTED:
return new LeastConnectedLoadBalancingStrategy( connectionPool, asyncConnectionPool, config.logging() );
return new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
default:
throw new IllegalArgumentException( "Unknown load balancing strategy: " + config.loadBalancingStrategy() );
}
}

/**
* Creates new {@link ConnectionPool}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
{
ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
config.idleTimeBeforeConnectionTest(), config.maxConnectionLifetimeMillis(),
config.maxConnectionPoolSize(), config.connectionAcquisitionTimeoutMillis() );
Connector connector = createConnector( connectionSettings, securityPlan, config.logging() );

return new SocketConnectionPool( poolSettings, connector, createClock(), config.logging() );
}

/**
* Creates new {@link Clock}.
* <p>
Expand All @@ -232,17 +216,6 @@ protected Clock createClock()
return Clock.SYSTEM;
}

/**
* Creates new {@link Connector}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected Connector createConnector( final ConnectionSettings connectionSettings, SecurityPlan securityPlan,
Logging logging )
{
return new SocketConnector( connectionSettings, securityPlan, logging );
}

/**
* Creates new {@link SessionFactory}.
* <p>
Expand Down
Loading