Skip to content

Commit 925840e

Browse files
author
Zhen Li
committed
For routing driver, by default, get the routing table for default-database.
As this makes the routing table map easier for server version lower than 4.0. Otherwise, we need to detect if it is server version lower than 4.0, then we can only get routing table for default_db, and then we need to reset the key in routing table map to be default_db. TODO: error handling and removing from routing table map. BoltKit tests are failing because the server version is set wrongly. Also missing any integration tests for 4.0 routing driver.
1 parent 861acd2 commit 925840e

File tree

7 files changed

+30
-42
lines changed

7 files changed

+30
-42
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.neo4j.driver.internal.spi.ConnectionProvider;
2828

2929
import static org.neo4j.driver.AccessMode.READ;
30-
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
30+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
3131

3232
/**
3333
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
@@ -53,8 +53,9 @@ public CompletionStage<Connection> acquireConnection( String databaseName, Acces
5353
@Override
5454
public CompletionStage<Void> verifyConnectivity()
5555
{
56-
// we verify the connection by establishing the connection to the default database
57-
return acquireConnection( SYSTEM_DB_NAME, READ ).thenCompose( Connection::release );
56+
// We verify the connection by establishing a connection with the remote server specified by the address.
57+
// Database name will be ignored as no query is run in this connection and the connection is released immediately.
58+
return acquireConnection( ABSENT_DB_NAME, READ ).thenCompose( Connection::release );
5859
}
5960

6061
@Override

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.List;
22+
import java.util.Objects;
2223
import java.util.concurrent.CompletionException;
2324
import java.util.concurrent.CompletionStage;
2425

@@ -37,12 +38,10 @@
3738
import static org.neo4j.driver.Values.parameters;
3839
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
3940
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
40-
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
4141
import static org.neo4j.driver.internal.util.ServerVersion.v4_0_0;
4242

4343
public class RoutingProcedureRunner
4444
{
45-
static final String GET_SERVERS = "dbms.cluster.routing.getServers";
4645
static final String ROUTING_CONTEXT = "context";
4746
static final String GET_ROUTING_TABLE = "dbms.cluster.routing.getRoutingTable({" + ROUTING_CONTEXT + "})";
4847
static final String DATABASE_NAME = "database";
@@ -60,7 +59,8 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
6059
return connectionStage.thenCompose( connection ->
6160
{
6261
ServerVersion serverVersion = connection.serverVersion();
63-
DirectConnection delegate = new DirectConnection( connection, selectDatabase( serverVersion ), AccessMode.WRITE );
62+
// As the connection can connect to any router (a.k.a. any core members), this connection strictly speaking is a read connection.
63+
DirectConnection delegate = new DirectConnection( connection, selectExecutionDatabase( serverVersion ), AccessMode.READ );
6464
Statement procedure = procedureStatement( serverVersion, databaseName );
6565
return runProcedure( delegate, procedure )
6666
.thenCompose( records -> releaseConnection( delegate, records ) )
@@ -70,7 +70,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
7070

7171
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
7272
{
73-
return connection.protocol() // this line fails if database name is provided in BOLT versions that do not support database name.
73+
return connection.protocol()
7474
.runInAutoCommitTransaction( connection, procedure, BookmarksHolder.NO_OP, TransactionConfig.empty(), true )
7575
.asyncResult().thenCompose( StatementResultCursor::listAsync );
7676
}
@@ -86,22 +86,24 @@ private Statement procedureStatement( ServerVersion serverVersion, String databa
8686
return new Statement( "CALL " + MULTI_DB_GET_ROUTING_TABLE,
8787
parameters( ROUTING_CONTEXT, context.asMap(), DATABASE_NAME, databaseName ) );
8888
}
89-
else if ( serverVersion.greaterThanOrEqual( v3_2_0 ) )
89+
else
9090
{
91+
if ( !Objects.equals( ABSENT_DB_NAME, databaseName ) )
92+
{
93+
// TODO maybe a routing error
94+
throw new SecurityException( String.format( "Refreshing routing table for multi-databases is not supported in server version lower than 4.0. " +
95+
"Current server version: %s. Database name: `%s`", serverVersion, databaseName ) );
96+
}
9197
return new Statement( "CALL " + GET_ROUTING_TABLE,
9298
parameters( ROUTING_CONTEXT, context.asMap() ) );
9399
}
94-
else
95-
{
96-
return new Statement( "CALL " + GET_SERVERS );
97-
}
98100
}
99101

100-
private String selectDatabase( ServerVersion serverVersion )
102+
private String selectExecutionDatabase( ServerVersion serverVersion )
101103
{
102104
if ( serverVersion.greaterThanOrEqual( v4_0_0 ) )
103105
{
104-
// Routing procedure will be called on the system database
106+
// Routing procedure will be called on the system database as system database is the only database that is guaranteed to be on any cores.
105107
return SYSTEM_DB_NAME;
106108
}
107109
else

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTables.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public interface RoutingTables
3333
{
3434
/**
3535
* Fresh the routing table for the database and given access mode.
36+
* For server version lower than 4.0, the database name will be ignored while refresh routing table.
3637
* @return The future of a new routing table.
3738
*/
3839
CompletionStage<RoutingTable> freshRoutingTable( String databaseName, AccessMode mode );

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import org.neo4j.driver.internal.util.Futures;
4848
import org.neo4j.driver.net.ServerAddressResolver;
4949

50-
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
50+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
5151

