Skip to content

Commit 63d6f93

Browse files
author
Zhen Li
committed
Adding more boltkit tests for multi-databases
1 parent 96b6a3b commit 63d6f93

29 files changed

+364
-232
lines changed

driver/src/main/java/org/neo4j/driver/exceptions/RoutingException.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@
2323
* This exception should not be retried.
2424
* @since 2.0
2525
*/
26-
public class RoutingException extends Neo4jException
26+
public class RoutingException extends ClientException
2727
{
2828
public RoutingException( String message )
2929
{
3030
super( message );
3131
}
3232

33-
public RoutingException( String message, Throwable throwable )
33+
public RoutingException( String code, String message )
3434
{
35-
super( message, throwable);
35+
super( code, message );
3636
}
3737
}

driver/src/main/java/org/neo4j/driver/exceptions/SecurityException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* Restart of server/driver/cluster might be required to recover from this error.
2525
* @since 1.1
2626
*/
27-
public class SecurityException extends Neo4jException
27+
public class SecurityException extends ClientException
2828
{
2929
public SecurityException( String code, String message )
3030
{

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@
2424

2525
public interface ClusterCompositionProvider
2626
{
27-
CompletionStage<ClusterCompositionResponse> getClusterComposition( CompletionStage<Connection> connectionStage, String databaseName );
27+
CompletionStage<ClusterComposition> getClusterComposition( CompletionStage<Connection> connectionStage, String databaseName );
2828
}

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterCompositionResponse.java

Lines changed: 0 additions & 56 deletions
This file was deleted.

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,12 @@ public boolean isStaleFor( AccessMode mode )
7070
@Override
7171
public boolean isStale( long staleRoutingTableTimeout )
7272
{
73-
return expirationTimeout + staleRoutingTableTimeout < clock.millis();
73+
long expireTime = expirationTimeout + staleRoutingTableTimeout;
74+
if ( expireTime < 0 )
75+
{
76+
expireTime = Long.MAX_VALUE;
77+
}
78+
return expireTime < clock.millis();
7479
}
7580

7681
@Override
@@ -133,7 +138,7 @@ public void removeWriter( BoltServerAddress toRemove )
133138
@Override
134139
public synchronized String toString()
135140
{
136-
return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'.",
141+
return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'",
137142
expirationTimeout, clock.millis(), routers, writers, readers, databaseName );
138143
}
139144
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.stream.Stream;
3232

3333
import org.neo4j.driver.Logger;
34+
import org.neo4j.driver.exceptions.RoutingException;
3435
import org.neo4j.driver.exceptions.SecurityException;
3536
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3637
import org.neo4j.driver.internal.BoltServerAddress;
@@ -43,13 +44,16 @@
4344
import static java.util.Collections.emptySet;
4445
import static java.util.concurrent.CompletableFuture.completedFuture;
4546
import static java.util.stream.Collectors.toList;
46-
import static org.neo4j.driver.internal.util.ErrorUtil.isRoutingError;
4747
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4848
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4949

50+
/**
51+
* This class is used by all router tables to perform discovery.
52+
* In other words, the methods in this class could be called by multiple threads concurrently.
53+
*/
5054
public class RediscoveryImpl implements Rediscovery
5155
{
52-
private static final String NO_ROUTERS_AVAILABLE = "Could not perform discovery. No routing servers available.";
56+
private static final String NO_ROUTERS_AVAILABLE = "Could not perform discovery for database '%s'. No routing servers available.";
5357

5458
private final BoltServerAddress initialRouter;
5559
private final RoutingSettings settings;
@@ -58,7 +62,7 @@ public class RediscoveryImpl implements Rediscovery
5862
private final ServerAddressResolver resolver;
5963
private final EventExecutorGroup eventExecutorGroup;
6064

61-
private volatile boolean useInitialRouter;
65+
private volatile boolean useInitialRouter; // TODO thread safe?
6266

6367
public RediscoveryImpl( BoltServerAddress initialRouter, RoutingSettings settings, ClusterCompositionProvider provider,
6468
EventExecutorGroup eventExecutorGroup, ServerAddressResolver resolver, Logger logger )
@@ -114,7 +118,7 @@ else if ( composition != null )
114118
int newFailures = failures + 1;
115119
if ( newFailures >= settings.maxRoutingFailures() )
116120
{
117-
result.completeExceptionally( new ServiceUnavailableException( NO_ROUTERS_AVAILABLE ) );
121+
result.completeExceptionally( new ServiceUnavailableException( String.format( NO_ROUTERS_AVAILABLE, routingTable.database() ) ) );
118122
}
119123
else
120124
{
@@ -247,25 +251,24 @@ private CompletionStage<ClusterComposition> lookupOnRouter( BoltServerAddress ro
247251
}
248252
else
249253
{
250-
return response.clusterComposition();
254+
return response;
251255
}
252256
} );
253257
}
254258

255259
private ClusterComposition handleRoutingProcedureError( Throwable error, RoutingTable routingTable,
256260
BoltServerAddress routerAddress )
257261
{
258-
if ( error instanceof SecurityException || isRoutingError( error ) )
262+
if ( error instanceof SecurityException || error instanceof RoutingException )
259263
{
260264
// auth error or routing error happened, terminate the discovery procedure immediately
261265
throw new CompletionException( error );
262266
}
263-
{
264-
// connection turned out to be broken
265-
logger.info( format( "Failed to connect to routing server '%s'.", routerAddress ), error );
266-
routingTable.forget( routerAddress );
267-
return null;
268-
}
267+
268+
// Retriable error happened during discovery.
269+
logger.warn( format( "Failed to update routing table with server '%s'.", routerAddress ), error );
270+
routingTable.forget( routerAddress );
271+
return null;
269272
}
270273

