Skip to content

Commit c6ae295

Browse files
committed
Async support in LoadBalancer
This commit makes `LoadBalancer` able to serve async connections. Rediscovery procedure is also executed asynchronously. Connections returned by `LoadBalancer` have special error handling logic to remove failed instances from the routing table and purge their connections from the pool. Also made `AsyncConnection` expose server address and version instead of `ServerInfo`. Couple TODOs are introduced. They will be addressed in the subsequent commits.
1 parent b245275 commit c6ae295

33 files changed

+1332
-149
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7676
SecurityPlan securityPlan = createSecurityPlan( address, config );
7777
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
7878

79-
Bootstrap bootstrap = BootstrapFactory.newBootstrap();
79+
Bootstrap bootstrap = createBootstrap();
8080
EventLoopGroup eventLoopGroup = bootstrap.config().group();
8181
RetryLogic retryLogic = createRetryLogic( retrySettings, eventLoopGroup, config.logging() );
8282

@@ -85,8 +85,8 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
8585

8686
try
8787
{
88-
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic,
89-
asyncConnectionPool );
88+
return createDriver( uri, address, connectionPool, asyncConnectionPool, config, newRoutingSettings,
89+
securityPlan, retryLogic );
9090
}
9191
catch ( Throwable driverError )
9292
{
@@ -121,8 +121,8 @@ private AsyncConnectionPool createAsyncConnectionPool( AuthToken authToken, Secu
121121
}
122122

123123
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
124-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic,
125-
AsyncConnectionPool asyncConnectionPool )
124+
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
125+
SecurityPlan securityPlan, RetryLogic retryLogic )
126126
{
127127
String scheme = uri.getScheme().toLowerCase();
128128
switch ( scheme )
@@ -131,7 +131,8 @@ private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool
131131
assertNoRoutingContext( uri, routingSettings );
132132
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic, asyncConnectionPool );
133133
case BOLT_ROUTING_URI_SCHEME:
134-
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
134+
return createRoutingDriver( address, connectionPool, asyncConnectionPool, config, routingSettings,
135+
securityPlan, retryLogic );
135136
default:
136137
throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) );
137138
}
@@ -158,13 +159,15 @@ protected Driver createDirectDriver( BoltServerAddress address, ConnectionPool c
158159
* <b>This method is protected only for testing</b>
159160
*/
160161
protected Driver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
161-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
162+
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings,
163+
SecurityPlan securityPlan, RetryLogic retryLogic )
162164
{
163165
if ( !securityPlan.isRoutingCompatible() )
164166
{
165167
throw new IllegalArgumentException( "The chosen security plan is not compatible with a routing driver" );
166168
}
167-
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, config, routingSettings );
169+
ConnectionProvider connectionProvider = createLoadBalancer( address, connectionPool, asyncConnectionPool,
170+
config, routingSettings );
168171
SessionFactory sessionFactory = createSessionFactory( connectionProvider, retryLogic, config );
169172
return createDriver( config, securityPlan, sessionFactory );
170173
}
@@ -184,21 +187,22 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
184187
* <p>
185188
* <b>This method is protected only for testing</b>
186189
*/
187-
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
188-
RoutingSettings routingSettings )
190+
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool,
191+
AsyncConnectionPool asyncConnectionPool, Config config, RoutingSettings routingSettings )
189192
{
190-
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging(),
191-
createLoadBalancingStrategy( config, connectionPool ) );
193+
return new LoadBalancer( address, routingSettings, connectionPool, asyncConnectionPool, createClock(),
194+
config.logging(), createLoadBalancingStrategy( config, connectionPool, asyncConnectionPool ) );
192195
}
193196