5252
public class LoadBalancer implements ConnectionProvider
5353
{
@@ -94,7 +94,7 @@ public CompletionStage<Connection> acquireConnection( String databaseName, Acces
9494
@Override
9595
public CompletionStage<Void> verifyConnectivity()
9696
{
97-
return routingTables.freshRoutingTable( SYSTEM_DB_NAME, AccessMode.READ ).thenApply( routingTable -> null );
97+
return routingTables.freshRoutingTable( ABSENT_DB_NAME, AccessMode.READ ).thenApply( routingTable -> null );
9898
}
9999

100100
@Override

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java renamed to driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.internal;
19+
package org.neo4j.driver.integration;
2020

2121
import org.junit.jupiter.api.Test;
2222

@@ -28,6 +28,7 @@
2828
import java.util.concurrent.atomic.AtomicBoolean;
2929
import java.util.concurrent.atomic.AtomicInteger;
3030

31+
import org.neo4j.driver.internal.DriverFactory;
3132
import org.neo4j.driver.internal.cluster.RoutingSettings;
3233
import org.neo4j.driver.internal.retry.RetrySettings;
3334
import org.neo4j.driver.internal.util.DriverFactoryWithClock;

driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_ROUTING_TABLE;
4949
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.MULTI_DB_GET_ROUTING_TABLE;
5050
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.ROUTING_CONTEXT;
51-
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_SERVERS;
5251
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
5352
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
5453
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
@@ -61,7 +60,7 @@ class RoutingProcedureRunnerTest
6160
{
6261
@ParameterizedTest
6362
@ValueSource( strings = {ABSENT_DB_NAME, SYSTEM_DB_NAME, " this is a db name "} )
64-
void shouldCallGetRoutingTableWithEmptyMapWithDatabaseName( String db )
63+
void shouldCallGetRoutingTableWithEmptyMapOnSystemDatabaseForDatabase( String db )
6564
{
6665
RoutingProcedureRunner runner = new TestRoutingProcedureRunner( RoutingContext.EMPTY,
6766
completedFuture( asList( mock( Record.class ), mock( Record.class ) ) ), SYSTEM_DB_NAME );
@@ -77,7 +76,7 @@ void shouldCallGetRoutingTableWithEmptyMapWithDatabaseName( String db )
7776

7877
@ParameterizedTest
7978
@ValueSource( strings = {ABSENT_DB_NAME, SYSTEM_DB_NAME, " this is a db name "} )
80-
void shouldCallGetRoutingTableWithParamAndDatabaseName( String db )
79+
void shouldCallGetRoutingTableWithParamOnSystemDatabaseForDatabase( String db )
8180
{
8281
URI uri = URI.create( "neo4j://localhost/?key1=value1&key2=value2" );
8382
RoutingContext context = new RoutingContext( uri );
@@ -124,22 +123,6 @@ void shouldCallGetRoutingTableWithParam()
124123
assertEquals( new Statement( "CALL " + GET_ROUTING_TABLE, expectedParams ), response.procedure() );
125124
}
126125

127-
@Test
128-
void shouldCallGetServers()
129-
{
130-
URI uri = URI.create( "neo4j://localhost/?key1=value1&key2=value2" );
131-
RoutingContext context = new RoutingContext( uri );
132-
133-
RoutingProcedureRunner runner = new TestRoutingProcedureRunner( context,
134-
completedFuture( asList( mock( Record.class ), mock( Record.class ) ) ) );
135-
136-
RoutingProcedureResponse response = await( runner.run( connectionStage( "Neo4j/3.1.8" ), ABSENT_DB_NAME ) );
137-
138-
assertTrue( response.isSuccess() );
139-
assertEquals( 2, response.records().size() );
140-
assertEquals( new Statement( "CALL " + GET_SERVERS ), response.procedure() );
141-
}
142-
143126
@Test
144127
void shouldReturnFailedResponseOnClientException()
145128
{
@@ -219,7 +202,7 @@ private static CompletionStage<Connection> connectionStage( String serverVersion
219202
private static class TestRoutingProcedureRunner extends RoutingProcedureRunner
220203
{
221204
final CompletionStage<List<Record>> runProcedureResult;
222-
final String database;
205+
final String executionDatabase;
223206

224207
TestRoutingProcedureRunner( RoutingContext context )
225208
{
@@ -231,17 +214,17 @@ private static class TestRoutingProcedureRunner extends RoutingProcedureRunner
231214
this( context, runProcedureResult, ABSENT_DB_NAME );
232215
}
233216

234-
TestRoutingProcedureRunner( RoutingContext context, CompletionStage<List<Record>> runProcedureResult, String database )
217+
TestRoutingProcedureRunner( RoutingContext context, CompletionStage<List<Record>> runProcedureResult, String executionDatabase )
235218
{
236219
super( context );
237220
this.runProcedureResult = runProcedureResult;
238-
this.database = database;
221+
this.executionDatabase = executionDatabase;
239222
}
240223

241224
@Override
242225
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
243226
{
244-
assertEquals( database, connection.databaseName() );
227+
assertEquals( executionDatabase, connection.databaseName() );
245228
return runProcedureResult;
246229
}
247230
}

driver/src/test/resources/acquire_endpoints_v3.script

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
!: AUTO RESET
33

44
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
5-
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
6-
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
5+
S: SUCCESS {"server": "Neo4j/3.9.9", "connection_id": "bolt-123456789"}
6+
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {"mode": "r"}
77
PULL_ALL
88
S: SUCCESS {"fields": ["ttl", "servers"]}
99
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007","127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]

0 commit comments

Comments
 (0)