271274
private List<BoltServerAddress> resolve( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.List;
22+
import java.util.concurrent.CompletionException;
2223
import java.util.concurrent.CompletionStage;
2324

24-
import org.neo4j.driver.internal.spi.Connection;
25-
import org.neo4j.driver.internal.util.Clock;
2625
import org.neo4j.driver.Record;
2726
import org.neo4j.driver.Statement;
2827
import org.neo4j.driver.exceptions.ProtocolException;
29-
import org.neo4j.driver.exceptions.ServiceUnavailableException;
3028
import org.neo4j.driver.exceptions.value.ValueException;
29+
import org.neo4j.driver.internal.spi.Connection;
30+
import org.neo4j.driver.internal.util.Clock;
3131

3232
import static java.lang.String.format;
3333

@@ -50,21 +50,19 @@ public RoutingProcedureClusterCompositionProvider( Clock clock, RoutingSettings
5050
}
5151

5252
@Override
53-
public CompletionStage<ClusterCompositionResponse> getClusterComposition( CompletionStage<Connection> connectionStage, String databaseName )
53+
public CompletionStage<ClusterComposition> getClusterComposition( CompletionStage<Connection> connectionStage, String databaseName )
5454
{
5555
return routingProcedureRunner.run( connectionStage, databaseName )
5656
.thenApply( this::processRoutingResponse );
5757
}
5858

59-
private ClusterCompositionResponse processRoutingResponse( RoutingProcedureResponse response )
59+
private ClusterComposition processRoutingResponse( RoutingProcedureResponse response )
6060
{
6161
if ( !response.isSuccess() )
6262
{
63-
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
64-
"Failed to run '%s' on server. " +
65-
"Please make sure that there is a Neo4j server or cluster up running.",
66-
invokedProcedureString( response ) ), response.error()
67-
) );
63+
throw new CompletionException( format(
64+
"Failed to run '%s' on server. Please make sure that there is a Neo4j server or cluster up running.",
65+
invokedProcedureString( response ) ), response.error() );
6866
}
6967

7068
List<Record> records = response.records();
@@ -74,9 +72,9 @@ private ClusterCompositionResponse processRoutingResponse( RoutingProcedureRespo
7472
// the record size is wrong
7573
if ( records.size() != 1 )
7674
{
77-
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
75+
throw new ProtocolException( format(
7876
PROTOCOL_ERROR_MESSAGE + "records received '%s' is too few or too many.",
79-
invokedProcedureString( response ), records.size() ) ) );
77+
invokedProcedureString( response ), records.size() ) );
8078
}
8179

8280
// failed to parse the record
@@ -87,21 +85,21 @@ private ClusterCompositionResponse processRoutingResponse( RoutingProcedureRespo
8785
}
8886
catch ( ValueException e )
8987
{
90-
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
88+
throw new ProtocolException( format(
9189
PROTOCOL_ERROR_MESSAGE + "unparsable record received.",
92-
invokedProcedureString( response ) ), e ) );
90+
invokedProcedureString( response ) ), e );
9391
}
9492

9593
// the cluster result is not a legal reply
9694
if ( !cluster.hasRoutersAndReaders() )
9795
{
98-
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
96+
throw new ProtocolException( format(
9997
PROTOCOL_ERROR_MESSAGE + "no router or reader found in response.",
100-
invokedProcedureString( response ) ) ) );
98+
invokedProcedureString( response ) ) );
10199
}
102100

103101
// all good
104-
return new ClusterCompositionResponse.Success( cluster );
102+
return cluster;
105103
}
106104

107105
private static String invokedProcedureString( RoutingProcedureResponse response )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class RoutingTableHandler implements RoutingErrorHandler
4343
private final Logger log;
4444

4545
// This defines how long we shall wait before trimming a routing table from routing tables after it is stale.
46+
// TODO make this a configuration option
4647
public static final Duration STALE_ROUTING_TABLE_PURGE_TIMEOUT = Duration.ofSeconds( 30 );
4748

4849

driver/src/main/java/org/neo4j/driver/internal/messaging/request/MultiDatabaseUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424

2525
public final class MultiDatabaseUtil
2626
{
27-
public static final String ABSENT_DB_NAME = "";
28-
public static final String SYSTEM_DB_NAME = "system";
27+
public static final String ABSENT_DB_NAME = ""; // TODO _default
28+
public static final String SYSTEM_DB_NAME = "system"; // TODO _dbms
2929

3030
public static void assertEmptyDatabaseName( String databaseName, int boltVersion )
3131
{

driver/src/main/java/org/neo4j/driver/internal/util/ErrorUtil.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public static Neo4jException newNeo4jError( String code, String message )
6363
{
6464
return new AuthenticationException( code, message );
6565
}
66+
else if ( code.equalsIgnoreCase( "Neo.ClientError.Database.DatabaseNotFound" ) )
67+
{
68+
return new RoutingException( code, message );
69+
}
6670
else
6771
{
6872
return new ClientException( code, message );
@@ -91,23 +95,6 @@ public static boolean isFatal( Throwable error )
9195
return true;
9296
}
9397

94-
public static boolean isRoutingError( Throwable error )
95-
{
96-
if ( error instanceof RoutingException )
97-
{
98-
return true;
99-
}
100-
else if ( error instanceof Neo4jException )
101-
{
102-
String errorCode = ((Neo4jException) error).code();
103-
return errorCode != null && (errorCode.startsWith( "Neo.ClientError.Database.DatabaseNotFound" ));
104-
}
105-
else
106-
{
107-
return false;
108-
}
109-
}
110-
11198
public static void rethrowAsyncException( ExecutionException e )
11299
{
113100
Throwable error = e.getCause();

0 commit comments

Comments
 (0)