Skip to content

Commit 5d0403a

Browse files
authored
Merge pull request #314 from zhenlineo/1.1-fix-routing
Update routing according to specification
2 parents f29728b + 370e4af commit 5d0403a

18 files changed

+1256
-1531
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java

Lines changed: 27 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -19,71 +19,15 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.HashSet;
22-
import java.util.List;
2322
import java.util.Set;
2423

25-
import org.neo4j.driver.internal.NetworkSession;
2624
import org.neo4j.driver.internal.net.BoltServerAddress;
27-
import org.neo4j.driver.internal.spi.Connection;
28-
import org.neo4j.driver.internal.util.Clock;
29-
import org.neo4j.driver.v1.Logger;
3025
import org.neo4j.driver.v1.Record;
31-
import org.neo4j.driver.v1.Statement;
32-
import org.neo4j.driver.v1.StatementResult;
3326
import org.neo4j.driver.v1.Value;
34-
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
35-
import org.neo4j.driver.v1.exceptions.value.ValueException;
3627
import org.neo4j.driver.v1.util.Function;
3728

3829
final class ClusterComposition
3930
{
40-
interface Provider
41-
{
42-
String GET_SERVERS = "CALL dbms.cluster.routing.getServers";
43-
44-
ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException;
45-
46-
final class Default implements Provider
47-
{
48-
private static final Statement GET_SERVER = new Statement( Provider.GET_SERVERS );
49-
private final Clock clock;
50-
private final Logger log;
51-
52-
Default( Clock clock, Logger log )
53-
{
54-
this.clock = clock;
55-
this.log = log;
56-
}
57-
58-
@Override
59-
public ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException
60-
{
61-
StatementResult cursor = getServers( connection );
62-
List<Record> records = cursor.list();
63-
log.info( "Got getServers response: %s", records );
64-
long now = clock.millis();
65-
try
66-
{
67-
if ( records.size() != 1 )
68-
{
69-
// server returned too few or too many rows, this is a contract violation, treat as incapable
70-
return null;
71-
}
72-
return read( records.get( 0 ), now );
73-
}
74-
finally
75-
{
76-
cursor.consume(); // make sure we exhaust the results
77-
}
78-
}
79-
80-
private StatementResult getServers( Connection connection )
81-
{
82-
return NetworkSession.run( connection, GET_SERVER );
83-
}
84-
}
85-
}
86-
8731
private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
8832
private static final Function<Value,BoltServerAddress> OF_BoltServerAddress =
8933
new Function<Value,BoltServerAddress>()
@@ -94,8 +38,11 @@ public BoltServerAddress apply( Value value )
9438
return new BoltServerAddress( value.asString() );
9539
}
9640
};
97-
private final Set<BoltServerAddress> readers, writers, routers;
98-
final long expirationTimestamp;
41+
42+
private final Set<BoltServerAddress> readers;
43+
private final Set<BoltServerAddress> writers;
44+
private final Set<BoltServerAddress> routers;
45+
private final long expirationTimestamp;
9946

10047
private ClusterComposition( long expirationTimestamp )
10148
{
@@ -118,9 +65,13 @@ private ClusterComposition( long expirationTimestamp )
11865
this.routers.addAll( routers );
11966
}
12067

121-
public boolean isValid()
68+
public boolean hasWriters()
12269
{
123-
return !routers.isEmpty() && !writers.isEmpty();
70+
return !writers.isEmpty();
71+
}
72+
public boolean hasRoutersAndReaders()
73+
{
74+
return routers.isEmpty() || readers.isEmpty();
12475
}
12576

12677
public Set<BoltServerAddress> readers()
@@ -138,6 +89,10 @@ public Set<BoltServerAddress> routers()
13889
return new HashSet<>( routers );
13990
}
14091

92+
public long expirationTimestamp() {
93+
return this.expirationTimestamp;
94+
}
95+
14196
@Override
14297
public String toString()
14398
{
@@ -149,32 +104,25 @@ public String toString()
149104
'}';
150105
}
151106

152-
private static ClusterComposition read( Record record, long now )
107+
public static ClusterComposition parse( Record record, long now )
153108
{
154109
if ( record == null )
155110
{
156111
return null;
157112
}
158-
try
113+
114+
final ClusterComposition result = new ClusterComposition( expirationTimestamp( now, record ) );
115+
record.get( "servers" ).asList( new Function<Value,Void>()
159116
{
160-
final ClusterComposition result;
161-
result = new ClusterComposition( expirationTimestamp( now, record ) );
162-
record.get( "servers" ).asList( new Function<Value,Void>()
117+
@Override
118+
public Void apply( Value value )
163119
{
164-
@Override
165-
public Void apply( Value value )
166-
{
167-
result.servers( value.get( "role" ).asString() )
168-
.addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) );
169-
return null;
170-
}
171-
} );
172-
return result;
173-
}
174-
catch ( ValueException e )
175-
{
176-
return null;
177-
}
120+
result.servers( value.get( "role" ).asString() )
121+
.addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) );
122+
return null;
123+
}
124+
} );
125+
return result;
178126
}
179127

