Skip to content

Commit 8ebb858

Browse files
author
Zhen
committed
Fixed the routing logic to be the same as routing specification
1 parent 02ce8f0 commit 8ebb858

File tree

5 files changed

+575
-40
lines changed

5 files changed

+575
-40
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,42 @@
2929
import org.neo4j.driver.v1.Logger;
3030
import org.neo4j.driver.v1.Record;
3131
import org.neo4j.driver.v1.Statement;
32-
import org.neo4j.driver.v1.StatementResult;
3332
import org.neo4j.driver.v1.Value;
34-
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
33+
import org.neo4j.driver.v1.exceptions.ClientException;
34+
import org.neo4j.driver.v1.exceptions.ProtocolException;
3535
import org.neo4j.driver.v1.exceptions.value.ValueException;
3636
import org.neo4j.driver.v1.util.Function;
3737

38+
import static java.lang.String.format;
39+
import static org.neo4j.driver.internal.cluster.ClusterComposition.Provider.PROTOCOL_ERROR_MESSAGE;
40+
3841
final class ClusterComposition
3942
{
4043
interface Provider
4144
{
42-
String GET_SERVERS = "CALL dbms.cluster.routing.getServers";
45+
String GET_SERVERS = "dbms.cluster.routing.getServers";
46+
String CALL_GET_SERVERS = "CALL " + GET_SERVERS;
47+
String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS + "' result received from server.";
48+
49+
ClusterComposition getClusterComposition( Connection connection )
50+
throws ProtocolException, ProcedureNotFoundException;
51+
52+
class ProcedureNotFoundException extends Exception
53+
{
54+
ProcedureNotFoundException( String message )
55+
{
56+
super( message );
57+
}
4358

44-
ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException;
59+
ProcedureNotFoundException( String message, Throwable e )
60+
{
61+
super( message, e );
62+
}
63+
}
4564

4665
final class Default implements Provider
4766
{
48-
private static final Statement GET_SERVER = new Statement( Provider.GET_SERVERS );
67+
private static final Statement CALL_GET_SERVER = new Statement( Provider.CALL_GET_SERVERS );
4968
private final Clock clock;
5069
private final Logger log;
5170

@@ -56,30 +75,40 @@ final class Default implements Provider
5675
}
5776

5877
@Override
59-
public ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException
78+
public ClusterComposition getClusterComposition( Connection connection )
79+
throws ProtocolException, ProcedureNotFoundException
6080
{
61-
StatementResult cursor = getServers( connection );
62-
List<Record> records = cursor.list();
81+
List<Record> records = getServers( connection );
6382
log.info( "Got getServers response: %s", records );
6483
long now = clock.millis();
65-
try
84+
85+
if ( records.size() != 1 )
6686
{
67-
if ( records.size() != 1 )
68-
{
69-
// server returned too few or too many rows, this is a contract violation, treat as incapable
70-
return null;
71-
}
72-
return read( records.get( 0 ), now );
87+
throw new ProtocolException( format(
88+
"%s%nRecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
89+
records.size() ) );
7390
}
74-
finally
91+
ClusterComposition cluster = read( records.get( 0 ), now );
92+
if ( cluster.isIllegalResponse() )
7593
{
76-
cursor.consume(); // make sure we exhaust the results
94+
throw new ProtocolException( format( "%s%nNo router or reader found in response.",
95+
PROTOCOL_ERROR_MESSAGE ) );
7796
}
97+
return cluster;
7898
}
7999

80-
private StatementResult getServers( Connection connection )
100+
private List<Record> getServers( Connection connection ) throws ProcedureNotFoundException
81101
{
82-
return NetworkSession.run( connection, GET_SERVER );
102+
try
103+
{
104+
return NetworkSession.run( connection, CALL_GET_SERVER ).list();
105+
}
106+
catch ( ClientException e )
107+
{
108+
throw new ProcedureNotFoundException( format("Failed to call '%s' procedure on server. " +
109+
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.",
110+
GET_SERVERS ), e );
111+
}
83112
}
84113
}
85114
}
@@ -120,7 +149,11 @@ private ClusterComposition( long expirationTimestamp )
120149

121150
public boolean isValid()
122151
{
123-
return !routers.isEmpty() && !writers.isEmpty();
152+
return !writers.isEmpty();
153+
}
154+
public boolean isIllegalResponse()
155+
{
156+
return routers.isEmpty() || readers.isEmpty();
124157
}
125158

