Skip to content

Commit 2ffd5f4

Browse files
author
Zhen Li
committed
For routing driver, by default, get the routing table for default-database.
As this makes the routing table map easier for server version lower than 4.0. Otherwise, we need to detect if it is server version lower than 4.0, then we can only get routing table for default_db, and then we need to reset the key in routing table map to be default_db. TODO: error handling and removing from routing table map. BoltKit tests are failing because the server version is set wrongly. Also missing any integration tests for 4.0 routing driver.
1 parent 861acd2 commit 2ffd5f4

File tree

10 files changed

+89
-60
lines changed

10 files changed

+89
-60
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2002-2019 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.exceptions;
20+
21+
/**
22+
* A routing error indicate a fatal problem to obtain routing tables such as the routing table for a specified database does not exist.
23+
* This exception should not be retried.
24+
* @since 2.0
25+
*/
26+
public class RoutingException extends Neo4jException
27+
{
28+
public RoutingException( String message )
29+
{
30+
super( message );
31+
}
32+
33+
public RoutingException( String message, Throwable throwable )
34+
{
35+
super( message, throwable);
36+
}
37+
}

driver/src/main/java/org/neo4j/driver/internal/DirectConnectionProvider.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.neo4j.driver.internal.spi.ConnectionProvider;
2828

2929
import static org.neo4j.driver.AccessMode.READ;
30-
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
30+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
3131

3232
/**
3333
* Simple {@link ConnectionProvider connection provider} that obtains connections form the given pool only for
@@ -53,8 +53,9 @@ public CompletionStage<Connection> acquireConnection( String databaseName, Acces
5353
@Override
5454
public CompletionStage<Void> verifyConnectivity()
5555
{
56-
// we verify the connection by establishing the connection to the default database
57-
return acquireConnection( SYSTEM_DB_NAME, READ ).thenCompose( Connection::release );
56+
// We verify the connection by establishing a connection with the remote server specified by the address.
57+
// Database name will be ignored as no query is run in this connection and the connection is released immediately.
58+
return acquireConnection( ABSENT_DB_NAME, READ ).thenCompose( Connection::release );
5859
}
5960

6061
@Override

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,20 @@
2828
import java.util.concurrent.CompletionStage;
2929
import java.util.concurrent.TimeUnit;
3030

31+
import org.neo4j.driver.Logger;
32+
import org.neo4j.driver.exceptions.SecurityException;
33+
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3134
import org.neo4j.driver.internal.BoltServerAddress;
3235
import org.neo4j.driver.internal.spi.Connection;
3336
import org.neo4j.driver.internal.spi.ConnectionPool;
3437
import org.neo4j.driver.internal.util.Futures;
35-
import org.neo4j.driver.Logger;
36-
import org.neo4j.driver.exceptions.SecurityException;
37-
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3838
import org.neo4j.driver.net.ServerAddressResolver;
3939

4040
import static java.lang.String.format;
4141
import static java.util.Collections.emptySet;
4242
import static java.util.concurrent.CompletableFuture.completedFuture;
4343
import static java.util.stream.Collectors.toList;
44+
import static org.neo4j.driver.internal.util.ErrorUtil.isRoutingError;
4445
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4546
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4647

@@ -251,12 +252,11 @@ private CompletionStage<ClusterComposition> lookupOnRouter( BoltServerAddress ro
251252
private ClusterComposition handleRoutingProcedureError( Throwable error, RoutingTable routingTable,
252253
BoltServerAddress routerAddress )
253254
{
254-
if ( error instanceof SecurityException )
255+
if ( error instanceof SecurityException || isRoutingError( error ) )
255256
{
256-
// auth error happened, terminate the discovery procedure immediately
257+
// auth error or routing error happened, terminate the discovery procedure immediately
257258
throw new CompletionException( error );
258259
}
259-
else
260260
{
261261
// connection turned out to be broken
262262
logger.error( format( "Failed to connect to routing server '%s'.", routerAddress ), error );

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.List;
22+
import java.util.Objects;
2223
import java.util.concurrent.CompletionException;
2324
import java.util.concurrent.CompletionStage;
2425

@@ -28,6 +29,7 @@
2829
import org.neo4j.driver.TransactionConfig;
2930
import org.neo4j.driver.async.StatementResultCursor;
3031
import org.neo4j.driver.exceptions.ClientException;
32+
import org.neo4j.driver.exceptions.RoutingException;
3133
import org.neo4j.driver.internal.BookmarksHolder;
3234
import org.neo4j.driver.internal.async.connection.DirectConnection;
3335
import org.neo4j.driver.internal.spi.Connection;
@@ -37,12 +39,10 @@
3739
import static org.neo4j.driver.Values.parameters;
3840
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
3941
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
40-
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
4142
import static org.neo4j.driver.internal.util.ServerVersion.v4_0_0;
4243

4344
public class RoutingProcedureRunner
4445
{
45-
static final String GET_SERVERS = "dbms.cluster.routing.getServers";
4646
static final String ROUTING_CONTEXT = "context";
4747
static final String GET_ROUTING_TABLE = "dbms.cluster.routing.getRoutingTable({" + ROUTING_CONTEXT + "})";
4848
static final String DATABASE_NAME = "database";
@@ -60,7 +60,8 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
6060
return connectionStage.thenCompose( connection ->
6161
{
6262
ServerVersion serverVersion = connection.serverVersion();
63-
DirectConnection delegate = new DirectConnection( connection, selectDatabase( serverVersion ), AccessMode.WRITE );
63+
// As the connection can connect to any router (a.k.a. any core members), this connection strictly speaking is a read connection.
64+
DirectConnection delegate = new DirectConnection( connection, SYSTEM_DB_NAME, AccessMode.READ );
6465
Statement procedure = procedureStatement( serverVersion, databaseName );
6566
return runProcedure( delegate, procedure )
6667
.thenCompose( records -> releaseConnection( delegate, records ) )
@@ -70,7 +71,7 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
7071

7172
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
7273
{
73-
return connection.protocol() // this line fails if database name is provided in BOLT versions that do not support database name.
74+
return connection.protocol()
7475
.runInAutoCommitTransaction( connection, procedure, BookmarksHolder.NO_OP, TransactionConfig.empty(), true )
7576
.asyncResult().thenCompose( StatementResultCursor::listAsync );
7677
}
@@ -86,29 +87,16 @@ private Statement procedureStatement( ServerVersion serverVersion, String databa
8687
return new Statement( "CALL " + MULTI_DB_GET_ROUTING_TABLE,
8788
parameters( ROUTING_CONTEXT, context.asMap(), DATABASE_NAME, databaseName ) );
8889
}
89-
else if ( serverVersion.greaterThanOrEqual( v3_2_0 ) )
90+
else
9091
{
92+
if ( !Objects.equals( ABSENT_DB_NAME, databaseName ) )
93+
{
94+
throw new RoutingException( String.format( "Refreshing routing table for multi-databases is not supported in server version lower than 4.0. " +
95+
"Current server version: %s. Database name: `%s`", serverVersion, databaseName ) );
96+
}
9197
return new Statement( "CALL " + GET_ROUTING_TABLE,
9298
parameters( ROUTING_CONTEXT, context.asMap() ) );
9399
}
94-
else
95-
{
96-
return new Statement( "CALL " + GET_SERVERS );
97-
}
98-
}
99-
100-
private String selectDatabase( ServerVersion serverVersion )
101-
{
102-
if ( serverVersion.greaterThanOrEqual( v4_0_0 ) )
103-
{
104-
// Routing procedure will be called on the system database
105-
return SYSTEM_DB_NAME;
106-
}
107-
else
108-
{
109-
// For lower bolt versions, there is no system database, so we should just run on the "default" database
110-
return ABSENT_DB_NAME;
111-
}
112100
}
113101

114102
private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTables.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public interface RoutingTables
3333
{
3434
/**
3535
* Fresh the routing table for the database and given access mode.
36+
* For server version lower than 4.0, the database name will be ignored while refresh routing table.
3637
* @return The future of a new routing table.
3738
*/
3839
CompletionStage<RoutingTable> freshRoutingTable( String databaseName, AccessMode mode );

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import org.neo4j.driver.internal.util.Futures;
4848
import org.neo4j.driver.net.ServerAddressResolver;
4949

