Skip to content

Commit a332bdf

Browse files
author
Zhen
committed
Adding tests for routing specification and removed some old tests that no long needed after the change
1 parent 8ebb858 commit a332bdf

17 files changed

+891
-1695
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java

Lines changed: 22 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -19,100 +19,15 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.HashSet;
22-
import java.util.List;
2322
import java.util.Set;
2423

25-
import org.neo4j.driver.internal.NetworkSession;
2624
import org.neo4j.driver.internal.net.BoltServerAddress;
27-
import org.neo4j.driver.internal.spi.Connection;
28-
import org.neo4j.driver.internal.util.Clock;
29-
import org.neo4j.driver.v1.Logger;
3025
import org.neo4j.driver.v1.Record;
31-
import org.neo4j.driver.v1.Statement;
3226
import org.neo4j.driver.v1.Value;
33-
import org.neo4j.driver.v1.exceptions.ClientException;
34-
import org.neo4j.driver.v1.exceptions.ProtocolException;
35-
import org.neo4j.driver.v1.exceptions.value.ValueException;
3627
import org.neo4j.driver.v1.util.Function;
3728

38-
import static java.lang.String.format;
39-
import static org.neo4j.driver.internal.cluster.ClusterComposition.Provider.PROTOCOL_ERROR_MESSAGE;
40-
4129
final class ClusterComposition
4230
{
43-
interface Provider
44-
{
45-
String GET_SERVERS = "dbms.cluster.routing.getServers";
46-
String CALL_GET_SERVERS = "CALL " + GET_SERVERS;
47-
String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS + "' result received from server.";
48-
49-
ClusterComposition getClusterComposition( Connection connection )
50-
throws ProtocolException, ProcedureNotFoundException;
51-
52-
class ProcedureNotFoundException extends Exception
53-
{
54-
ProcedureNotFoundException( String message )
55-
{
56-
super( message );
57-
}
58-
59-
ProcedureNotFoundException( String message, Throwable e )
60-
{
61-
super( message, e );
62-
}
63-
}
64-
65-
final class Default implements Provider
66-
{
67-
private static final Statement CALL_GET_SERVER = new Statement( Provider.CALL_GET_SERVERS );
68-
private final Clock clock;
69-
private final Logger log;
70-
71-
Default( Clock clock, Logger log )
72-
{
73-
this.clock = clock;
74-
this.log = log;
75-
}
76-
77-
@Override
78-
public ClusterComposition getClusterComposition( Connection connection )
79-
throws ProtocolException, ProcedureNotFoundException
80-
{
81-
List<Record> records = getServers( connection );
82-
log.info( "Got getServers response: %s", records );
83-
long now = clock.millis();
84-
85-
if ( records.size() != 1 )
86-
{
87-
throw new ProtocolException( format(
88-
"%s%nRecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
89-
records.size() ) );
90-
}
91-
ClusterComposition cluster = read( records.get( 0 ), now );
92-
if ( cluster.isIllegalResponse() )
93-
{
94-
throw new ProtocolException( format( "%s%nNo router or reader found in response.",
95-
PROTOCOL_ERROR_MESSAGE ) );
96-
}
97-
return cluster;
98-
}
99-
100-
private List<Record> getServers( Connection connection ) throws ProcedureNotFoundException
101-
{
102-
try
103-
{
104-
return NetworkSession.run( connection, CALL_GET_SERVER ).list();
105-
}
106-
catch ( ClientException e )
107-
{
108-
throw new ProcedureNotFoundException( format("Failed to call '%s' procedure on server. " +
109-
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.",
110-
GET_SERVERS ), e );
111-
}
112-
}
113-
}
114-
}
115-
11631
private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
11732
private static final Function<Value,BoltServerAddress> OF_BoltServerAddress =
11833
new Function<Value,BoltServerAddress>()
@@ -123,8 +38,11 @@ public BoltServerAddress apply( Value value )
12338
return new BoltServerAddress( value.asString() );
12439
}
12540
};
126-
private final Set<BoltServerAddress> readers, writers, routers;
127-
final long expirationTimestamp;
41+
42+
private final Set<BoltServerAddress> readers;
43+
private final Set<BoltServerAddress> writers;
44+
private final Set<BoltServerAddress> routers;
45+
private final long expirationTimestamp;
12846

12947
private ClusterComposition( long expirationTimestamp )
13048
{
@@ -171,6 +89,10 @@ public Set<BoltServerAddress> routers()
17189
return new HashSet<>( routers );
17290
}
17391

92+
public long expirationTimestamp() {
93+
return this.expirationTimestamp;
94+
}
95+
17496
@Override
17597
public String toString()
17698
{
@@ -182,32 +104,26 @@ public String toString()
182104
'}';
183105
}
184106

