Skip to content

Commit da9c751

Browse files
author
Zhen Li
committed
Adding multidb feature detection Driver#supportsMultiDb
For a single driver, we will open a connection with the init server and then check if the server version is 4.0+ and talking with driver using bolt v4. For a routing driver, we will go through all addresses that are resolved from the init seed server. Then we will check with the first successfully connected server if the server version is 4.0+ and talking with driver using protocol bolt v4.
1 parent 5ad532b commit da9c751

18 files changed

+306
-27
lines changed

driver/src/main/java/org/neo4j/driver/Driver.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,17 @@ public interface Driver extends AutoCloseable
194194
* @return a {@link CompletionStage completion stage} that represents the asynchronous verification.
195195
*/
196196
CompletionStage<Void> verifyConnectivityAsync();
197+
198+
/**
199+
* Returns true if the server or cluster the driver connects to supports multi-databases, otherwise false.
200+
* @return true if the server or cluster the driver connects to supports multi-databases, otherwise false.
201+
*/
202+
boolean supportsMultiDb();
203+
204+
/**
205+
* Asynchronous check if the server or cluster the driver connects to supports multi-databases.
206+
* @return a {@link CompletionStage completion stage} that returns true if the server or cluster
207+
* the driver connects to supports multi-databases, otherwise false.
208+
*/
209+
CompletionStage<Boolean> supportsMultiDbAsync();
197210
}

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.neo4j.driver.internal.spi.ConnectionPool;
2727
import org.neo4j.driver.internal.spi.ConnectionProvider;
2828

29-
import static org.neo4j.driver.internal.async.ImmutableConnectionContext.simple;
29+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase;
3030

3131
/**
3232
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
@@ -46,15 +46,13 @@ public class DirectConnectionProvider implements ConnectionProvider
4646
@Override
4747
public CompletionStage<Connection> acquireConnection( ConnectionContext context )
4848
{
49-
return connectionPool.acquire( address ).thenApply( connection -> new DirectConnection( connection, context.databaseName(), context.mode() ) );
49+
return acquireConnection().thenApply( connection -> new DirectConnection( connection, context.databaseName(), context.mode() ) );
5050
}
5151

5252
@Override
5353
public CompletionStage<Void> verifyConnectivity()
5454
{
55-
// We verify the connection by establishing a connection with the remote server specified by the address.
56-
// Connection context will be ignored as no query is run in this connection and the connection is released immediately.
57-
return acquireConnection( simple() ).thenCompose( Connection::release );
55+
return acquireConnection().thenCompose( Connection::release );
5856
}
5957

6058
@Override
@@ -63,8 +61,26 @@ public CompletionStage<Void> close()
6361
return connectionPool.close();
6462
}
6563

64+
@Override
65+
public CompletionStage<Boolean> supportsMultiDbAsync()
66+
{
67+
return acquireConnection().thenCompose( conn -> {
68+
boolean supportsMultiDatabase = supportsMultiDatabase( conn );
69+
return conn.release().thenApply( ignored -> supportsMultiDatabase );
70+
} );
71+
}
72+
6673
public BoltServerAddress getAddress()
6774
{
6875
return address;
6976
}
77+
78+
/**
79+
* Used only for grabbing a connection with the server after hello message.
80+
* This connection cannot be directly used for running any queries as it is missing necessary connection context
81+
*/
82+
private CompletionStage<Connection> acquireConnection()
83+
{
84+
return connectionPool.acquire( address );
85+
}
7086
}

driver/src/main/java/org/neo4j/driver/internal/InternalDriver.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,18 @@ public CompletionStage<Void> verifyConnectivityAsync()
141141
return sessionFactory.verifyConnectivity();
142142
}
143143

144+
@Override
145+
public boolean supportsMultiDb()
146+
{
147+
return Futures.blockingGet( supportsMultiDbAsync() );
148+
}
149+
150+
@Override
151+
public CompletionStage<Boolean> supportsMultiDbAsync()
152+
{
153+
return sessionFactory.supportsMultiDbAsync();
154+
}
155+
144156
@Override
145157
public void verifyConnectivity()
146158
{

driver/src/main/java/org/neo4j/driver/internal/SessionFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,6 @@ public interface SessionFactory
3030
CompletionStage<Void> verifyConnectivity();
3131

3232
CompletionStage<Void> close();
33+
34+
CompletionStage<Boolean> supportsMultiDbAsync();
3335
}

driver/src/main/java/org/neo4j/driver/internal/SessionFactoryImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ public CompletionStage<Void> close()
7979
return connectionProvider.close();
8080
}
8181