194-
private static LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool )
197+
private static LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool,
198+
AsyncConnectionPool asyncConnectionPool )
195199
{
196200
switch ( config.loadBalancingStrategy() )
197201
{
198202
case ROUND_ROBIN:
199203
return new RoundRobinLoadBalancingStrategy( config.logging() );
200204
case LEAST_CONNECTED:
201-
return new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
205+
return new LeastConnectedLoadBalancingStrategy( connectionPool, asyncConnectionPool, config.logging() );
202206
default:
203207
throw new IllegalArgumentException( "Unknown load balancing strategy: " + config.loadBalancingStrategy() );
204208
}
@@ -253,7 +257,7 @@ protected SessionFactory createSessionFactory( ConnectionProvider connectionProv
253257
}
254258

255259
/**
256-
* Creates new {@link RetryLogic >}.
260+
* Creates new {@link RetryLogic}.
257261
* <p>
258262
* <b>This method is protected only for testing</b>
259263
*/
@@ -263,6 +267,16 @@ protected RetryLogic createRetryLogic( RetrySettings settings, EventExecutorGrou
263267
return new ExponentialBackoffRetryLogic( settings, eventExecutorGroup, createClock(), logging );
264268
}
265269

270+
/**
271+
* Creates new {@link Bootstrap}.
272+
* <p>
273+
* <b>This method is protected only for testing</b>
274+
*/
275+
protected Bootstrap createBootstrap()
276+
{
277+
return BootstrapFactory.newBootstrap();
278+
}
279+
266280
private static SecurityPlan createSecurityPlan( BoltServerAddress address, Config config )
267281
{
268282
try

driver/src/main/java/org/neo4j/driver/internal/async/AsyncConnection.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
import java.util.Map;
2222
import java.util.concurrent.CompletionStage;
2323

24+
import org.neo4j.driver.internal.net.BoltServerAddress;
2425
import org.neo4j.driver.internal.spi.ResponseHandler;
26+
import org.neo4j.driver.internal.util.ServerVersion;
2527
import org.neo4j.driver.v1.Value;
26-
import org.neo4j.driver.v1.summary.ServerInfo;
2728

2829
public interface AsyncConnection
2930
{
@@ -43,5 +44,7 @@ void runAndFlush( String statement, Map<String,Value> parameters, ResponseHandle
4344

4445
CompletionStage<Void> forceRelease();
4546

46-
ServerInfo serverInfo();
47+
BoltServerAddress serverAddress();
48+
49+
ServerVersion serverVersion();
4750
}

driver/src/main/java/org/neo4j/driver/internal/async/ChannelAttributes.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,42 @@
2323

2424
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
2525
import org.neo4j.driver.internal.net.BoltServerAddress;
26+
import org.neo4j.driver.internal.util.ServerVersion;
2627

2728
import static io.netty.util.AttributeKey.newInstance;
2829

2930
public final class ChannelAttributes
3031
{
31-
private static final AttributeKey<BoltServerAddress> ADDRESS = newInstance( "address" );
32+
private static final AttributeKey<BoltServerAddress> ADDRESS = newInstance( "serverAddress" );
33+
private static final AttributeKey<ServerVersion> SERVER_VERSION = newInstance( "serverVersion" );
3234
private static final AttributeKey<Long> CREATION_TIMESTAMP = newInstance( "creationTimestamp" );
3335
private static final AttributeKey<Long> LAST_USED_TIMESTAMP = newInstance( "lastUsedTimestamp" );
3436
private static final AttributeKey<InboundMessageDispatcher> MESSAGE_DISPATCHER = newInstance( "messageDispatcher" );
35-
private static final AttributeKey<String> SERVER_VERSION = newInstance( "serverVersion" );
3637

3738
private ChannelAttributes()
3839
{
3940
}
4041

41-
public static BoltServerAddress address( Channel channel )
42+
public static BoltServerAddress serverAddress( Channel channel )
4243
{
4344
return get( channel, ADDRESS );
4445
}
4546

46-
public static void setAddress( Channel channel, BoltServerAddress address )
47+
public static void setServerAddress( Channel channel, BoltServerAddress address )
4748
{
4849
setOnce( channel, ADDRESS, address );
4950
}
5051

52+
public static ServerVersion serverVersion( Channel channel )
53+
{
54+
return get( channel, SERVER_VERSION );
55+
}
56+
57+
public static void setServerVersion( Channel channel, ServerVersion version )
58+
{
59+
setOnce( channel, SERVER_VERSION, version );
60+
}
61+
5162
public static long creationTimestamp( Channel channel )
5263
{
5364
return get( channel, CREATION_TIMESTAMP );
@@ -78,16 +89,6 @@ public static void setMessageDispatcher( Channel channel, InboundMessageDispatch
7889
setOnce( channel, MESSAGE_DISPATCHER, messageDispatcher );
7990
}
8091

81-
public static String serverVersion( Channel channel )
82-
{
83-
return get( channel, SERVER_VERSION );
84-
}
85-
86-
public static void setServerVersion( Channel channel, String serverVersion )
87-
{
88-
setOnce( channel, SERVER_VERSION, serverVersion );
89-
}
90-
9192
private static <T> T get( Channel channel, AttributeKey<T> key )
9293
{
9394
return channel.attr( key ).get();

0 commit comments

Comments
 (0)