Skip to content

Commit c8e0317

Browse files
author
Zhen Li
committed
For routing driver, by default, get the routing table for default-database.
As this makes the routing table map easier for server version lower than 4.0. Otherwise, if we default to get routing table for system_db, we need to detect if it is server version lower than 4.0. If so, we can only get routing table for default_db, and then we need to reset the key in routing table map to be default_db. TODO: Removing stale routing table from routing table map. Also missing any integration tests for 4.0 routing driver.
1 parent 94bfd5d commit c8e0317

21 files changed

+201
-219
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableFactory.java renamed to driver/src/main/java/org/neo4j/driver/exceptions/RoutingException.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,22 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.internal.cluster;
19+
package org.neo4j.driver.exceptions;
2020

21-
public interface RoutingTableFactory
21+
/**
22+
* A routing error indicate a fatal problem to obtain routing tables such as the routing table for a specified database does not exist.
23+
* This exception should not be retried.
24+
* @since 2.0
25+
*/
26+
public class RoutingException extends Neo4jException
2227
{
23-
RoutingTable newInstance( String databaseName );
28+
public RoutingException( String message )
29+
{
30+
super( message );
31+
}
32+
33+
public RoutingException( String message, Throwable throwable )
34+
{
35+
super( message, throwable);
36+
}
2437
}

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.neo4j.driver.internal.spi.ConnectionProvider;
2828

2929
import static org.neo4j.driver.AccessMode.READ;
30-
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
30+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
3131

