Skip to content

Update routing according to specification #314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 3, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,15 @@
package org.neo4j.driver.internal.cluster;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.neo4j.driver.internal.NetworkSession;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.StatementResult;
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.value.ValueException;
import org.neo4j.driver.v1.util.Function;

final class ClusterComposition
{
interface Provider
{
String GET_SERVERS = "CALL dbms.cluster.routing.getServers";

ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException;

final class Default implements Provider
{
private static final Statement GET_SERVER = new Statement( Provider.GET_SERVERS );
private final Clock clock;
private final Logger log;

Default( Clock clock, Logger log )
{
this.clock = clock;
this.log = log;
}

@Override
public ClusterComposition getClusterComposition( Connection connection ) throws ServiceUnavailableException
{
StatementResult cursor = getServers( connection );
List<Record> records = cursor.list();
log.info( "Got getServers response: %s", records );
long now = clock.millis();
try
{
if ( records.size() != 1 )
{
// server returned too few or too many rows, this is a contract violation, treat as incapable
return null;
}
return read( records.get( 0 ), now );
}
finally
{
cursor.consume(); // make sure we exhaust the results
}
}

private StatementResult getServers( Connection connection )
{
return NetworkSession.run( connection, GET_SERVER );
}
}
}

private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
private static final Function<Value,BoltServerAddress> OF_BoltServerAddress =
new Function<Value,BoltServerAddress>()
Expand All @@ -94,8 +38,11 @@ public BoltServerAddress apply( Value value )
return new BoltServerAddress( value.asString() );
}
};
private final Set<BoltServerAddress> readers, writers, routers;
final long expirationTimestamp;

private final Set<BoltServerAddress> readers;
private final Set<BoltServerAddress> writers;
private final Set<BoltServerAddress> routers;
private final long expirationTimestamp;

private ClusterComposition( long expirationTimestamp )
{
Expand All @@ -120,7 +67,11 @@ private ClusterComposition( long expirationTimestamp )

public boolean isValid()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can name this method #hasWriters() to be less ambiguous?

{
return !routers.isEmpty() && !writers.isEmpty();
return !writers.isEmpty();
}
public boolean isIllegalResponse()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd vote for #hasRoutersAndReaders() name to be more explicit

{
return routers.isEmpty() || readers.isEmpty();
}

public Set<BoltServerAddress> readers()
Expand All @@ -138,6 +89,10 @@ public Set<BoltServerAddress> routers()
return new HashSet<>( routers );
}

public long expirationTimestamp() {
return this.expirationTimestamp;
}

@Override
public String toString()
{
Expand All @@ -149,32 +104,26 @@ public String toString()
'}';
}

private static ClusterComposition read( Record record, long now )
public static ClusterComposition parse( Record record, long now )
{
if ( record == null )
{
return null;
}
try

final ClusterComposition result;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Declaration can be joined with the assignment

result = new ClusterComposition( expirationTimestamp( now, record ) );
record.get( "servers" ).asList( new Function<Value,Void>()
{
final ClusterComposition result;
result = new ClusterComposition( expirationTimestamp( now, record ) );
record.get( "servers" ).asList( new Function<Value,Void>()
@Override
public Void apply( Value value )
{
@Override
public Void apply( Value value )
{
result.servers( value.get( "role" ).asString() )
.addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) );
return null;
}
} );
return result;
}
catch ( ValueException e )
{
return null;
}
result.servers( value.get( "role" ).asString() )
.addAll( value.get( "addresses" ).asList( OF_BoltServerAddress ) );
return null;
}
} );
return result;
}

private static long expirationTimestamp( long now, Record record )
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import java.util.List;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ProtocolException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.value.ValueException;

import static java.lang.String.format;

public interface ClusterCompositionProvider
{
GetClusterCompositionResponse getClusterComposition( Connection connection );

class Default implements ClusterCompositionProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this implementation to a separate file, I think it deserves this :)

{
private String GET_SERVERS = "dbms.cluster.routing.getServers";
private String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS + "' result received from server " +
"due to ";

private final Clock clock;
private final Logger log;
private final GetServersRunner getServersRunner;

public Default( Clock clock, Logger log )
{
this( clock, log, new GetServersRunner() );
}

Default( Clock clock, Logger log, GetServersRunner getServersRunner )
{
this.clock = clock;
this.log = log;
this.getServersRunner = getServersRunner;
}

@Override
public GetClusterCompositionResponse getClusterComposition( Connection connection )
{
List<Record> records = null;

// failed to invoke procedure
try
{
records = getServersRunner.run( connection );
}
catch ( ClientException e )
{
return new GetClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
"Failed to call '%s' procedure on server. " +
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) );
}

log.info( "Got getServers response: %s", records );
long now = clock.millis();

// the record size is wrong
if ( records.size() != 1 )
{
return new GetClusterCompositionResponse.Failure( new ProtocolException( format(
"%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
records.size() ) ) );
}

// failed to parse the record
ClusterComposition cluster;
try
{
cluster = ClusterComposition.parse( records.get( 0 ), now );
}
catch ( ValueException e )
{
return new GetClusterCompositionResponse.Failure( new ProtocolException( format(
"%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) );
}

// the cluster result is not a legal reply
if ( cluster.isIllegalResponse() )
{
return new GetClusterCompositionResponse.Failure( new ProtocolException( format(
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) );
}

// all good
return new GetClusterCompositionResponse.Success( cluster );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public boolean isStale()
@Override
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
{
expirationTimeout = cluster.expirationTimestamp;
expirationTimeout = cluster.expirationTimestamp();
HashSet<BoltServerAddress> removed = new HashSet<>();
readers.update( cluster.readers(), removed );
writers.update( cluster.writers(), removed );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

public interface GetClusterCompositionResponse
{
ClusterComposition clusterComposition();

class Failure implements GetClusterCompositionResponse
{
private final RuntimeException error;

public Failure( RuntimeException t )
{
this.error = t;
}

@Override
public ClusterComposition clusterComposition()
{
throw this.error;
}
}

class Success implements GetClusterCompositionResponse
{
private final ClusterComposition cluster;

public Success( ClusterComposition cluster )
{
this.cluster = cluster;
}

@Override
public ClusterComposition clusterComposition()
{
return cluster;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.neo4j.driver.internal.cluster;

import java.util.List;

import org.neo4j.driver.internal.NetworkSession;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;

public class GetServersRunner
{
private String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers";

public List<Record> run( Connection connection )
{
return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ) ).list();
}
}
Loading