82+
@Override
83+
public CompletionStage<Boolean> supportsMultiDbAsync()
84+
{
85+
return connectionProvider.supportsMultiDbAsync();
86+
}
87+
8288
/**
8389
* Get the underlying connection provider.
8490
* <p>

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.List;
2122
import java.util.concurrent.CompletionStage;
2223

2324
import org.neo4j.driver.Bookmark;
25+
import org.neo4j.driver.internal.BoltServerAddress;
2426
import org.neo4j.driver.internal.spi.ConnectionPool;
2527

2628
public interface Rediscovery
2729
{
2830
CompletionStage<ClusterComposition> lookupClusterComposition( RoutingTable routingTable, ConnectionPool connectionPool, Bookmark bookmark );
31+
32+
List<BoltServerAddress> resolve();
2933
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ private CompletionStage<ClusterComposition> lookupOnInitialRouter( RoutingTable
198198
List<BoltServerAddress> addresses;
199199
try
200200
{
201-
addresses = resolve( initialRouter );
201+
addresses = resolve();
202202
}
203203
catch ( Throwable error )
204204
{
@@ -256,9 +256,10 @@ private ClusterComposition handleRoutingProcedureError( Throwable error, Routing
256256
return null;
257257
}
258258

259-
private List<BoltServerAddress> resolve( BoltServerAddress address )
259+
@Override
260+
public List<BoltServerAddress> resolve()
260261
{
261-
return resolver.resolve( address )
262+
return resolver.resolve( initialRouter )
262263
.stream()
263264
.flatMap( resolved -> resolveAll( BoltServerAddress.from( resolved ) ) )
264265
.collect( toList() ); // collect to list to preserve the order

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@
2323
import java.util.concurrent.CompletionStage;
2424

2525
import org.neo4j.driver.Bookmark;
26-
import org.neo4j.driver.Record;
2726
import org.neo4j.driver.Query;
27+
import org.neo4j.driver.Record;
2828
import org.neo4j.driver.exceptions.ProtocolException;
2929
import org.neo4j.driver.exceptions.value.ValueException;
3030
import org.neo4j.driver.internal.DatabaseName;
3131
import org.neo4j.driver.internal.spi.Connection;
3232
import org.neo4j.driver.internal.util.Clock;
33-
import org.neo4j.driver.internal.util.ServerVersion;
3433

3534
import static java.lang.String.format;
35+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase;
3636

3737
public class RoutingProcedureClusterCompositionProvider implements ClusterCompositionProvider
3838
{
@@ -59,7 +59,7 @@ public RoutingProcedureClusterCompositionProvider( Clock clock, RoutingContext r
5959
public CompletionStage<ClusterComposition> getClusterComposition( Connection connection, DatabaseName databaseName, Bookmark bookmark )
6060
{
6161
RoutingProcedureRunner runner;
62-
if ( connection.serverVersion().greaterThanOrEqual( ServerVersion.v4_0_0 ) )
62+
if ( supportsMultiDatabase( connection ) )
6363
{
6464
runner = multiDatabaseRoutingProcedureRunner;
6565
}

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
import io.netty.util.concurrent.EventExecutorGroup;
2222

23+
import java.util.List;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.CompletionStage;
26+
import java.util.function.Supplier;
2527

2628
import org.neo4j.driver.AccessMode;
2729
import org.neo4j.driver.Logger;
@@ -49,6 +51,10 @@
4951

5052
import static java.lang.String.format;
5153
import static org.neo4j.driver.internal.async.ImmutableConnectionContext.simple;
54+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase;
55+
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
56+
import static org.neo4j.driver.internal.util.Futures.failedFuture;
57+
import static org.neo4j.driver.internal.util.Futures.onErrorContinue;
5258

5359
public class LoadBalancer implements ConnectionProvider
5460
{
@@ -58,20 +64,29 @@ public class LoadBalancer implements ConnectionProvider
5864
private final LoadBalancingStrategy loadBalancingStrategy;
5965
private final EventExecutorGroup eventExecutorGroup;
6066
private final Logger log;
67+
private final Rediscovery rediscovery;
6168

6269
public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connectionPool,
6370
EventExecutorGroup eventExecutorGroup, Clock clock, Logging logging,
6471
LoadBalancingStrategy loadBalancingStrategy, ServerAddressResolver resolver )
6572
{
66-
this( connectionPool, createRoutingTables( connectionPool, eventExecutorGroup, initialRouter, resolver, settings, clock, logging ),
67-
loadBalancerLogger( logging ), loadBalancingStrategy, eventExecutorGroup );
73+
this( connectionPool, createRediscovery( eventExecutorGroup, initialRouter, resolver, settings, clock, logging ), settings, loadBalancingStrategy,
74+
eventExecutorGroup, clock, loadBalancerLogger( logging ) );
6875
}
6976

70-
LoadBalancer( ConnectionPool connectionPool, RoutingTableRegistry routingTables, Logger log, LoadBalancingStrategy loadBalancingStrategy,
71-
EventExecutorGroup eventExecutorGroup )
77+
private LoadBalancer( ConnectionPool connectionPool, Rediscovery rediscovery, RoutingSettings settings, LoadBalancingStrategy loadBalancingStrategy,
78+
EventExecutorGroup eventExecutorGroup, Clock clock, Logger log )
79+
{
80+
this( connectionPool, createRoutingTables( connectionPool, rediscovery, settings, clock, log ), rediscovery, loadBalancingStrategy, eventExecutorGroup,
81+
log );
82+
}
83+
84+
LoadBalancer( ConnectionPool connectionPool, RoutingTableRegistry routingTables, Rediscovery rediscovery, LoadBalancingStrategy loadBalancingStrategy,
85+
EventExecutorGroup eventExecutorGroup, Logger log )
7286
{
7387
this.connectionPool = connectionPool;
7488
this.routingTables = routingTables;
89+
this.rediscovery = rediscovery;
7590
this.loadBalancingStrategy = loadBalancingStrategy;
7691
this.eventExecutorGroup = eventExecutorGroup;
7792
this.log = log;
@@ -109,6 +124,38 @@ public CompletionStage<Void> close()
109124
return connectionPool.close();
110125
}
111126

127+
@Override
128+
public CompletionStage<Boolean> supportsMultiDbAsync()
129+
{
130+
List<BoltServerAddress> addresses;
131+
132+
try
133+
{
134+
addresses = rediscovery.resolve();
135+
}
136+
catch ( Throwable error )
137+
{
138+
return failedFuture( error );
139+
}
140+
141+
CompletableFuture<Boolean> result = completedWithNull();
142+
Throwable baseError = new ServiceUnavailableException( "Failed to perform multi-databases feature detection with the following servers: " + addresses );
143+
144+
for ( BoltServerAddress address : addresses )
145+
{
146+
result = onErrorContinue( result, baseError, () -> supportsMultiDbAsync( address ) );
147+
}
148+
return onErrorContinue( result, baseError, (Supplier<CompletableFuture<Boolean>>) () -> failedFuture( baseError ) );
149+
}
150+
151+
private CompletionStage<Boolean> supportsMultiDbAsync( BoltServerAddress address )
152+
{
153+
return connectionPool.acquire( address ).thenCompose( conn -> {
154+
boolean supportsMultiDatabase = supportsMultiDatabase( conn );
155+
return conn.release().thenApply( ignored -> supportsMultiDatabase );
156+
} );
157+
}
158+
112159
private CompletionStage<Connection> acquire( AccessMode mode, RoutingTable routingTable )
113160
{
114161
AddressSet addresses = addressSet( mode, routingTable );
@@ -181,17 +228,16 @@ private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers )
181228
}
182229
}
183230

184-
private static RoutingTableRegistry createRoutingTables( ConnectionPool connectionPool, EventExecutorGroup eventExecutorGroup, BoltServerAddress initialRouter,
185-
ServerAddressResolver resolver, RoutingSettings settings, Clock clock, Logging logging )
231+
private static RoutingTableRegistry createRoutingTables( ConnectionPool connectionPool, Rediscovery rediscovery, RoutingSettings settings, Clock clock,
232+
Logger log )
186233
{
187-
Logger log = loadBalancerLogger( logging );
188-
Rediscovery rediscovery = createRediscovery( eventExecutorGroup, initialRouter, resolver, settings, clock, log );
189234
return new RoutingTableRegistryImpl( connectionPool, rediscovery, clock, log, settings.routingTablePurgeDelayMs() );
190235
}
191236

192237
private static Rediscovery createRediscovery( EventExecutorGroup eventExecutorGroup, BoltServerAddress initialRouter, ServerAddressResolver resolver,
193-
RoutingSettings settings, Clock clock, Logger log )
238+
RoutingSettings settings, Clock clock, Logging logging )
194239
{
240+
Logger log = loadBalancerLogger( logging );
195241
ClusterCompositionProvider clusterCompositionProvider = new RoutingProcedureClusterCompositionProvider( clock, settings.routingContext() );
196242
return new RediscoveryImpl( initialRouter, settings, clusterCompositionProvider, eventExecutorGroup, resolver, log );
197243
}

driver/src/main/java/org/neo4j/driver/internal/messaging/request/MultiDatabaseUtil.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import org.neo4j.driver.exceptions.ClientException;
2222
import org.neo4j.driver.internal.DatabaseName;
23+
import org.neo4j.driver.internal.spi.Connection;
24+
import org.neo4j.driver.internal.util.ServerVersion;
2325

2426
public final class MultiDatabaseUtil
2527
{
@@ -31,4 +33,9 @@ public static void assertEmptyDatabaseName( DatabaseName databaseName, int boltV
3133
"Database name: '%s'", boltVersion, databaseName.description() ) );
3234
}
3335
}
36+
37+
public static boolean supportsMultiDatabase( Connection connection )
38+
{
39+
return connection.serverVersion().greaterThanOrEqual( ServerVersion.v4_0_0 ) && connection.protocol().version() >= 4;
40+
}
3441
}

driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ public interface ConnectionProvider
3636
CompletionStage<Void> verifyConnectivity();
3737

3838
CompletionStage<Void> close();
39+
40+
CompletionStage<Boolean> supportsMultiDbAsync();
3941
}

driver/src/main/java/org/neo4j/driver/internal/util/Futures.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.Future;
2626
import java.util.function.BiConsumer;
2727
import java.util.function.BiFunction;
28+
import java.util.function.Supplier;
2829

2930
import org.neo4j.driver.internal.async.connection.EventLoopGroupFactory;
3031

@@ -220,6 +221,35 @@ else if ( error2 != null )
220221
}
221222
}
222223

224+
/**
225+
* Given a future, if the future completes successfully then return a new completed future with the completed value.
226+
* Otherwise if the future completes with an error, then this method first saves the error in the error recorder, and then continues with the onErrorAction.
227+
* @param future the future.
228+
* @param errorRecorder saves error if the given future completes with an error.
229+
* @param onErrorAction continues the future with this action if the future completes with an error.
230+
* @param <T> type
231+
* @return a new completed future with the same completed value if the given future completes successfully, otherwise continues with the onErrorAction.
232+
*/
233+
@SuppressWarnings( "ThrowableNotThrown" )
234+
public static <T> CompletableFuture<T> onErrorContinue( CompletableFuture<T> future, Throwable errorRecorder,
235+
Supplier<? extends CompletionStage<T>> onErrorAction )
236+
{
237+
return future.exceptionally( error -> {
238+
Futures.combineErrors( errorRecorder, error );
239+
return null;
240+
} ).thenCompose( value -> {
241+
if ( value != null )
242+
{
243+
return completedFuture( value );
244+
}
245+
else
246+
{
247+
return onErrorAction.get();
248+
}
249+
} );
250+
}
251+
252+
223253
private static void safeRun( Runnable runnable )
224254
{
225255
try

driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,6 +1155,34 @@ void shouldRevertToInitialRouterIfKnownRouterThrowsProtocolErrors() throws Excep
11551155
}
11561156
}
11571157

