Skip to content

Commit 9c9e61d

Browse files
authored
Merge pull request #411 from lutovich/1.5-async-routing-improvements
Async routing improvements
2 parents 45c76a8 + ca640ae commit 9c9e61d

File tree

47 files changed

+2411
-257
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2411
-257
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal;
2020

2121
import io.netty.bootstrap.Bootstrap;
22-
import io.netty.channel.EventLoopGroup;
2322
import io.netty.util.concurrent.EventExecutorGroup;
2423

2524
import java.io.IOException;
@@ -76,17 +75,17 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7675
SecurityPlan securityPlan = createSecurityPlan( address, config );
7776
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
7877

79-
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
80-
EventLoopGroup eventLoopGroup = bootstrap.config().group();
81-
RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() );
78+
Bootstrap bootstrap = createBootstrap();
79+
EventExecutorGroup eventExecutorGroup = bootstrap.config().group();
80+
RetryLogic retryLogic = createRetryLogic( retrySettings, eventExecutorGroup, config.logging() );
8281

8382
AsyncConnectionPool asyncConnectionPool = createAsyncConnectionPool( authToken, securityPlan, bootstrap,
8483
config );
8584

8685
try
8786
{
88-
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
89-
asyncConnectionPool );
87+
return createDriver( uri, address, connectionPool, asyncConnectionPool, config, newRoutingSettings,
88+
eventExecutorGroup, securityPlan, retryLogic );
9089
}
9190
catch ( Throwable driverError )
9291
{
@@ -121,8 +120,8 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
121120
}
122121

123122
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
124-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
125-
AsyncConnectionPool asyncConnectionPool )
123+
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
124+
EventExecutorGroup eventExecutorGroup, SecurityPlan securityPlan, RetryLogic retryLogic )
126125
{
127126
String scheme = uri.getScheme().toLowerCase();
128127
switch ( scheme )
@@ -131,7 +130,8 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
131130
assertNoRoutingContext( uri, routingSettings );
132131
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
133132
case BOLT_ROUTING_URI_SCHEME:
134-
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
133+
return createRoutingDriver( address, connectionPool, asyncConnectionPool, config, routingSettings,
134+
securityPlan, retryLogic, eventExecutorGroup );
135135
default:
136136
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
137137
}
@@ -158,13 +158,15 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
158158
* <b>This method is protected only for testing</b>
159159
*/
160160
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
161-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
161+
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
162+
SecurityPlan securityPlan, RetryLogic retryLogic, EventExecutorGroup eventExecutorGroup )
162163
{
163164
if ( !securityPlan.isRoutingCompatible() )
164165
{
165166
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
166167
}
167-
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
168+
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, asyncConnectionPool,
169+
eventExecutorGroup, config, routingSettings );
168170
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
169171
return createDriver( config, securityPlan, sessionFactory );
170172
}
@@ -184,21 +186,25 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
184186
* <p>
185187
* <b>This method is protected only for testing</b>
186188
*/
187-
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
188-
RoutingSettings routingSettings )
189+
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
190+
AsyncConnectionPool asyncConnectionPool, EventExecutorGroup eventExecutorGroup,
191+
Config config, RoutingSettings routingSettings )
189192
{
190-
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging(),
191-
createLoadBalancingStrategy( config, connectionPool ) );
193+
LoadBalancingStrategy loadBalancingStrategy =
194+
createLoadBalancingStrategy( config, connectionPool, asyncConnectionPool );
195+
return new LoadBalancer( address, routingSettings, connectionPool, asyncConnectionPool, eventExecutorGroup,
196+
createClock(), config.logging(), loadBalancingStrategy );
192197
}
193198

194-
private static LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool )
199+
private static LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool,
200+
AsyncConnectionPool asyncConnectionPool )
195201
{
196202
switch ( config.loadBalancingStrategy() )
197203
{
198204
case ROUND_ROBIN:
199205
return new RoundRobinLoadBalancingStrategy( config.logging() );
200206
case LEAST_CONNECTED:
201-
return new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
207+
return new LeastConnectedLoadBalancingStrategy( connectionPool, asyncConnectionPool, config.logging() );
202208
default:
203209
throw new IllegalArgumentException( "Unknown load balancing strategy: " + config.loadBalancingStrategy() );
204210
}
@@ -253,7 +259,7 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
253259
}
254260