50-
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
50+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
5151

5252
public class LoadBalancer implements ConnectionProvider
5353
{
@@ -94,7 +94,7 @@ public CompletionStage<Connection> acquireConnection( String databaseName, Acces
9494
@Override
9595
public CompletionStage<Void> verifyConnectivity()
9696
{
97-
return routingTables.freshRoutingTable( SYSTEM_DB_NAME, AccessMode.READ ).thenApply( routingTable -> null );
97+
return routingTables.freshRoutingTable( ABSENT_DB_NAME, AccessMode.READ ).thenApply( routingTable -> null );
9898
}
9999

100100
@Override

driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.neo4j.driver.exceptions.ClientException;
2828
import org.neo4j.driver.exceptions.DatabaseException;
2929
import org.neo4j.driver.exceptions.Neo4jException;
30+
import org.neo4j.driver.exceptions.RoutingException;
3031
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3132
import org.neo4j.driver.exceptions.TransientException;
3233

@@ -90,6 +91,23 @@ public static boolean isFatal( Throwable error )
9091
return true;
9192
}
9293

94+
public static boolean isRoutingError( Throwable error )
95+
{
96+
if ( error instanceof RoutingException )
97+
{
98+
return true;
99+
}
100+
else if ( error instanceof Neo4jException )
101+
{
102+
String errorCode = ((Neo4jException) error).code();
103+
return errorCode != null && (errorCode.startsWith( "Neo.ClientError.Database.DatabaseNotFound" ));
104+
}
105+
else
106+
{
107+
return false;
108+
}
109+
}
110+
93111
public static void rethrowAsyncException( ExecutionException e )
94112
{
95113
Throwable error = e.getCause();

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java renamed to driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.internal;
19+
package org.neo4j.driver.integration;
2020

2121
import org.junit.jupiter.api.Test;
2222

@@ -28,6 +28,7 @@
2828
import java.util.concurrent.atomic.AtomicBoolean;
2929
import java.util.concurrent.atomic.AtomicInteger;
3030

31+
import org.neo4j.driver.internal.DriverFactory;
3132
import org.neo4j.driver.internal.cluster.RoutingSettings;
3233
import org.neo4j.driver.internal.retry.RetrySettings;
3334
import org.neo4j.driver.internal.util.DriverFactoryWithClock;

driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_ROUTING_TABLE;
4949
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.MULTI_DB_GET_ROUTING_TABLE;
5050
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.ROUTING_CONTEXT;
51-
import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_SERVERS;
5251
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.ABSENT_DB_NAME;
5352
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.SYSTEM_DB_NAME;
5453
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
@@ -61,7 +60,7 @@ class RoutingProcedureRunnerTest
6160
{
6261
@ParameterizedTest
6362
@ValueSource( strings = {ABSENT_DB_NAME, SYSTEM_DB_NAME, " this is a db name "} )
64-
void shouldCallGetRoutingTableWithEmptyMapWithDatabaseName( String db )
63+
void shouldCallGetRoutingTableWithEmptyMapOnSystemDatabaseForDatabase( String db )
6564
{
6665
RoutingProcedureRunner runner = new TestRoutingProcedureRunner( RoutingContext.EMPTY,
6766
completedFuture( asList( mock( Record.class ), mock( Record.class ) ) ), SYSTEM_DB_NAME );
@@ -77,7 +76,7 @@ void shouldCallGetRoutingTableWithEmptyMapWithDatabaseName( String db )
7776

7877
@ParameterizedTest
7978
@ValueSource( strings = {ABSENT_DB_NAME, SYSTEM_DB_NAME, " this is a db name "} )
80-
void shouldCallGetRoutingTableWithParamAndDatabaseName( String db )
79+
void shouldCallGetRoutingTableWithParamOnSystemDatabaseForDatabase( String db )
8180
{
8281
URI uri = URI.create( "neo4j://localhost/?key1=value1&key2=value2" );
8382
RoutingContext context = new RoutingContext( uri );
@@ -124,22 +123,6 @@ void shouldCallGetRoutingTableWithParam()
124123
assertEquals( new Statement( "CALL " + GET_ROUTING_TABLE, expectedParams ), response.procedure() );
125124
}
126125

127-
@Test
128-
void shouldCallGetServers()
129-
{
130-
URI uri = URI.create( "neo4j://localhost/?key1=value1&key2=value2" );
131-
RoutingContext context = new RoutingContext( uri );
132-
133-
RoutingProcedureRunner runner = new TestRoutingProcedureRunner( context,
134-
completedFuture( asList( mock( Record.class ), mock( Record.class ) ) ) );
135-
136-
RoutingProcedureResponse response = await( runner.run( connectionStage( "Neo4j/3.1.8" ), ABSENT_DB_NAME ) );
137-
138-
assertTrue( response.isSuccess() );
139-
assertEquals( 2, response.records().size() );
140-
assertEquals( new Statement( "CALL " + GET_SERVERS ), response.procedure() );
141-
}
142-
143126
@Test
144127
void shouldReturnFailedResponseOnClientException()
145128
{
@@ -219,7 +202,7 @@ private static CompletionStage<Connection> connectionStage( String serverVersion
219202
private static class TestRoutingProcedureRunner extends RoutingProcedureRunner
220203
{
221204
final CompletionStage<List<Record>> runProcedureResult;
222-
final String database;
205+
final String executionDatabase;
223206

224207
TestRoutingProcedureRunner( RoutingContext context )
225208
{
@@ -231,17 +214,17 @@ private static class TestRoutingProcedureRunner extends RoutingProcedureRunner
231214
this( context, runProcedureResult, ABSENT_DB_NAME );
232215
}
233216

234-
TestRoutingProcedureRunner( RoutingContext context, CompletionStage<List<Record>> runProcedureResult, String database )
217+
TestRoutingProcedureRunner( RoutingContext context, CompletionStage<List<Record>> runProcedureResult, String executionDatabase )
235218
{
236219
super( context );
237220
this.runProcedureResult = runProcedureResult;
238-
this.database = database;
221+
this.executionDatabase = executionDatabase;
239222
}
240223

241224
@Override
242225
CompletionStage<List<Record>> runProcedure( Connection connection, Statement procedure )
243226
{
244-
assertEquals( database, connection.databaseName() );
227+
assertEquals( executionDatabase, connection.databaseName() );
245228
return runProcedureResult;
246229
}
247230
}

driver/src/test/resources/acquire_endpoints_v3.script

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
!: AUTO RESET
33

44
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
5-
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
6-
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
5+
S: SUCCESS {"server": "Neo4j/3.9.9", "connection_id": "bolt-123456789"}
6+
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {"mode": "r"}
77
PULL_ALL
88
S: SUCCESS {"fields": ["ttl", "servers"]}
99
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007","127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]

0 commit comments

Comments
 (0)