180128
private static long expirationTimestamp( long now, Record record )
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import org.neo4j.driver.internal.spi.Connection;
22+
23+
public interface ClusterCompositionProvider
24+
{
25+
ClusterCompositionResponse getClusterComposition( Connection connection );
26+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
public interface ClusterCompositionResponse
22+
{
23+
ClusterComposition clusterComposition();
24+
25+
class Failure implements ClusterCompositionResponse
26+
{
27+
private final RuntimeException error;
28+
29+
public Failure( RuntimeException t )
30+
{
31+
this.error = t;
32+
}
33+
34+
@Override
35+
public ClusterComposition clusterComposition()
36+
{
37+
throw this.error;
38+
}
39+
}
40+
41+
class Success implements ClusterCompositionResponse
42+
{
43+
private final ClusterComposition cluster;
44+
45+
public Success( ClusterComposition cluster )
46+
{
47+
this.cluster = cluster;
48+
}
49+
50+
@Override
51+
public ClusterComposition clusterComposition()
52+
{
53+
return cluster;
54+
}
55+
}
56+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public boolean isStale()
6666
@Override
6767
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
6868
{
69-
expirationTimeout = cluster.expirationTimestamp;
69+
expirationTimeout = cluster.expirationTimestamp();
7070
HashSet<BoltServerAddress> removed = new HashSet<>();
7171
readers.update( cluster.readers(), removed );
7272
writers.update( cluster.writers(), removed );
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.util.List;
22+
23+
import org.neo4j.driver.internal.spi.Connection;
24+
import org.neo4j.driver.internal.util.Clock;
25+
import org.neo4j.driver.v1.Logger;
26+
import org.neo4j.driver.v1.Record;
27+
import org.neo4j.driver.v1.exceptions.ClientException;
28+
import org.neo4j.driver.v1.exceptions.ProtocolException;
29+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
30+
import org.neo4j.driver.v1.exceptions.value.ValueException;
31+
32+
import static java.lang.String.format;
33+
34+
public class GetServersProcedureClusterCompositionProvider implements ClusterCompositionProvider
35+
{
36+
private final String GET_SERVERS = "dbms.cluster.routing.getServers";
37+
private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS +
38+
"' result received from server due to ";
39+
40+
private final Clock clock;
41+
private final Logger log;
42+
private final GetServersProcedureRunner getServersRunner;
43+
44+
public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log )
45+
{
46+
this( clock, log, new GetServersProcedureRunner() );
47+
}
48+
49+
GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, GetServersProcedureRunner getServersRunner )
50+
{
51+
this.clock = clock;
52+
this.log = log;
53+
this.getServersRunner = getServersRunner;
54+
}
55+
56+
@Override
57+
public ClusterCompositionResponse getClusterComposition( Connection connection )
58+
{
59+
List<Record> records = null;
60+
61+
// failed to invoke procedure
62+
try
63+
{
64+
records = getServersRunner.run( connection );
65+
}
66+
catch ( ClientException e )
67+
{
68+
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
69+
"Failed to call '%s' procedure on server. " +
70+
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) );
71+
}
72+
73+
log.info( "Got getServers response: %s", records );
74+
long now = clock.millis();
75+
76+
// the record size is wrong
77+
if ( records.size() != 1 )
78+
{
79+
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
80+
"%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
81+
records.size() ) ) );
82+
}
83+
84+
// failed to parse the record
85+
ClusterComposition cluster;
86+
try
87+
{
88+
cluster = ClusterComposition.parse( records.get( 0 ), now );
89+
}
90+
catch ( ValueException e )
91+
{
92+
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
93+
"%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) );
94+
}
95+
96+
// the cluster result is not a legal reply
97+
if ( cluster.hasRoutersAndReaders() )
98+
{
99+
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
100+
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) );
101+
}
102+
103+
// all good
104+
return new ClusterCompositionResponse.Success( cluster );
105+
}
106+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.neo4j.driver.internal.cluster;
21+
22+
import java.util.List;
23+
24+
import org.neo4j.driver.internal.NetworkSession;
25+
import org.neo4j.driver.internal.spi.Connection;
26+
import org.neo4j.driver.v1.Record;
27+
import org.neo4j.driver.v1.Statement;
28+
29+
public class GetServersProcedureRunner
30+
{
31+
private final String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers";
32+
33+
public List<Record> run( Connection connection )
34+
{
35+
return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ) ).list();
36+
}
37+
}

0 commit comments

Comments
 (0)