255261
/**
256-
* Creates new {@link RetryLogic >}.
262+
* Creates new {@link RetryLogic}.
257263
* <p>
258264
* <b>This method is protected only for testing</b>
259265
*/
@@ -263,6 +269,16 @@ protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGrou
263269
return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging );
264270
}
265271

272+
/**
273+
* Creates new {@link Bootstrap}.
274+
* <p>
275+
* <b>This method is protected only for testing</b>
276+
*/
277+
protected Bootstrap createBootstrap()
278+
{
279+
return BootstrapFactory.newBootstrap();
280+
}
281+
266282
private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
267283
{
268284
try

driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
import java.util.Map;
2222
import java.util.concurrent.CompletionStage;
2323

24+
import org.neo4j.driver.internal.net.BoltServerAddress;
2425
import org.neo4j.driver.internal.spi.ResponseHandler;
26+
import org.neo4j.driver.internal.util.ServerVersion;
2527
import org.neo4j.driver.v1.Value;
26-
import org.neo4j.driver.v1.summary.ServerInfo;
2728

2829
public interface AsyncConnection
2930
{
@@ -43,5 +44,7 @@ void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandle
4344

4445
CompletionStage<Void> forceRelease();
4546

46-
ServerInfo serverInfo();
47+
BoltServerAddress serverAddress();
48+
49+
ServerVersion serverVersion();
4750
}

driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,42 @@
2323

2424
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
2525
import org.neo4j.driver.internal.net.BoltServerAddress;
26+
import org.neo4j.driver.internal.util.ServerVersion;
2627

2728
import static io.netty.util.AttributeKey.newInstance;
2829

2930
public final class ChannelAttributes
3031
{
31-
private static final AttributeKey<BoltServerAddress> ADDRESS = newInstance( "address" );
32+
private static final AttributeKey<BoltServerAddress> ADDRESS = newInstance( "serverAddress" );
33+
private static final AttributeKey<ServerVersion> SERVER_VERSION = newInstance( "serverVersion" );
3234
private static final AttributeKey<Long> CREATION_TIMESTAMP = newInstance( "creationTimestamp" );
3335
private static final AttributeKey<Long> LAST_USED_TIMESTAMP = newInstance( "lastUsedTimestamp" );
3436
private static final AttributeKey<InboundMessageDispatcher> MESSAGE_DISPATCHER = newInstance( "messageDispatcher" );
35-
private static final AttributeKey<String> SERVER_VERSION = newInstance( "serverVersion" );
3637

3738
private ChannelAttributes()
3839
{
3940
}
4041

41-
public static BoltServerAddress address( Channel channel )
42+
public static BoltServerAddress serverAddress( Channel channel )
4243
{
4344
return get( channel, ADDRESS );
4445
}
4546

46-
public static void setAddress( Channel channel, BoltServerAddress address )
47+
public static void setServerAddress( Channel channel, BoltServerAddress address )
4748
{
4849
setOnce( channel, ADDRESS, address );
4950
}
5051

52+
public static ServerVersion serverVersion( Channel channel )
53+
{
54+
return get( channel, SERVER_VERSION );
55+
}
56+
57+
public static void setServerVersion( Channel channel, ServerVersion version )
58+
{
59+
setOnce( channel, SERVER_VERSION, version );
60+
}
61+
5162
public static long creationTimestamp( Channel channel )
5263
{
5364
return get( channel, CREATION_TIMESTAMP );
@@ -78,16 +89,6 @@ public static void setMessageDispatcher( Channel channel, InboundMessageDispatch
7889
setOnce( channel, MESSAGE_DISPATCHER, messageDispatcher );
7990
}
8091

81-
public static String serverVersion( Channel channel )
82-
{
83-
return get( channel, SERVER_VERSION );
84-
}
85-
86-
public static void setServerVersion( Channel channel, String serverVersion )
87-
{
88-
setOnce( channel, SERVER_VERSION, serverVersion );
89-
}
90-
9192
private static <T> T get( Channel channel, AttributeKey<T> key )
9293
{
9394
return channel.attr( key ).get();

0 commit comments

Comments
 (0)