Skip to content

Update routing according to specification #314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 3, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ private ClusterComposition( long expirationTimestamp )
this.routers.addAll( routers );
}

public boolean isValid()
public boolean hasWriters()
{
return !writers.isEmpty();
}
public boolean isIllegalResponse()
public boolean hasRoutersAndReaders()
{
return routers.isEmpty() || readers.isEmpty();
}
Expand Down Expand Up @@ -111,8 +111,7 @@ public static ClusterComposition parse( Record record, long now )
return null;
}

final ClusterComposition result;
result = new ClusterComposition( expirationTimestamp( now, record ) );
final ClusterComposition result = new ClusterComposition( expirationTimestamp( now, record ) );
record.get( "servers" ).asList( new Function<Value,Void>()
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,94 +18,9 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.List;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ProtocolException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.value.ValueException;

import static java.lang.String.format;

public interface ClusterCompositionProvider
{
GetClusterCompositionResponse getClusterComposition( Connection connection );

class Default implements ClusterCompositionProvider
{
private String GET_SERVERS = "dbms.cluster.routing.getServers";
private String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS + "' result received from server " +
"due to ";

private final Clock clock;
private final Logger log;
private final GetServersRunner getServersRunner;

public Default( Clock clock, Logger log )
{
this( clock, log, new GetServersRunner() );
}

Default( Clock clock, Logger log, GetServersRunner getServersRunner )
{
this.clock = clock;
this.log = log;
this.getServersRunner = getServersRunner;
}

@Override
public GetClusterCompositionResponse getClusterComposition( Connection connection )
{
List<Record> records = null;

// failed to invoke procedure
try
{
records = getServersRunner.run( connection );
}
catch ( ClientException e )
{
return new GetClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
"Failed to call '%s' procedure on server. " +
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) );
}

log.info( "Got getServers response: %s", records );
long now = clock.millis();

// the record size is wrong
if ( records.size() != 1 )
{
return new GetClusterCompositionResponse.Failure( new ProtocolException( format(
"%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
records.size() ) ) );
}

// failed to parse the record
ClusterComposition cluster;
try
{
cluster = ClusterComposition.parse( records.get( 0 ), now );
}
catch ( ValueException e )
{
return new GetClusterCompositionResponse.Failure( new ProtocolException( format(
"%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) );
}

// the cluster result is not a legal reply
if ( cluster.isIllegalResponse() )
{
return new GetClusterCompositionResponse.Failure( new ProtocolException( format(
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) );
}

// all good
return new GetClusterCompositionResponse.Success( cluster );
}
}
ClusterCompositionResponse getClusterComposition( Connection connection );
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
*/
package org.neo4j.driver.internal.cluster;

public interface GetClusterCompositionResponse
public interface ClusterCompositionResponse
{
ClusterComposition clusterComposition();

class Failure implements GetClusterCompositionResponse
class Failure implements ClusterCompositionResponse
{
private final RuntimeException error;

Expand All @@ -38,7 +38,7 @@ public ClusterComposition clusterComposition()
}
}

class Success implements GetClusterCompositionResponse
class Success implements ClusterCompositionResponse
{
private final ClusterComposition cluster;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import java.util.List;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.ProtocolException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.value.ValueException;

import static java.lang.String.format;

public class GetServersProcedureClusterCompositionProvider implements ClusterCompositionProvider
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about ClusterCompositionProviderImpl name for this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this long name better, as it allows us to have other providers such as FileClusterCompositionProvider or GetServers2ProcedureClusterCompositionProvider

{
private final String GET_SERVERS = "dbms.cluster.routing.getServers";
private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS +
"' result received from server due to ";

private final Clock clock;
private final Logger log;
private final GetServersProcedureRunner getServersRunner;

public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log )
{
this( clock, log, new GetServersProcedureRunner() );
}

GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, GetServersProcedureRunner getServersRunner )
{
this.clock = clock;
this.log = log;
this.getServersRunner = getServersRunner;
}

@Override
public ClusterCompositionResponse getClusterComposition( Connection connection )
{
List<Record> records = null;

// failed to invoke procedure
try
{
records = getServersRunner.run( connection );
}
catch ( ClientException e )
{
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
"Failed to call '%s' procedure on server. " +
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) );
}

log.info( "Got getServers response: %s", records );
long now = clock.millis();

// the record size is wrong
if ( records.size() != 1 )
{
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
"%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
records.size() ) ) );
}

// failed to parse the record
ClusterComposition cluster;
try
{
cluster = ClusterComposition.parse( records.get( 0 ), now );
}
catch ( ValueException e )
{
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
"%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) );
}

// the cluster result is not a legal reply
if ( cluster.hasRoutersAndReaders() )
{
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) );
}

// all good
return new ClusterCompositionResponse.Success( cluster );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;

public class GetServersRunner
public class GetServersProcedureRunner
{
private String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers";
private final String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers";

public List<Record> run( Connection connection )
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public LoadBalancer(
BoltServerAddress... routingAddresses ) throws ServiceUnavailableException
{
this( settings, clock, log, connections, new ClusterRoutingTable( clock, routingAddresses ),
new ClusterCompositionProvider.Default( clock, log ) );
new GetServersProcedureClusterCompositionProvider( clock, log ) );
}

private LoadBalancer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.exceptions.ProtocolException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;

import static java.lang.String.format;
Expand All @@ -48,9 +47,9 @@ public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, Cluste
// Given the current routing table and connection pool, use the connection composition provider to fetch a new
// cluster composition, which would be used to update the routing table and connection pool
public ClusterComposition lookupRoutingTable( ConnectionPool connections, RoutingTable routingTable )
throws InterruptedException, ServiceUnavailableException, ProtocolException
throws InterruptedException
{
assertRouterIsNotEmpty( routingTable.routerSize() );
assertHasRouters( routingTable );
int failures = 0;

for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay, delay * 2 ) )
Expand All @@ -62,15 +61,16 @@ public ClusterComposition lookupRoutingTable( ConnectionPool connections, Routin
}
start = clock.millis();

for ( int i = 0, size = routingTable.routerSize(); i < size; i++ )
int size = routingTable.routerSize();
for ( int i = 0; i < size; i++ )
{
BoltServerAddress address = routingTable.nextRouter();
if ( address == null )
{
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
}

GetClusterCompositionResponse response = null;
ClusterCompositionResponse response = null;
try ( Connection connection = connections.acquire( address ) )
{
response = provider.getClusterComposition( connection );
Expand All @@ -81,13 +81,13 @@ public ClusterComposition lookupRoutingTable( ConnectionPool connections, Routin
logger.error( format( "Failed to connect to routing server '%s'.", address ), e );
routingTable.removeRouter( address );

assertRouterIsNotEmpty( routingTable.routerSize() );
assertHasRouters( routingTable );
continue;
}

ClusterComposition cluster = response.clusterComposition();
logger.info( "Got cluster composition %s", cluster );
if ( cluster.isValid() )
if ( cluster.hasWriters() )
{
return cluster;
}
Expand All @@ -99,9 +99,9 @@ public ClusterComposition lookupRoutingTable( ConnectionPool connections, Routin
}
}

private void assertRouterIsNotEmpty( int size )
private void assertHasRouters( RoutingTable table )
{
if ( size == 0 )
if ( table.routerSize() == 0 )
{
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
*/
public class ProtocolException extends Neo4jException
{
private static String CODE = "Protocol violation: ";
private static final String CODE = "Protocol violation: ";

public ProtocolException( String message )
{
super( CODE + message );
Expand Down
Loading