3232
/**
3333
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
@@ -53,8 +53,9 @@ public CompletionStage<Connection> acquireConnection( String databaseName, Acces
5353
@Override
5454
public CompletionStage<Void> verifyConnectivity()
5555
{
56-
// we verify the connection by establishing the connection to the default database
57-
return acquireConnection( SYSTEM_DB_NAME, READ ).thenCompose( Connection::release );
56+
// We verify the connection by establishing a connection with the remote server specified by the address.
57+
// Database name will be ignored as no query is run in this connection and the connection is released immediately.
58+
return acquireConnection( ABSENT_DB_NAME, READ ).thenCompose( Connection::release );
5859
}
5960

6061
@Override

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ public void retainAll( Set<BoltServerAddress> addressesToRetain )
127127
{
128128
// address is not present in updated routing table and has no active connections
129129
// it's now safe to terminate corresponding connection pool and forget about it
130-
131130
ChannelPool pool = pools.remove( address );
132131
if ( pool != null )
133132
{

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,20 @@
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.stream.Stream;
3232

33+
import org.neo4j.driver.Logger;
34+
import org.neo4j.driver.exceptions.SecurityException;
35+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3336
import org.neo4j.driver.internal.BoltServerAddress;
3437
import org.neo4j.driver.internal.spi.Connection;
3538
import org.neo4j.driver.internal.spi.ConnectionPool;
3639
import org.neo4j.driver.internal.util.Futures;
37-
import org.neo4j.driver.Logger;
38-
import org.neo4j.driver.exceptions.SecurityException;
39-
import org.neo4j.driver.exceptions.ServiceUnavailableException;
4040
import org.neo4j.driver.net.ServerAddressResolver;
4141

4242
import static java.lang.String.format;
4343
import static java.util.Collections.emptySet;
4444
import static java.util.concurrent.CompletableFuture.completedFuture;
4545
import static java.util.stream.Collectors.toList;
46+
import static org.neo4j.driver.internal.util.ErrorUtil.isRoutingError;
4647
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4748
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4849

@@ -253,12 +254,11 @@ private CompletionStage<ClusterComposition> lookupOnRouter( BoltServerAddress ro
253254
private ClusterComposition handleRoutingProcedureError( Throwable error, RoutingTable routingTable,
254255
BoltServerAddress routerAddress )
255256
{
256-
if ( error instanceof SecurityException )
257+
if ( error instanceof SecurityException || isRoutingError( error ) )
257258
{
258-
// auth error happened, terminate the discovery procedure immediately
259+
// auth error or routing error happened, terminate the discovery procedure immediately
259260
throw new CompletionException( error );
260261
}
261-
else
262262
{
263263
// connection turned out to be broken
264264
logger.info( format( "Failed to connect to routing server '%s'.", routerAddress ), error );

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.List;
22+
import java.util.Objects;
2223
import java.util.concurrent.CompletionException;
2324
import java.util.concurrent.CompletionStage;
2425

@@ -28,6 +29,7 @@
2829
import org.neo4j.driver.TransactionConfig;
2930
import org.neo4j.driver.async.StatementResultCursor;
3031
import org.neo4j.driver.exceptions.ClientException;
32+
import org.neo4j.driver.exceptions.RoutingException;
3133
import org.neo4j.driver.internal.BookmarksHolder;
3234
import org.neo4j.driver.internal.async.connection.DirectConnection;
3335
import org.neo4j.driver.internal.spi.Connection;
@@ -37,12 +39,10 @@
3739
import static org.neo4j.driver.Values.parameters;
3840
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
3941
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
40-
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
4142
import static org.neo4j.driver.internal.util.ServerVersion.v4_0_0;
4243

4344
public class RoutingProcedureRunner
4445
{
45-
static final String GET_SERVERS = "dbms.cluster.routing.getServers";
4646
static final String ROUTING_CONTEXT = "context";
4747
static final String GET_ROUTING_TABLE = "dbms.cluster.routing.getRoutingTable({" + ROUTING_CONTEXT + "})";
4848
static final String DATABASE_NAME = "database";
@@ -60,7 +60,8 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
6060
return connectionStage.thenCompose( connection ->
6161
{
6262
ServerVersion serverVersion = connection.serverVersion();
63-
DirectConnection delegate = new DirectConnection( connection, selectDatabase( serverVersion ), AccessMode.WRITE );
63+
// As the connection can connect to any router (a.k.a. any core members), this connection strictly speaking is a read connection.
64+
DirectConnection delegate = new DirectConnection( connection, SYSTEM_DB_NAME, AccessMode.READ );
6465
Statement procedure = procedureStatement( serverVersion, databaseName );
6566
return runProcedure( delegate, procedure )
6667
.thenCompose( records -> releaseConnection( delegate, records ) )
@@ -70,7 +71,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
7071

7172
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
7273
{
73-
return connection.protocol() // this line fails if database name is provided in BOLT versions that do not support database name.
74+
return connection.protocol()
7475
.runInAutoCommitTransaction( connection, procedure, BookmarksHolder.NO_OP, TransactionConfig.empty(), true )
7576
.asyncResult().thenCompose( StatementResultCursor::listAsync );
7677
}
@@ -86,29 +87,16 @@ private Statement procedureStatement( ServerVersion serverVersion, String databa
8687
return new Statement( "CALL " + MULTI_DB_GET_ROUTING_TABLE,
8788
parameters( ROUTING_CONTEXT, context.asMap(), DATABASE_NAME, databaseName ) );
8889
}
89-
else if ( serverVersion.greaterThanOrEqual( v3_2_0 ) )
90+
else
9091
{
92+
if ( !Objects.equals( ABSENT_DB_NAME, databaseName ) )
93+
{
94+
throw new RoutingException( String.format( "Refreshing routing table for multi-databases is not supported in server version lower than 4.0. " +
95+
"Current server version: %s. Database name: `%s`", serverVersion, databaseName ) );
96+
}
9197
return new Statement( "CALL " + GET_ROUTING_TABLE,
9298
parameters( ROUTING_CONTEXT, context.asMap() ) );
9399
}
94-
else
95-
{
96-
return new Statement( "CALL " + GET_SERVERS );
97-
}
98-
}
99-
100-
private String selectDatabase( ServerVersion serverVersion )
101-
{
102-
if ( serverVersion.greaterThanOrEqual( v4_0_0 ) )
103-
{
104-
// Routing procedure will be called on the system database
105-
return SYSTEM_DB_NAME;
106-
}
107-
else
108-
{
109-
// For lower bolt versions, there is no system database, so we should just run on the "default" database
110-
return ABSENT_DB_NAME;
111-
}
112100
}
113101

114102
private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableFactoryImpl.java

Lines changed: 0 additions & 40 deletions
This file was deleted.

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandler.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
public class RoutingTableHandler implements RoutingErrorHandler
3535
{
3636
private final RoutingTable routingTable;
37+
private final String databaseName;
3738
private final RoutingTables routingTables;
3839
private CompletableFuture<RoutingTable> refreshRoutingTableFuture;
3940
private final ConnectionPool connectionPool;
@@ -43,6 +44,7 @@ public class RoutingTableHandler implements RoutingErrorHandler
4344
public RoutingTableHandler( RoutingTable routingTable, Rediscovery rediscovery, ConnectionPool connectionPool, RoutingTables routingTables, Logger log )
4445
{
4546
this.routingTable = routingTable;
47+
this.databaseName = routingTable.database();
4648
this.rediscovery = rediscovery;
4749
this.connectionPool = connectionPool;
4850
this.routingTables = routingTables;
@@ -72,7 +74,7 @@ public synchronized CompletionStage<RoutingTable> freshRoutingTable( AccessMode
7274
else if ( routingTable.isStaleFor( mode ) )
7375
{
7476
// existing routing table is not fresh and should be updated
75-
log.info( "Routing table is stale. %s", routingTable );
77+
log.info( "Routing table for database '%s' is stale. %s", databaseName, routingTable );
7678

7779
CompletableFuture<RoutingTable> resultFuture = new CompletableFuture<>();
7880
refreshRoutingTableFuture = resultFuture;
@@ -107,7 +109,7 @@ private synchronized void freshClusterCompositionFetched( ClusterComposition com
107109
routingTable.update( composition );
108110
connectionPool.retainAll( routingTables.allServers() );
109111

110-
log.info( "Updated routing table. %s", routingTable );
112+
log.info( "Updated routing table for database '%s'. %s", databaseName, routingTable );
111113

112114
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
113115
refreshRoutingTableFuture = null;
@@ -121,13 +123,22 @@ private synchronized void freshClusterCompositionFetched( ClusterComposition com
121123

122124
private synchronized void clusterCompositionLookupFailed( Throwable error )
123125
{
126+
log.error( String.format( "Failed to update routing table for database '%s'. Current routing table: %s.", databaseName, routingTable ), error );
127+
routingTables.remove( databaseName );
124128
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
125129
refreshRoutingTableFuture = null;
126130
routingTableFuture.completeExceptionally( error );
127131
}
128132

133+
// This method cannot be synchronized as it will visited by all routing table's threads concurrently
129134
public Set<BoltServerAddress> servers()
130135
{
131136
return routingTable.servers();
132137
}
138+
139+
// for testing only
140+
RoutingTable routingTable()
141+
{
142+
return routingTable;
143+
}
133144
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTables.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public interface RoutingTables
3333
{
3434
/**
3535
* Fresh the routing table for the database and given access mode.
36+
* For server version lower than 4.0, the database name will be ignored while refresh routing table.
3637
* @return The future of a new routing table.
3738
*/
3839
CompletionStage<RoutingTable> freshRoutingTable( String databaseName, AccessMode mode );
@@ -47,4 +48,6 @@ public interface RoutingTables
4748
* @return the routing error handler of the given database
4849
*/
4950
RoutingErrorHandler routingErrorHandler( String databaseName );
51+
52+
void remove( String databaseName );
5053
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTablesImpl.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,16 @@
2929
import org.neo4j.driver.internal.BoltServerAddress;
3030
import org.neo4j.driver.internal.RoutingErrorHandler;
3131
import org.neo4j.driver.internal.spi.ConnectionPool;
32+
import org.neo4j.driver.internal.util.Clock;
3233