126159
public Set<BoltServerAddress> readers()
@@ -173,7 +206,7 @@ public Void apply( Value value )
173206
}
174207
catch ( ValueException e )
175208
{
176-
return null;
209+
throw new ProtocolException( format( "%s%nUnparsable record received.", PROTOCOL_ERROR_MESSAGE ), e );
177210
}
178211
}
179212

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.neo4j.driver.internal.spi.ConnectionPool;
2424
import org.neo4j.driver.internal.util.Clock;
2525
import org.neo4j.driver.v1.Logger;
26+
import org.neo4j.driver.v1.exceptions.ProtocolException;
2627
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
2728

2829
import static java.lang.String.format;
@@ -49,11 +50,8 @@ public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, Cluste
4950
public ClusterComposition lookupRoutingTable( ConnectionPool connections, RoutingTable routingTable )
5051
throws InterruptedException, ServiceUnavailableException
5152
{
52-
int size = routingTable.routerSize(), failures = 0;
53-
if ( size == 0 )
54-
{
55-
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
56-
}
53+
int failures = 0;
54+
5755
for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay, delay * 2 ) )
5856
{
5957
long waitTime = start + delay - clock.millis();
@@ -62,8 +60,10 @@ public ClusterComposition lookupRoutingTable( ConnectionPool connections, Routin
6260
clock.sleep( waitTime );
6361
}
6462
start = clock.millis();
65-
for ( int i = 0; i < size; i++ )
63+
for ( int i = 0, size = routingTable.routerSize(); i < size; i++ )
6664
{
65+
assertRouterIsNotEmpty( size );
66+
6767
BoltServerAddress address = routingTable.nextRouter();
6868
if ( address == null )
6969
{
@@ -75,22 +75,25 @@ public ClusterComposition lookupRoutingTable( ConnectionPool connections, Routin
7575
cluster = provider.getClusterComposition( connection );
7676
logger.info( "Got cluster composition %s", cluster );
7777
}
78-
catch ( Exception e )
78+
catch( ProtocolException e )
7979
{
80-
logger.error( format( "Failed to connect to routing server '%s'.", address ), e );
81-
continue;
80+
// illegal response
81+
throw e;
8282
}
83-
if ( cluster == null || !cluster.isValid() )
83+
catch( ClusterComposition.Provider.ProcedureNotFoundException e )
8484
{
85-
logger.info( "Server <%s> unable to perform routing capability, dropping from list of routers.",
86-
address );
85+
// talking to a server does not support CC?
86+
throw new ServiceUnavailableException( e.getMessage(), e.getCause() );
87+
}
88+
catch ( Exception e )
89+
{
90+
// the connection breaks
91+
logger.error( format( "Failed to connect to routing server '%s'.", address ), e );
8792
routingTable.removeRouter( address );
88-
if ( --size == 0 )
89-
{
90-
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
91-
}
93+
assertRouterIsNotEmpty( routingTable.routerSize() );
94+
continue;
9295
}
93-
else
96+
if ( cluster.isValid() )
9497
{
9598
return cluster;
9699
}
@@ -101,4 +104,12 @@ public ClusterComposition lookupRoutingTable( ConnectionPool connections, Routin
101104
}
102105
}
103106
}
107+
108+
private void assertRouterIsNotEmpty( int size )
109+
{
110+
if ( size == 0 )
111+
{
112+
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
113+
}
114+
}
104115
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.neo4j.driver.v1.exceptions;
2+
3+
/**
4+
* A signal that the contract for client-server communication has broken down.
5+
* The user should contact support and cannot resolve this his or herself.
6+
*/
7+
public class ProtocolException extends RuntimeException
8+
{
9+
private static String CODE = "Protocol violation: ";
10+
public ProtocolException( String message )
11+
{
12+
super( CODE + message );
13+
}
14+
15+
public ProtocolException( String message, Throwable e )
16+
{
17+
super( CODE + message, e );
18+
}
19+
}

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public void shouldPropagateConnectionFailureExceptions() throws Exception
147147
}
148148
}
149149

150-
private ClusterComposition getClusterComposition()
150+
private ClusterComposition getClusterComposition() throws ClusterComposition.Provider.ProcedureNotFoundException
151151
{
152152
return new ClusterComposition.Provider.Default( clock, mock( Logger.class ) )
153153
.getClusterComposition( connection );
@@ -166,7 +166,7 @@ private void values( final Value[]... records )
166166
private void onGetServers( Stubber stubber )
167167
{
168168
stubber.when( connection ).run(
169-
eq( ClusterComposition.Provider.GET_SERVERS ),
169+
eq( ClusterComposition.Provider.CALL_GET_SERVERS ),
170170
eq( Collections.<String,Value>emptyMap() ),
171171
any( Collector.class ) );
172172
}

0 commit comments

Comments
 (0)