Skip to content

Commit 922437f

Browse files
author
Zhen Li
committed
Use the supportMultiDb feature detection result, to choose the database to connect.
1 parent 19a4bd7 commit 922437f

File tree

4 files changed

+102
-15
lines changed

4 files changed

+102
-15
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/ImmutableConnectionContext.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@
2424
import org.neo4j.driver.internal.spi.Connection;
2525

2626
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
27+
import static org.neo4j.driver.internal.DatabaseNameUtil.systemDatabase;
2728
import static org.neo4j.driver.internal.InternalBookmark.empty;
2829

2930
/**
3031
* A {@link Connection} shall fulfil this {@link ImmutableConnectionContext} when acquired from a connection provider.
3132
*/
3233
public class ImmutableConnectionContext implements ConnectionContext
3334
{
34-
private static final ConnectionContext SIMPLE = new ImmutableConnectionContext( defaultDatabase(), empty(), AccessMode.READ );
35+
private static final ConnectionContext SINGLE_DB_CONTEXT = new ImmutableConnectionContext( defaultDatabase(), empty(), AccessMode.READ );
36+
private static final ConnectionContext MULTI_DB_CONTEXT = new ImmutableConnectionContext( systemDatabase(), empty(), AccessMode.READ );
3537

3638
private final DatabaseName databaseName;
3739
private final AccessMode mode;
@@ -65,10 +67,10 @@ public Bookmark rediscoveryBookmark()
6567
/**
6668
* A simple context is used to test connectivity with a remote server/cluster.
6769
* As long as there is a read only service, the connection shall be established successfully.
68-
* This context should be applicable for both bolt v4 and bolt v3 routing table rediscovery.
70+
* Depending on whether multidb is supported or not, this method returns different context for routing table discovery.
6971
*/
70-
public static ConnectionContext simple()
72+
public static ConnectionContext simple( boolean supportsMultiDb )
7173
{
72-
return SIMPLE;
74+
return supportsMultiDb ? MULTI_DB_CONTEXT : SINGLE_DB_CONTEXT;
7375
}
7476
}

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,15 @@ public CompletionStage<Connection> acquireConnection( ConnectionContext context
103103
@Override
104104
public CompletionStage<Void> verifyConnectivity()
105105
{
106-
return routingTables.refreshRoutingTable( simple() ).handle( ( ignored, error ) -> {
106+
return this.supportsMultiDbAsync().thenCompose( supports -> routingTables.refreshRoutingTable( simple( supports ) ) ).handle( ( ignored, error ) -> {
107107
if ( error != null )
108108
{
109109
Throwable cause = Futures.completionExceptionCause( error );
110110
if ( cause instanceof ServiceUnavailableException )
111111
{
112112
throw Futures.asCompletionException( new ServiceUnavailableException(
113-
"Unable to connect to database, ensure the database is running and that there is a working network connection to it.", cause ) );
113+
"Unable to connect to database management service, ensure the database is running and that there is a working network connection to it.",
114+
cause ) );
114115
}
115116
throw Futures.asCompletionException( cause );
116117
}

driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingTableHandlerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ void acquireShouldUpdateRoutingTableWhenKnownRoutingTableIsStale()
115115

116116
RoutingTableHandler handler = newRoutingTableHandler( routingTable, rediscovery, connectionPool );
117117

118-
assertNotNull( await( handler.refreshRoutingTable( simple() ) ) );
118+
assertNotNull( await( handler.refreshRoutingTable( simple( false ) ) ) );
119119

120120
verify( rediscovery ).lookupClusterComposition( eq ( routingTable ) , eq ( connectionPool ), any() );
121121
assertArrayEquals( new BoltServerAddress[]{reader1, reader2}, routingTable.readers().toArray() );
@@ -188,7 +188,7 @@ public void removeAged()
188188

189189
RoutingTableHandler handler = newRoutingTableHandler( routingTable, rediscovery, connectionPool, registry );
190190

191-
RoutingTable actual = await( handler.refreshRoutingTable( simple() ) );
191+
RoutingTable actual = await( handler.refreshRoutingTable( simple( false ) ) );
192192
assertEquals( routingTable, actual );
193193

194194
verify( connectionPool ).retainAll( new HashSet<>( asList( A, B, C ) ) );
@@ -208,7 +208,7 @@ void shouldRemoveRoutingTableHandlerIfFailedToLookup() throws Throwable
208208
// When
209209

210210
RoutingTableHandler handler = newRoutingTableHandler( routingTable, rediscovery, connectionPool, registry );
211-
assertThrows( RuntimeException.class, () -> await( handler.refreshRoutingTable( simple() ) ) );
211+
assertThrows( RuntimeException.class, () -> await( handler.refreshRoutingTable( simple( false ) ) ) );
212212

213213
// Then
214214
verify( registry ).remove( defaultDatabase() );

driver/src/test/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancerTest.java

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.LinkedHashSet;
3030
import java.util.Set;
3131
import java.util.concurrent.CompletableFuture;
32+
import java.util.function.Function;
3233

3334
import org.neo4j.driver.AccessMode;
3435
import org.neo4j.driver.exceptions.ServiceUnavailableException;
@@ -161,7 +162,7 @@ void shouldSelectLeastConnectedAddress()
161162
Set<BoltServerAddress> seenAddresses = new HashSet<>();
162163
for ( int i = 0; i < 10; i++ )
163164
{
164-
Connection connection = await( loadBalancer.acquireConnection( simple() ) );
165+
Connection connection = await( loadBalancer.acquireConnection( newBoltV4ConnectionContext() ) );
165166
seenAddresses.add( connection.serverAddress() );
166167
}
167168

@@ -185,7 +186,7 @@ void shouldRoundRobinWhenNoActiveConnections()
185186
Set<BoltServerAddress> seenAddresses = new HashSet<>();
186187
for ( int i = 0; i < 10; i++ )
187188
{
188-
Connection connection = await( loadBalancer.acquireConnection( simple() ) );
189+
Connection connection = await( loadBalancer.acquireConnection( newBoltV4ConnectionContext() ) );
189190
seenAddresses.add( connection.serverAddress() );
190191
}
191192

@@ -204,7 +205,7 @@ void shouldTryMultipleServersAfterRediscovery()
204205

205206
LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTable );
206207

207-
Connection connection = await( loadBalancer.acquireConnection( simple() ) );
208+
Connection connection = await( loadBalancer.acquireConnection( newBoltV4ConnectionContext() ) );
208209

209210
assertNotNull( connection );
210211
assertEquals( B, connection.serverAddress() );
@@ -257,21 +258,93 @@ void shouldSuccessOnFirstSuccessfulServer() throws Throwable
257258
assertTrue( await( loadBalancer.supportsMultiDbAsync() ) );
258259
}
259260

261+
@Test
262+
void shouldThrowModifiedErrorWhenSupportMultiDbTestFails() throws Throwable
263+
{
264+
Set<BoltServerAddress> unavailableAddresses = asOrderedSet( A, B );
265+
ConnectionPool connectionPool = newConnectionPoolMockWithFailures( unavailableAddresses );
266+
267+
Rediscovery rediscovery = mock( Rediscovery.class );
268+
when( rediscovery.resolve() ).thenReturn( Arrays.asList( A, B ) );
269+
270+
LoadBalancer loadBalancer = newLoadBalancer( connectionPool, rediscovery );
271+
272+
ServiceUnavailableException exception = assertThrows( ServiceUnavailableException.class, () -> await( loadBalancer.verifyConnectivity() ) );
273+
assertThat( exception.getMessage(), startsWith( "Unable to connect to database management service," ) );
274+
}
275+
276+
@Test
277+
void shouldThrowModifiedErrorWhenRefreshRoutingTableFails() throws Throwable
278+
{
279+
ConnectionPool connectionPool = newConnectionPoolMock();
280+
281+
Rediscovery rediscovery = mock( Rediscovery.class );
282+
when( rediscovery.resolve() ).thenReturn( Arrays.asList( A, B ) );
283+
284+
RoutingTableRegistry routingTables = mock( RoutingTableRegistry.class );
285+
when( routingTables.refreshRoutingTable( any( ConnectionContext.class ) ) ).thenThrow( new ServiceUnavailableException( "boooo" ) );
286+
287+
LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTables, rediscovery );
288+
289+
ServiceUnavailableException exception = assertThrows( ServiceUnavailableException.class, () -> await( loadBalancer.verifyConnectivity() ) );
290+
assertThat( exception.getMessage(), startsWith( "Unable to connect to database management service," ) );
291+
verify( routingTables ).refreshRoutingTable( any( ConnectionContext.class ) );
292+
}
293+
294+
@Test
295+
void shouldThrowOriginalErrorWhenRefreshRoutingTableFails() throws Throwable
296+
{
297+
ConnectionPool connectionPool = newConnectionPoolMock();
298+
299+
Rediscovery rediscovery = mock( Rediscovery.class );
300+
when( rediscovery.resolve() ).thenReturn( Arrays.asList( A, B ) );
301+
302+
RoutingTableRegistry routingTables = mock( RoutingTableRegistry.class );
303+
when( routingTables.refreshRoutingTable( any( ConnectionContext.class ) ) ).thenThrow( new SecurityException( "boo" ) );
304+
305+
LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTables, rediscovery );
306+
307+
SecurityException exception = assertThrows( SecurityException.class, () -> await( loadBalancer.verifyConnectivity() ) );
308+
assertThat( exception.getMessage(), startsWith( "boo" ) );
309+
verify( routingTables ).refreshRoutingTable( any( ConnectionContext.class ) );
310+
}
311+
312+
@Test
313+
void shouldReturnSuccessVerifyConnectivity() throws Throwable
314+
{
315+
ConnectionPool connectionPool = newConnectionPoolMock();
316+
317+
Rediscovery rediscovery = mock( Rediscovery.class );
318+
when( rediscovery.resolve() ).thenReturn( Arrays.asList( A, B ) );
319+
320+
RoutingTableRegistry routingTables = mock( RoutingTableRegistry.class );
321+
when( routingTables.refreshRoutingTable( any( ConnectionContext.class ) ) ).thenReturn( Futures.completedWithNull() );
322+
323+
LoadBalancer loadBalancer = newLoadBalancer( connectionPool, routingTables, rediscovery );
324+
325+
await( loadBalancer.verifyConnectivity() );
326+
verify( routingTables ).refreshRoutingTable( any( ConnectionContext.class ) );
327+
}
328+
260329
private static ConnectionPool newConnectionPoolMock()
261330
{
262331
return newConnectionPoolMockWithFailures( emptySet() );
263332
}
264333

