Skip to content

Initial support for async in routing driver #409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 29 additions & 15 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
SecurityPlan securityPlan = createSecurityPlan( address, config );
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );

Bootstrap bootstrap = BootstrapFactory.newBootstrap();
Bootstrap bootstrap = createBootstrap();
EventLoopGroup eventLoopGroup = bootstrap.config().group();
RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() );

Expand All @@ -85,8 +85,8 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r

try
{
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
asyncConnectionPool );
return createDriver( uri, address, connectionPool, asyncConnectionPool, config, newRoutingSettings,
securityPlan, retryLogic );
}
catch ( Throwable driverError )
{
Expand Down Expand Up @@ -121,8 +121,8 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
}

private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
AsyncConnectionPool asyncConnectionPool )
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
SecurityPlan securityPlan, RetryLogic retryLogic )
{
String scheme = uri.getScheme().toLowerCase();
switch ( scheme )
Expand All @@ -131,7 +131,8 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
assertNoRoutingContext( uri, routingSettings );
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
case BOLT_ROUTING_URI_SCHEME:
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
return createRoutingDriver( address, connectionPool, asyncConnectionPool, config, routingSettings,
securityPlan, retryLogic );
default:
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
}
Expand All @@ -158,13 +159,15 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
* <b>This method is protected only for testing</b>
*/
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
SecurityPlan securityPlan, RetryLogic retryLogic )
{
if ( !securityPlan.isRoutingCompatible() )
{
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
}
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, asyncConnectionPool,
config, routingSettings );
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
return createDriver( config, securityPlan, sessionFactory );
}
Expand All @@ -184,21 +187,22 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
* <p>
* <b>This method is protected only for testing</b>
*/
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
RoutingSettings routingSettings )
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings )
{
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging(),
createLoadBalancingStrategy( config, connectionPool ) );
return new LoadBalancer( address, routingSettings, connectionPool, asyncConnectionPool, createClock(),
config.logging(), createLoadBalancingStrategy( config, connectionPool, asyncConnectionPool ) );
}

private static LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool )
private static LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool,
AsyncConnectionPool asyncConnectionPool )
{
switch ( config.loadBalancingStrategy() )
{
case ROUND_ROBIN:
return new RoundRobinLoadBalancingStrategy( config.logging() );
case LEAST_CONNECTED:
return new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
return new LeastConnectedLoadBalancingStrategy( connectionPool, asyncConnectionPool, config.logging() );
default:
throw new IllegalArgumentException( "Unknown load balancing strategy: " + config.loadBalancingStrategy() );
}
Expand Down Expand Up @@ -253,7 +257,7 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
}

/**
* Creates new {@link RetryLogic >}.
* Creates new {@link RetryLogic}.
* <p>
* <b>This method is protected only for testing</b>
*/
Expand All @@ -263,6 +267,16 @@ protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGrou
return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging );
}

/**
* Creates new {@link Bootstrap}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected Bootstrap createBootstrap()
{
return BootstrapFactory.newBootstrap();
}

private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
import java.util.Map;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.ResponseHandler;
import org.neo4j.driver.internal.util.ServerVersion;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.summary.ServerInfo;

public interface AsyncConnection
{
Expand All @@ -43,5 +44,7 @@ void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandle

CompletionStage<Void> forceRelease();

ServerInfo serverInfo();
BoltServerAddress serverAddress();

ServerVersion serverVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,42 @@

import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.util.ServerVersion;

import static io.netty.util.AttributeKey.newInstance;

public final class ChannelAttributes
{
private static final AttributeKey<BoltServerAddress> ADDRESS = newInstance( "address" );
private static final AttributeKey<BoltServerAddress> ADDRESS = newInstance( "serverAddress" );
private static final AttributeKey<ServerVersion> SERVER_VERSION = newInstance( "serverVersion" );
private static final AttributeKey<Long> CREATION_TIMESTAMP = newInstance( "creationTimestamp" );
private static final AttributeKey<Long> LAST_USED_TIMESTAMP = newInstance( "lastUsedTimestamp" );
private static final AttributeKey<InboundMessageDispatcher> MESSAGE_DISPATCHER = newInstance( "messageDispatcher" );
private static final AttributeKey<String> SERVER_VERSION = newInstance( "serverVersion" );

private ChannelAttributes()
{
}

public static BoltServerAddress address( Channel channel )
public static BoltServerAddress serverAddress( Channel channel )
{
return get( channel, ADDRESS );
}

public static void setAddress( Channel channel, BoltServerAddress address )
public static void setServerAddress( Channel channel, BoltServerAddress address )
{
setOnce( channel, ADDRESS, address );
}

public static ServerVersion serverVersion( Channel channel )
{
return get( channel, SERVER_VERSION );
}

public static void setServerVersion( Channel channel, ServerVersion version )
{
setOnce( channel, SERVER_VERSION, version );
}

public static long creationTimestamp( Channel channel )
{
return get( channel, CREATION_TIMESTAMP );
Expand Down Expand Up @@ -78,16 +89,6 @@ public static void setMessageDispatcher( Channel channel, InboundMessageDispatch
setOnce( channel, MESSAGE_DISPATCHER, messageDispatcher );
}

public static String serverVersion( Channel channel )
{
return get( channel, SERVER_VERSION );
}

public static void setServerVersion( Channel channel, String serverVersion )
{
setOnce( channel, SERVER_VERSION, serverVersion );
}

private static <T> T get( Channel channel, AttributeKey<T> key )
{
return channel.attr( key ).get();
Expand Down
Loading