Skip to content

Commit 34c5542

Browse files
author
Zhen
committed
Add support of passing routing parameters via driver config
1 parent 447fe81 commit 34c5542

19 files changed

+219
-41
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureClusterCompositionProvider.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,16 @@
3333

3434
public class GetServersProcedureClusterCompositionProvider implements ClusterCompositionProvider
3535
{
36-
private final String GET_SERVERS = "dbms.cluster.routing.getServers";
37-
private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS +
38-
"' result received from server due to ";
36+
37+
private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse `%s' result received from server due to ";
3938

4039
private final Clock clock;
4140
private final Logger log;
4241
private final GetServersProcedureRunner getServersRunner;
4342

44-
public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log )
43+
public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingSettings settings )
4544
{
46-
this( clock, log, new GetServersProcedureRunner() );
45+
this( clock, log, new GetServersProcedureRunner( settings.routingParameters ) );
4746
}
4847

4948
GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, GetServersProcedureRunner getServersRunner )
@@ -66,8 +65,10 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
6665
catch ( ClientException e )
6766
{
6867
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
69-
"Failed to call '%s' procedure on server. " +
70-
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) );
68+
"Failed to run '%s' on server. " +
69+
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.",
70+
getServersRunner.procedureCalled() ), e
71+
) );
7172
}
7273

7374
log.info( "Got getServers response: %s", records );
@@ -77,7 +78,8 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
7778
if ( records.size() != 1 )
7879
{
7980
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
80-
"%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
81+
PROTOCOL_ERROR_MESSAGE +
82+
"records received '%s' is too few or too many.", getServersRunner.procedureCalled(),
8183
records.size() ) ) );
8284
}
8385

@@ -90,14 +92,16 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
9092
catch ( ValueException e )
9193
{
9294
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
93-
"%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) );
95+
PROTOCOL_ERROR_MESSAGE +
96+
"unparsable record received.", getServersRunner.procedureCalled() ), e ) );
9497
}
9598

9699
// the cluster result is not a legal reply
97100
if ( !cluster.hasRoutersAndReaders() )
98101
{
99102
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
100-
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) );
103+
PROTOCOL_ERROR_MESSAGE +
104+
"no router or reader found in response.", getServersRunner.procedureCalled() ) ) );
101105
}
102106

103107
// all good

driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,49 @@
2222
import java.util.List;
2323

2424
import org.neo4j.driver.internal.NetworkSession;
25-
import org.neo4j.driver.internal.SessionResourcesHandler;
2625
import org.neo4j.driver.internal.spi.Connection;
2726
import org.neo4j.driver.v1.Record;
2827
import org.neo4j.driver.v1.Statement;
28+
import org.neo4j.driver.v1.Value;
29+
30+
import static org.neo4j.driver.internal.SessionResourcesHandler.NO_OP;
31+
import static org.neo4j.driver.internal.cluster.ServerVersion.v3_2_0;
32+
import static org.neo4j.driver.internal.cluster.ServerVersion.version;
2933

3034
public class GetServersProcedureRunner
3135
{
32-
private static final String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers";
36+
private static final String GET_SERVERS = "dbms.cluster.routing.getServers";
37+
private static final String GET_SERVERS_V2 = "dbms.cluster.routing.getServersV2";
38+
39+
private final Value routingParameters;
40+
private Statement procedureCalled;
41+
42+
public GetServersProcedureRunner( Value parameters )
43+
{
44+
this.routingParameters = parameters;
45+
}
3346

3447
public List<Record> run( Connection connection )
3548
{
36-
return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ), SessionResourcesHandler.NO_OP ).list();
49+
if( version( connection.server().version() ).greaterThanOrEqual( v3_2_0 ) )
50+
{
51+
procedureCalled = new Statement( "CALL " + GET_SERVERS_V2, routingParameters );
52+
}
53+
else
54+
{
55+
procedureCalled = new Statement("CALL " + GET_SERVERS );
56+
}
57+
58+
return runProcedure( connection, procedureCalled );
59+
}
60+
61+
List<Record> runProcedure( Connection connection, Statement procedure )
62+
{
63+
return NetworkSession.run( connection, procedure, NO_OP ).list();
64+
}
65+
66+
Statement procedureCalled()
67+
{
68+
return procedureCalled;
3769
}
3870
}

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode )
156156
private static Rediscovery createRediscovery( BoltServerAddress initialRouter, RoutingSettings settings,
157157
Clock clock, Logger log )
158158
{
159-
ClusterCompositionProvider clusterComposition = new GetServersProcedureClusterCompositionProvider( clock, log );
159+
ClusterCompositionProvider clusterComposition =
160+
new GetServersProcedureClusterCompositionProvider( clock, log, settings );
160161
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition );
161162
}
162163
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,21 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.Map;
22+
23+
import org.neo4j.driver.v1.Value;
24+
import org.neo4j.driver.v1.Values;
25+
2126
public class RoutingSettings
2227
{
2328
final int maxRoutingFailures;
2429
final long retryTimeoutDelay;
30+
final Value routingParameters;
2531

26-
public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay )
32+
public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay, Map<String, String> routingParameters )
2733
{
2834
this.maxRoutingFailures = maxRoutingFailures;
2935
this.retryTimeoutDelay = retryTimeoutDelay;
36+
this.routingParameters = Values.value( routingParameters );
3037
}
3138
}

driver/src/test/java/org/neo4j/driver/v1/util/ServerVersion.java renamed to driver/src/main/java/org/neo4j/driver/internal/cluster/ServerVersion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.v1.util;
19+
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.regex.Matcher;
2222
import java.util.regex.Pattern;
@@ -44,6 +44,7 @@ private ServerVersion( int major, int minor, int patch )
4444
public static final ServerVersion v3_2_0 = new ServerVersion(3, 2, 0);
4545
public static final ServerVersion v3_1_0 = new ServerVersion(3, 1, 0);
4646
public static final ServerVersion v3_0_0 = new ServerVersion(3, 0, 0);
47+
public static final ServerVersion v3_2_0 = new ServerVersion(3, 2, 0);
4748

4849
public static ServerVersion version( Driver driver )
4950
{

driver/src/main/java/org/neo4j/driver/internal/summary/InternalServerInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class InternalServerInfo implements ServerInfo
2626
private final BoltServerAddress address;
2727
private final String version;
2828

29-
public InternalServerInfo(BoltServerAddress address, String version)
29+
public InternalServerInfo( BoltServerAddress address, String version )
3030
{
3131
this.address = address;
3232
this.version = version;

driver/src/main/java/org/neo4j/driver/v1/Config.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.v1;
2020

2121
import java.io.File;
22+
import java.util.Map;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.logging.Level;
2425

@@ -74,6 +75,8 @@ public class Config
7475
private final int connectionTimeoutMillis;
7576
private final RetrySettings retrySettings;
7677

78+
private final Map<String, String> routingParameters;
79+
7780
private Config( ConfigBuilder builder)
7881
{
7982
this.logging = builder.logging;
@@ -88,6 +91,8 @@ private Config( ConfigBuilder builder)
8891
this.routingRetryDelayMillis = builder.routingRetryDelayMillis;
8992
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
9093
this.retrySettings = builder.retrySettings;
94+
95+
this.routingParameters = builder.routingParameters;
9196
}
9297

9398
/**
@@ -182,7 +187,7 @@ public static Config defaultConfig()
182187

183188
RoutingSettings routingSettings()
184189
{
185-
return new RoutingSettings( routingFailureLimit, routingRetryDelayMillis );
190+
return new RoutingSettings( routingFailureLimit, routingRetryDelayMillis, routingParameters );
186191
}
187192

188193
RetrySettings retrySettings()
@@ -205,6 +210,7 @@ public static class ConfigBuilder
205210
private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 );
206211
private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 );
207212
private RetrySettings retrySettings = RetrySettings.DEFAULT;
213+
private Map<String,String> routingParameters = null;
208214

209215
private ConfigBuilder() {}
210216

@@ -473,6 +479,21 @@ public ConfigBuilder withMaxTransactionRetryTime( long value, TimeUnit unit )
473479
return this;
474480
}
475481

482+
/**
483+
* Specify routing parameters that would be passed to server in getServers Procedure call for customized
484+
* routing table reply.
485+
* This parameter is only valid for the routing driver, a.k.a. the driver created use bolt+routing in URI
486+
* scheme with 3.2+ Neo4j Casual Cluster servers.
487+
* @param parameters The parameters to pass to getServers Procedure
488+
* @since 1.3
489+
* @return this builder
490+
*/
491+
public ConfigBuilder withRoutingParameters( Map<String, String> parameters )
492+
{
493+
this.routingParameters = parameters;
494+
return this;
495+
}
496+
476497
/**
477498
* Create a config instance from this builder.
478499
* @return a {@link Config} instance

driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private Driver createDriver( DriverFactory driverFactory )
139139
private Driver createDriver( DriverFactory driverFactory, Config config )
140140
{
141141
AuthToken auth = AuthTokens.none();
142-
RoutingSettings routingSettings = new RoutingSettings( 42, 42 );
142+
RoutingSettings routingSettings = new RoutingSettings( 42, 42, null );
143143
return driverFactory.newInstance( uri, auth, routingSettings, RetrySettings.DEFAULT, config );
144144
}
145145

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,7 @@ private static Driver newDriverWithFixedRetries( String uriString, int retries )
873873
private static Driver newDriver( String uriString, DriverFactory driverFactory )
874874
{
875875
URI uri = URI.create( uriString );
876-
RoutingSettings routingConf = new RoutingSettings( 1, 1 );
876+
RoutingSettings routingConf = new RoutingSettings( 1, 1, null );
877877
AuthToken auth = AuthTokens.none();
878878
return driverFactory.newInstance( uri, auth, routingConf, RetrySettings.DEFAULT, config );
879879
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.neo4j.driver.internal.spi.ConnectionPool;
3737
import org.neo4j.driver.internal.spi.ConnectionProvider;
3838
import org.neo4j.driver.internal.spi.PooledConnection;
39+
import org.neo4j.driver.internal.summary.InternalServerInfo;
3940
import org.neo4j.driver.internal.util.FakeClock;
4041
import org.neo4j.driver.v1.AccessMode;
4142
import org.neo4j.driver.v1.Config;
@@ -148,8 +149,8 @@ public void shouldFailIfNoRouting()
148149
// Then
149150
catch ( ServiceUnavailableException e )
150151
{
151-
assertThat( e.getMessage(),
152-
containsString( "Failed to call 'dbms.cluster.routing.getServers' procedure on server" ) );
152+
assertThat( e.getMessage(), containsString( "Failed to run " +
153+
"'Statement{text='CALL dbms.cluster.routing.getServers', parameters={}}' on server." ) );
153154
}
154155
}
155156

@@ -344,7 +345,7 @@ private final Driver driverWithServers( long ttl, Map<String,Object>... serverIn
344345

345346
private Driver driverWithPool( ConnectionPool pool )
346347
{
347-
RoutingSettings settings = new RoutingSettings( 10, 5_000 );
348+
RoutingSettings settings = new RoutingSettings( 10, 5_000, null );
348349
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, clock, logging );
349350
Config config = Config.build().withLogging( logging ).toConfig();
350351
SessionFactory sessionFactory = new NetworkSessionWithAddressFactory( connectionProvider, config );
@@ -383,6 +384,7 @@ public PooledConnection answer( InvocationOnMock invocationOnMock ) throws Throw
383384
PooledConnection connection = mock( PooledConnection.class );
384385
when( connection.isOpen() ).thenReturn( true );
385386
when( connection.boltServerAddress() ).thenReturn( address );
387+
when( connection.server() ).thenReturn( new InternalServerInfo( address, "Neo4j/3.1.0" ) );
386388
doAnswer( withKeys( "ttl", "servers" ) ).when( connection ).run(
387389
eq( GET_SERVERS ),
388390
eq( Collections.<String,Value>emptyMap() ),

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
import org.neo4j.driver.internal.InternalRecord;
2929
import org.neo4j.driver.internal.net.BoltServerAddress;
30-
import org.neo4j.driver.internal.spi.Collector;
3130
import org.neo4j.driver.internal.spi.PooledConnection;
3231
import org.neo4j.driver.internal.util.Clock;
3332
import org.neo4j.driver.internal.value.StringValue;
@@ -42,8 +41,6 @@
4241
import static org.hamcrest.Matchers.instanceOf;
4342
import static org.junit.Assert.assertEquals;
4443
import static org.junit.Assert.fail;
45-
import static org.mockito.Matchers.any;
46-
import static org.mockito.Matchers.anyMap;
4744
import static org.mockito.Mockito.doThrow;
4845
import static org.mockito.Mockito.mock;
4946
import static org.mockito.Mockito.when;
@@ -216,7 +213,7 @@ public void shouldPropagateConnectionFailureExceptions() throws Exception
216213
// Given
217214
GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class );
218215
ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mock( Clock.class ),
219-
DEV_NULL_LOGGER );
216+
DEV_NULL_LOGGER, mockedRunner );
220217

221218
PooledConnection mockedConn = mock( PooledConnection.class );
222219
Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{
@@ -225,7 +222,7 @@ public void shouldPropagateConnectionFailureExceptions() throws Exception
225222
serverInfo( "ROUTE", "one:1337", "two:1337" ) ) )
226223
} );
227224
doThrow( new ServiceUnavailableException( "Connection breaks during cypher execution" ) )
228-
.when( mockedConn ).run( any( String.class ), anyMap(), any( Collector.class ) );
225+
.when( mockedRunner ).run( mockedConn );
229226

230227
// When & Then
231228
try

0 commit comments

Comments
 (0)