265-
private static ConnectionPool newConnectionPoolMockWithFailures(
266-
Set<BoltServerAddress> unavailableAddresses )
334+
private static ConnectionPool newConnectionPoolMockWithFailures( Set<BoltServerAddress> unavailableAddresses )
335+
{
336+
return newConnectionPoolMockWithFailures( unavailableAddresses, address -> new ServiceUnavailableException( address + " is unavailable!" ) );
337+
}
338+
339+
private static ConnectionPool newConnectionPoolMockWithFailures( Set<BoltServerAddress> unavailableAddresses, Function<BoltServerAddress, Throwable> errorAction )
267340
{
268341
ConnectionPool pool = mock( ConnectionPool.class );
269342
when( pool.acquire( any( BoltServerAddress.class ) ) ).then( invocation ->
270343
{
271344
BoltServerAddress requestedAddress = invocation.getArgument( 0 );
272345
if ( unavailableAddresses.contains( requestedAddress ) )
273346
{
274-
return Futures.failedFuture( new ServiceUnavailableException( requestedAddress + " is unavailable!" ) );
347+
return Futures.failedFuture( errorAction.apply( requestedAddress ) );
275348
}
276349

277350
return completedFuture( newBoltV4Connection( requestedAddress ) );
@@ -289,6 +362,11 @@ private static Connection newBoltV4Connection( BoltServerAddress address )
289362
return connection;
290363
}
291364

365+
private static ConnectionContext newBoltV4ConnectionContext()
366+
{
367+
return simple( true );
368+
}
369+
292370
private static LoadBalancer newLoadBalancer( ConnectionPool connectionPool, RoutingTable routingTable )
293371
{
294372
// Used only in testing
@@ -305,6 +383,12 @@ private static LoadBalancer newLoadBalancer( ConnectionPool connectionPool, Redi
305383
{
306384
// Used only in testing
307385
RoutingTableRegistry routingTables = mock( RoutingTableRegistry.class );
386+
return newLoadBalancer( connectionPool, routingTables, rediscovery );
387+
}
388+
389+
private static LoadBalancer newLoadBalancer( ConnectionPool connectionPool, RoutingTableRegistry routingTables, Rediscovery rediscovery )
390+
{
391+
// Used only in testing
308392
return new LoadBalancer( connectionPool, routingTables, rediscovery, new LeastConnectedLoadBalancingStrategy( connectionPool, DEV_NULL_LOGGING ),
309393
GlobalEventExecutor.INSTANCE, DEV_NULL_LOGGER );
310394
}

0 commit comments

Comments
 (0)