1158+
@Test
1159+
void shouldServerWithBoltV4SupportMultiDb() throws Throwable
1160+
{
1161+
StubServer server = StubServer.start( "support_multidb_v4.script", 9001 );
1162+
try ( Driver driver = GraphDatabase.driver( "neo4j://localhost:9001", INSECURE_CONFIG ) )
1163+
{
1164+
assertTrue( driver.supportsMultiDb() );
1165+
}
1166+
finally
1167+
{
1168+
assertEquals( 0, server.exitStatus() );
1169+
}
1170+
}
1171+
1172+
@Test
1173+
void shouldServerWithBoltV3NotSupportMultiDb() throws Throwable
1174+
{
1175+
StubServer server = StubServer.start( "support_multidb_v3.script", 9001 );
1176+
try ( Driver driver = GraphDatabase.driver( "neo4j://localhost:9001", INSECURE_CONFIG ) )
1177+
{
1178+
assertFalse( driver.supportsMultiDb() );
1179+
}
1180+
finally
1181+
{
1182+
assertEquals( 0, server.exitStatus() );
1183+
}
1184+
}
1185+
11581186
private static Driver newDriverWithSleeplessClock( String uriString, Config config )
11591187
{
11601188
DriverFactory driverFactory = new DriverFactoryWithClock( new SleeplessClock() );

0 commit comments

Comments
 (0)