-
Notifications
You must be signed in to change notification settings - Fork 155
Update routing according to specification #314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,71 +19,15 @@ | |
package org.neo4j.driver.internal.cluster; | ||
|
||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
|
||
import org.neo4j.driver.internal.NetworkSession; | ||
import org.neo4j.driver.internal.net.BoltServerAddress; | ||
import org.neo4j.driver.internal.spi.Connection; | ||
import org.neo4j.driver.internal.util.Clock; | ||
import org.neo4j.driver.v1.Logger; | ||
import org.neo4j.driver.v1.Record; | ||
import org.neo4j.driver.v1.Statement; | ||
import org.neo4j.driver.v1.StatementResult; | ||
import org.neo4j.driver.v1.Value; | ||
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; | ||
import org.neo4j.driver.v1.exceptions.value.ValueException; | ||
import org.neo4j.driver.v1.util.Function; | ||
|
||
final class ClusterComposition | ||
{ | ||
interface Provider | ||
{ | ||
String GET_SERVERS = "CALL dbms.cluster.routing.getServers"; | ||
|
||
ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException; | ||
|
||
final class Default implements Provider | ||
{ | ||
private static final Statement GET_SERVER = new Statement( Provider.GET_SERVERS ); | ||
private final Clock clock; | ||
private final Logger log; | ||
|
||
Default( Clock clock, Logger log ) | ||
{ | ||
this.clock = clock; | ||
this.log = log; | ||
} | ||
|
||
@Override | ||
public ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException | ||
{ | ||
StatementResult cursor = getServers( connection ); | ||
List<Record> records = cursor.list(); | ||
log.info( "Got getServers response: %s", records ); | ||
long now = clock.millis(); | ||
try | ||
{ | ||
if ( records.size() != 1 ) | ||
{ | ||
// server returned too few or too many rows, this is a contract violation, treat as incapable | ||
return null; | ||
} | ||
return read( records.get( 0 ), now ); | ||
} | ||
finally | ||
{ | ||
cursor.consume(); // make sure we exhaust the results | ||
} | ||
} | ||
|
||
private StatementResult getServers( Connection connection ) | ||
{ | ||
return NetworkSession.run( connection, GET_SERVER ); | ||
} | ||
} | ||
} | ||
|
||
private static final long MAX_TTL = Long.MAX_VALUE / 1000L; | ||
private static final Function<Value,BoltServerAddress> OF_BoltServerAddress = | ||
new Function<Value,BoltServerAddress>() | ||
|
@@ -94,8 +38,11 @@ public BoltServerAddress apply( Value value ) | |
return new BoltServerAddress( value.asString() ); | ||
} | ||
}; | ||
private final Set<BoltServerAddress> readers, writers, routers; | ||
final long expirationTimestamp; | ||
|
||
private final Set<BoltServerAddress> readers; | ||
private final Set<BoltServerAddress> writers; | ||
private final Set<BoltServerAddress> routers; | ||
private final long expirationTimestamp; | ||
|
||
private ClusterComposition( long expirationTimestamp ) | ||
{ | ||
|
@@ -120,7 +67,11 @@ private ClusterComposition( long expirationTimestamp ) | |
|
||
public boolean isValid() | ||
{ | ||
return !routers.isEmpty() && !writers.isEmpty(); | ||
return !writers.isEmpty(); | ||
} | ||
public boolean isIllegalResponse() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd vote for |
||
{ | ||
return routers.isEmpty() || readers.isEmpty(); | ||
} | ||
|
||
public Set<BoltServerAddress> readers() | ||
|
@@ -138,6 +89,10 @@ public Set<BoltServerAddress> routers() | |
return new HashSet<>( routers ); | ||
} | ||
|
||
public long expirationTimestamp() { | ||
return this.expirationTimestamp; | ||
} | ||
|
||
@Override | ||
public String toString() | ||
{ | ||
|
@@ -149,32 +104,26 @@ public String toString() | |
'}'; | ||
} | ||
|
||
private static ClusterComposition read( Record record, long now ) | ||
public static ClusterComposition parse( Record record, long now ) | ||
{ | ||
if ( record == null ) | ||
{ | ||
return null; | ||
} | ||
try | ||
|
||
final ClusterComposition result; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Declaration can be joined with the assignment |
||
result = new ClusterComposition( expirationTimestamp( now, record ) ); | ||
record.get( "servers" ).asList( new Function<Value,Void>() | ||
{ | ||
final ClusterComposition result; | ||
result = new ClusterComposition( expirationTimestamp( now, record ) ); | ||
record.get( "servers" ).asList( new Function<Value,Void>() | ||
@Override | ||
public Void apply( Value value ) | ||
{ | ||
@Override | ||
public Void apply( Value value ) | ||
{ | ||
result.servers( value.get( "role" ).asString() ) | ||
.addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) ); | ||
return null; | ||
} | ||
} ); | ||
return result; | ||
} | ||
catch ( ValueException e ) | ||
{ | ||
return null; | ||
} | ||
result.servers( value.get( "role" ).asString() ) | ||
.addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) ); | ||
return null; | ||
} | ||
} ); | ||
return result; | ||
} | ||
|
||
private static long expirationTimestamp( long now, Record record ) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
/* | ||
* Copyright (c) 2002-2017 "Neo Technology," | ||
* Network Engine for Objects in Lund AB [http://neotechnology.com] | ||
* | ||
* This file is part of Neo4j. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.neo4j.driver.internal.cluster; | ||
|
||
import java.util.List; | ||
|
||
import org.neo4j.driver.internal.spi.Connection; | ||
import org.neo4j.driver.internal.util.Clock; | ||
import org.neo4j.driver.v1.Logger; | ||
import org.neo4j.driver.v1.Record; | ||
import org.neo4j.driver.v1.exceptions.ClientException; | ||
import org.neo4j.driver.v1.exceptions.ProtocolException; | ||
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; | ||
import org.neo4j.driver.v1.exceptions.value.ValueException; | ||
|
||
import static java.lang.String.format; | ||
|
||
public interface ClusterCompositionProvider | ||
{ | ||
GetClusterCompositionResponse getClusterComposition( Connection connection ); | ||
|
||
class Default implements ClusterCompositionProvider | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's move this implementation to a separate file, I think it deserves this :) |
||
{ | ||
private String GET_SERVERS = "dbms.cluster.routing.getServers"; | ||
private String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS + "' result received from server " + | ||
"due to "; | ||
|
||
private final Clock clock; | ||
private final Logger log; | ||
private final GetServersRunner getServersRunner; | ||
|
||
public Default( Clock clock, Logger log ) | ||
{ | ||
this( clock, log, new GetServersRunner() ); | ||
} | ||
|
||
Default( Clock clock, Logger log, GetServersRunner getServersRunner ) | ||
{ | ||
this.clock = clock; | ||
this.log = log; | ||
this.getServersRunner = getServersRunner; | ||
} | ||
|
||
@Override | ||
public GetClusterCompositionResponse getClusterComposition( Connection connection ) | ||
{ | ||
List<Record> records = null; | ||
|
||
// failed to invoke procedure | ||
try | ||
{ | ||
records = getServersRunner.run( connection ); | ||
} | ||
catch ( ClientException e ) | ||
{ | ||
return new GetClusterCompositionResponse.Failure( new ServiceUnavailableException( format( | ||
"Failed to call '%s' procedure on server. " + | ||
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) ); | ||
} | ||
|
||
log.info( "Got getServers response: %s", records ); | ||
long now = clock.millis(); | ||
|
||
// the record size is wrong | ||
if ( records.size() != 1 ) | ||
{ | ||
return new GetClusterCompositionResponse.Failure( new ProtocolException( format( | ||
"%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE, | ||
records.size() ) ) ); | ||
} | ||
|
||
// failed to parse the record | ||
ClusterComposition cluster; | ||
try | ||
{ | ||
cluster = ClusterComposition.parse( records.get( 0 ), now ); | ||
} | ||
catch ( ValueException e ) | ||
{ | ||
return new GetClusterCompositionResponse.Failure( new ProtocolException( format( | ||
"%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) ); | ||
} | ||
|
||
// the cluster result is not a legal reply | ||
if ( cluster.isIllegalResponse() ) | ||
{ | ||
return new GetClusterCompositionResponse.Failure( new ProtocolException( format( | ||
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) ); | ||
} | ||
|
||
// all good | ||
return new GetClusterCompositionResponse.Success( cluster ); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Copyright (c) 2002-2017 "Neo Technology," | ||
* Network Engine for Objects in Lund AB [http://neotechnology.com] | ||
* | ||
* This file is part of Neo4j. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.neo4j.driver.internal.cluster; | ||
|
||
public interface GetClusterCompositionResponse | ||
{ | ||
ClusterComposition clusterComposition(); | ||
|
||
class Failure implements GetClusterCompositionResponse | ||
{ | ||
private final RuntimeException error; | ||
|
||
public Failure( RuntimeException t ) | ||
{ | ||
this.error = t; | ||
} | ||
|
||
@Override | ||
public ClusterComposition clusterComposition() | ||
{ | ||
throw this.error; | ||
} | ||
} | ||
|
||
class Success implements GetClusterCompositionResponse | ||
{ | ||
private final ClusterComposition cluster; | ||
|
||
public Success( ClusterComposition cluster ) | ||
{ | ||
this.cluster = cluster; | ||
} | ||
|
||
@Override | ||
public ClusterComposition clusterComposition() | ||
{ | ||
return cluster; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright (c) 2002-2017 "Neo Technology," | ||
* Network Engine for Objects in Lund AB [http://neotechnology.com] | ||
* | ||
* This file is part of Neo4j. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.neo4j.driver.internal.cluster; | ||
|
||
import java.util.List; | ||
|
||
import org.neo4j.driver.internal.NetworkSession; | ||
import org.neo4j.driver.internal.spi.Connection; | ||
import org.neo4j.driver.v1.Record; | ||
import org.neo4j.driver.v1.Statement; | ||
|
||
public class GetServersRunner | ||
{ | ||
private String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers"; | ||
|
||
public List<Record> run( Connection connection ) | ||
{ | ||
return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ) ).list(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can name this method
#hasWriters()
to be less ambiguous?