Skip to content

Add support of passing routing parameters via driver config #334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,16 @@

public class GetServersProcedureClusterCompositionProvider implements ClusterCompositionProvider
{
private final String GET_SERVERS = "dbms.cluster.routing.getServers";
private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS +
"' result received from server due to ";

private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse `%s' result received from server due to ";

private final Clock clock;
private final Logger log;
private final GetServersProcedureRunner getServersRunner;

public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log )
public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingSettings settings )
{
this( clock, log, new GetServersProcedureRunner() );
this( clock, log, new GetServersProcedureRunner( settings.routingParameters ) );
}

GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, GetServersProcedureRunner getServersRunner )
Expand All @@ -66,8 +65,10 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
catch ( ClientException e )
{
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
"Failed to call '%s' procedure on server. " +
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) );
"Failed to run '%s' on server. " +
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.",
getServersRunner.procedureCalled() ), e
) );
}

log.info( "Got getServers response: %s", records );
Expand All @@ -77,7 +78,8 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
if ( records.size() != 1 )
{
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
"%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
PROTOCOL_ERROR_MESSAGE +
"records received '%s' is too few or too many.", getServersRunner.procedureCalled(),
records.size() ) ) );
}

Expand All @@ -90,14 +92,16 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
catch ( ValueException e )
{
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
"%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) );
PROTOCOL_ERROR_MESSAGE +
"unparsable record received.", getServersRunner.procedureCalled() ), e ) );
}

// the cluster result is not a legal reply
if ( !cluster.hasRoutersAndReaders() )
{
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) );
PROTOCOL_ERROR_MESSAGE +
"no router or reader found in response.", getServersRunner.procedureCalled() ) ) );
}

// all good
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,49 @@
import java.util.List;

import org.neo4j.driver.internal.NetworkSession;
import org.neo4j.driver.internal.SessionResourcesHandler;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Statement;
import org.neo4j.driver.v1.Value;

import static org.neo4j.driver.internal.SessionResourcesHandler.NO_OP;
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
import static org.neo4j.driver.internal.util.ServerVersion.version;

public class GetServersProcedureRunner
{
private static final String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers";
static final String GET_SERVERS = "dbms.cluster.routing.getServers";
static final String GET_ROUTING_TABLE = "dbms.cluster.routing.getRoutingTable";

private final Value routingContext;
private Statement procedureCalled;

public GetServersProcedureRunner( Value context )
{
this.routingContext = context;
}

public List<Record> run( Connection connection )
{
return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ), SessionResourcesHandler.NO_OP ).list();
if( version( connection.server().version() ).greaterThanOrEqual( v3_2_0 ) )
{
procedureCalled = new Statement( "CALL " + GET_ROUTING_TABLE, routingContext );
}
else
{
procedureCalled = new Statement("CALL " + GET_SERVERS );
}

return runProcedure( connection, procedureCalled );
}

List<Record> runProcedure( Connection connection, Statement procedure )
{
return NetworkSession.run( connection, procedure, NO_OP ).list();
}

Statement procedureCalled()
{
return procedureCalled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode )
private static Rediscovery createRediscovery( BoltServerAddress initialRouter, RoutingSettings settings,
Clock clock, Logger log )
{
ClusterCompositionProvider clusterComposition = new GetServersProcedureClusterCompositionProvider( clock, log );
ClusterCompositionProvider clusterComposition =
new GetServersProcedureClusterCompositionProvider( clock, log, settings );
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.Map;

import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.Values;

public class RoutingSettings
{
final int maxRoutingFailures;
final long retryTimeoutDelay;
final Value routingParameters;

public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay )
public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay, Map<String, String> routingParameters )
{
this.maxRoutingFailures = maxRoutingFailures;
this.retryTimeoutDelay = retryTimeoutDelay;
this.routingParameters = Values.value( routingParameters );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class InternalServerInfo implements ServerInfo
private final BoltServerAddress address;
private final String version;

public InternalServerInfo(BoltServerAddress address, String version)
public InternalServerInfo( BoltServerAddress address, String version )
{
this.address = address;
this.version = version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.v1.util;
package org.neo4j.driver.internal.util;

import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down
23 changes: 22 additions & 1 deletion driver/src/main/java/org/neo4j/driver/v1/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.neo4j.driver.v1;

import java.io.File;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

Expand Down Expand Up @@ -74,6 +75,8 @@ public class Config
private final int connectionTimeoutMillis;
private final RetrySettings retrySettings;

private final Map<String, String> routingContext;

private Config( ConfigBuilder builder)
{
this.logging = builder.logging;
Expand All @@ -88,6 +91,8 @@ private Config( ConfigBuilder builder)
this.routingRetryDelayMillis = builder.routingRetryDelayMillis;
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
this.retrySettings = builder.retrySettings;

this.routingContext = builder.routingContext;
}

/**
Expand Down Expand Up @@ -182,7 +187,7 @@ public static Config defaultConfig()

RoutingSettings routingSettings()
{
return new RoutingSettings( routingFailureLimit, routingRetryDelayMillis );
return new RoutingSettings( routingFailureLimit, routingRetryDelayMillis, routingContext );
}

RetrySettings retrySettings()
Expand All @@ -205,6 +210,7 @@ public static class ConfigBuilder
private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 );
private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 );
private RetrySettings retrySettings = RetrySettings.DEFAULT;
private Map<String,String> routingContext = null;

private ConfigBuilder() {}

Expand Down Expand Up @@ -473,6 +479,21 @@ public ConfigBuilder withMaxTransactionRetryTime( long value, TimeUnit unit )
return this;
}

/**
* Specify routing context that would be passed to server in getRoutingTable Procedure call for customized
* routing table reply.
* This parameter is only valid for the routing driver, a.k.a. the driver created use bolt+routing in URI
* scheme with 3.2+ Neo4j Casual Cluster servers.
* @param context The routing context to pass to getRoutingTable Procedure
* @since 1.3
* @return this builder
*/
public ConfigBuilder withRoutingContext( Map<String, String> context )
{
this.routingContext = context;
return this;
}

/**
* Create a config instance from this builder.
* @return a {@link Config} instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private Driver createDriver( DriverFactory driverFactory )
private Driver createDriver( DriverFactory driverFactory, Config config )
{
AuthToken auth = AuthTokens.none();
RoutingSettings routingSettings = new RoutingSettings( 42, 42 );
RoutingSettings routingSettings = new RoutingSettings( 42, 42, null );
return driverFactory.newInstance( uri, auth, routingSettings, RetrySettings.DEFAULT, config );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -873,7 +873,7 @@ private static Driver newDriverWithFixedRetries( String uriString, int retries )
private static Driver newDriver( String uriString, DriverFactory driverFactory )
{
URI uri = URI.create( uriString );
RoutingSettings routingConf = new RoutingSettings( 1, 1 );
RoutingSettings routingConf = new RoutingSettings( 1, 1, null );
AuthToken auth = AuthTokens.none();
return driverFactory.newInstance( uri, auth, routingConf, RetrySettings.DEFAULT, config );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.ConnectionProvider;
import org.neo4j.driver.internal.spi.PooledConnection;
import org.neo4j.driver.internal.summary.InternalServerInfo;
import org.neo4j.driver.internal.util.FakeClock;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Config;
Expand Down Expand Up @@ -148,8 +149,8 @@ public void shouldFailIfNoRouting()
// Then
catch ( ServiceUnavailableException e )
{
assertThat( e.getMessage(),
containsString( "Failed to call 'dbms.cluster.routing.getServers' procedure on server" ) );
assertThat( e.getMessage(), containsString( "Failed to run " +
"'Statement{text='CALL dbms.cluster.routing.getServers', parameters={}}' on server." ) );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imho user-facing message with Statement{ does not look pretty. Could we maybe concatenate a prettier message in GetServersProcedureClusterCompositionProvider instead of using default Statement#toString()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's improve it in another PR, I will merge this PR once it is green.

}
}

Expand Down Expand Up @@ -344,7 +345,7 @@ private final Driver driverWithServers( long ttl, Map<String,Object>... serverIn

private Driver driverWithPool( ConnectionPool pool )
{
RoutingSettings settings = new RoutingSettings( 10, 5_000 );
RoutingSettings settings = new RoutingSettings( 10, 5_000, null );
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, clock, logging );
Config config = Config.build().withLogging( logging ).toConfig();
SessionFactory sessionFactory = new NetworkSessionWithAddressFactory( connectionProvider, config );
Expand Down Expand Up @@ -383,6 +384,7 @@ public PooledConnection answer( InvocationOnMock invocationOnMock ) throws Throw
PooledConnection connection = mock( PooledConnection.class );
when( connection.isOpen() ).thenReturn( true );
when( connection.boltServerAddress() ).thenReturn( address );
when( connection.server() ).thenReturn( new InternalServerInfo( address, "Neo4j/3.1.0" ) );
doAnswer( withKeys( "ttl", "servers" ) ).when( connection ).run(
eq( GET_SERVERS ),
eq( Collections.<String,Value>emptyMap() ),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.Collector;
import org.neo4j.driver.internal.spi.PooledConnection;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.value.StringValue;
Expand All @@ -42,8 +41,6 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyMap;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -216,7 +213,7 @@ public void shouldPropagateConnectionFailureExceptions() throws Exception
// Given
GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class );
ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mock( Clock.class ),
DEV_NULL_LOGGER );
DEV_NULL_LOGGER, mockedRunner );

PooledConnection mockedConn = mock( PooledConnection.class );
Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{
Expand All @@ -225,7 +222,7 @@ public void shouldPropagateConnectionFailureExceptions() throws Exception
serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )
} );
doThrow( new ServiceUnavailableException( "Connection breaks during cypher execution" ) )
.when( mockedConn ).run( any( String.class ), anyMap(), any( Collector.class ) );
.when( mockedRunner ).run( mockedConn );

// When & Then
try
Expand Down
Loading