3334
public class RoutingTablesImpl implements RoutingTables
3435
{
3536
private final ConcurrentMap<String,RoutingTableHandler> routingTables; //TODO ever growing map?
3637
private final RoutingTableHandlerFactory factory;
3738

38-
public RoutingTablesImpl( ConnectionPool connectionPool, RoutingTableFactory routingTableFactory, Rediscovery rediscovery, Logger log )
39+
public RoutingTablesImpl( ConnectionPool connectionPool, Rediscovery rediscovery, BoltServerAddress initialRouter, Clock clock, Logger log )
3940
{
40-
this( new ConcurrentHashMap<>(), new RoutingTableHandlerFactory( connectionPool, routingTableFactory, rediscovery, log ) );
41+
this( new ConcurrentHashMap<>(), new RoutingTableHandlerFactory( connectionPool, rediscovery, initialRouter, clock, log ) );
4142
}
4243

4344
RoutingTablesImpl( ConcurrentMap<String,RoutingTableHandler> routingTables, RoutingTableHandlerFactory factory )
@@ -49,13 +50,15 @@ public RoutingTablesImpl( ConnectionPool connectionPool, RoutingTableFactory rou
4950
@Override
5051
public CompletionStage<RoutingTable> freshRoutingTable( String databaseName, AccessMode mode )
5152
{
52-
RoutingTableHandler handler = routingTableHandler( databaseName );
53+
RoutingTableHandler handler = getOrCreate( databaseName );
5354
return handler.freshRoutingTable( mode );
5455
}
5556

5657
@Override
5758
public Set<BoltServerAddress> allServers()
5859
{
60+
// obviously we just had a snapshot of all servers in all routing tables
61+
// after we read it, the set could already be changed.
5962
Set<BoltServerAddress> servers = new HashSet<>();
6063
for ( RoutingTableHandler tableHandler : routingTables.values() )
6164
{
@@ -67,32 +70,46 @@ public Set<BoltServerAddress> allServers()
6770
@Override
6871
public RoutingErrorHandler routingErrorHandler( String databaseName )
6972
{
70-
return routingTableHandler( databaseName );
73+
RoutingTableHandler handler = routingTables.get( databaseName );
74+
if ( handler == null )
75+
{
76+
throw new IllegalStateException( String.format( "No entry for database '%s' found in routing tables.", databaseName ) );
77+
}
78+
return handler;
79+
}
80+
81+
@Override
82+
public void remove( String databaseName )
83+
{
84+
routingTables.remove( databaseName );
7185
}
7286

73-
private RoutingTableHandler routingTableHandler( String databaseName )
87+
private RoutingTableHandler getOrCreate( String databaseName )
7488
{
7589
return routingTables.computeIfAbsent( databaseName, name -> factory.newInstance( name, this ) );
7690
}
7791

7892
static class RoutingTableHandlerFactory
7993
{
8094
private final ConnectionPool connectionPool;
81-
private final RoutingTableFactory routingTableFactory;
8295
private final Rediscovery rediscovery;
8396
private final Logger log;
97+
private final BoltServerAddress initialRouter;
98+
private final Clock clock;
8499

85-
RoutingTableHandlerFactory( ConnectionPool connectionPool, RoutingTableFactory routingTableFactory, Rediscovery rediscovery, Logger log )
100+
RoutingTableHandlerFactory( ConnectionPool connectionPool, Rediscovery rediscovery, BoltServerAddress initialRouter, Clock clock, Logger log )
86101
{
87102
this.connectionPool = connectionPool;
88-
this.routingTableFactory = routingTableFactory;
89103
this.rediscovery = rediscovery;
104+
this.initialRouter = initialRouter;
105+
this.clock = clock;
90106
this.log = log;
91107
}
92108

93109
RoutingTableHandler newInstance( String databaseName, RoutingTables allTables )
94110
{
95-
return new RoutingTableHandler( routingTableFactory.newInstance( databaseName ), rediscovery, connectionPool, allTables, log );
111+
ClusterRoutingTable routingTable = new ClusterRoutingTable( databaseName, clock, initialRouter );
112+
return new RoutingTableHandler( routingTable, rediscovery, connectionPool, allTables, log );
96113
}
97114
}
98115
}

0 commit comments

Comments
 (0)