185-
private static ClusterComposition read( Record record, long now )
107+
public static ClusterComposition parse( Record record, long now )
186108
{
187109
if ( record == null )
188110
{
189111
return null;
190112
}
191-
try
113+
114+
final ClusterComposition result;
115+
result = new ClusterComposition( expirationTimestamp( now, record ) );
116+
record.get( "servers" ).asList( new Function<Value,Void>()
192117
{
193-
final ClusterComposition result;
194-
result = new ClusterComposition( expirationTimestamp( now, record ) );
195-
record.get( "servers" ).asList( new Function<Value,Void>()
118+
@Override
119+
public Void apply( Value value )
196120
{
197-
@Override
198-
public Void apply( Value value )
199-
{
200-
result.servers( value.get( "role" ).asString() )
201-
.addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) );
202-
return null;
203-
}
204-
} );
205-
return result;
206-
}
207-
catch ( ValueException e )
208-
{
209-
throw new ProtocolException( format( "%s%nUnparsable record received.", PROTOCOL_ERROR_MESSAGE ), e );
210-
}
121+
result.servers( value.get( "role" ).asString() )
122+
.addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) );
123+
return null;
124+
}
125+
} );
126+
return result;
211127
}
212128

213129
private static long expirationTimestamp( long now, Record record )
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.util.List;
22+
23+
import org.neo4j.driver.internal.spi.Connection;
24+
import org.neo4j.driver.internal.util.Clock;
25+
import org.neo4j.driver.v1.Logger;
26+
import org.neo4j.driver.v1.Record;
27+
import org.neo4j.driver.v1.exceptions.ClientException;
28+
import org.neo4j.driver.v1.exceptions.ProtocolException;
29+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
30+
import org.neo4j.driver.v1.exceptions.value.ValueException;
31+
32+
import static java.lang.String.format;
33+
34+
public interface ClusterCompositionProvider
35+
{
36+
GetClusterCompositionResponse getClusterComposition( Connection connection );
37+
38+
class Default implements ClusterCompositionProvider
39+
{
40+
private String GET_SERVERS = "dbms.cluster.routing.getServers";
41+
private String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS + "' result received from server " +
42+
"due to ";
43+
44+
private final Clock clock;
45+
private final Logger log;
46+
private final GetServersRunner getServersRunner;
47+
48+
public Default( Clock clock, Logger log )
49+
{
50+
this( clock, log, new GetServersRunner() );
51+
}
52+
53+
Default( Clock clock, Logger log, GetServersRunner getServersRunner )
54+
{
55+
this.clock = clock;
56+
this.log = log;
57+
this.getServersRunner = getServersRunner;
58+
}
59+
60+
@Override
61+
public GetClusterCompositionResponse getClusterComposition( Connection connection )
62+
{
63+
List<Record> records = null;
64+
65+
// failed to invoke procedure
66+
try
67+
{
68+
records = getServersRunner.run( connection );
69+
}
70+
catch ( ClientException e )
71+
{
72+
return new GetClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
73+
"Failed to call '%s' procedure on server. " +
74+
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) );
75+
}
76+
77+
log.info( "Got getServers response: %s", records );
78+
long now = clock.millis();
79+
80+
// the record size is wrong
81+
if ( records.size() != 1 )
82+
{
83+
return new GetClusterCompositionResponse.Failure( new ProtocolException( format(
84+
"%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
85+
records.size() ) ) );
86+
}
87+
88+
// failed to parse the record
89+
ClusterComposition cluster;
90+
try
91+
{
92+
cluster = ClusterComposition.parse( records.get( 0 ), now );
93+
}
94+
catch ( ValueException e )
95+
{
96+
return new GetClusterCompositionResponse.Failure( new ProtocolException( format(
97+
"%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) );
98+
}
99+
100+
// the cluster result is not a legal reply
101+
if ( cluster.isIllegalResponse() )
102+
{
103+
return new GetClusterCompositionResponse.Failure( new ProtocolException( format(
104+
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) );
105+
}
106+
107+
// all good
108+
return new GetClusterCompositionResponse.Success( cluster );
109+
}
110+
}
111+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public boolean isStale()
6666
@Override
6767
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
6868
{
69-
expirationTimeout = cluster.expirationTimestamp;
69+
expirationTimeout = cluster.expirationTimestamp();
7070
HashSet<BoltServerAddress> removed = new HashSet<>();
7171
readers.update( cluster.readers(), removed );
7272
writers.update( cluster.writers(), removed );
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
public interface GetClusterCompositionResponse
22+
{
23+
ClusterComposition clusterComposition();
24+
25+
class Failure implements GetClusterCompositionResponse
26+
{
27+
private final RuntimeException error;
28+
29+
public Failure( RuntimeException t )
30+
{
31+
this.error = t;
32+
}
33+
34+
@Override
35+
public ClusterComposition clusterComposition()
36+
{
37+
throw this.error;
38+
}
39+
}
40+
41+
class Success implements GetClusterCompositionResponse
42+
{
43+
private final ClusterComposition cluster;
44+
45+
public Success( ClusterComposition cluster )
46+
{
47+
this.cluster = cluster;
48+
}
49+
50+
@Override
51+
public ClusterComposition clusterComposition()
52+
{
53+
return cluster;
54+
}
55+
}
56+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.neo4j.driver.internal.cluster;
21+
22+
import java.util.List;
23+
24+
import org.neo4j.driver.internal.NetworkSession;
25+
import org.neo4j.driver.internal.spi.Connection;
26+
import org.neo4j.driver.v1.Record;
27+
import org.neo4j.driver.v1.Statement;
28+
29+
public class GetServersRunner
30+
{
31+
private String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers";
32+
33+
public List<Record> run( Connection connection )
34+
{
35+
return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ) ).list();
36+
}
37+
}

0 commit comments

Comments
 (0)