Skip to content

Commit 5aaba5a

Browse files
author
Zhen
committed
Add support of passing routing parameters via driver config
1 parent 5f0f9fa commit 5aaba5a

19 files changed

+211
-33
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureClusterCompositionProvider.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,16 @@
3333

3434
public class GetServersProcedureClusterCompositionProvider implements ClusterCompositionProvider
3535
{
36-
private final String GET_SERVERS = "dbms.cluster.routing.getServers";
37-
private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '" + GET_SERVERS +
38-
"' result received from server due to ";
36+
37+
private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse `%s' result received from server due to ";
3938

4039
private final Clock clock;
4140
private final Logger log;
4241
private final GetServersProcedureRunner getServersRunner;
4342

44-
public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log )
43+
public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingSettings settings )
4544
{
46-
this( clock, log, new GetServersProcedureRunner() );
45+
this( clock, log, new GetServersProcedureRunner( settings.routingParameters ) );
4746
}
4847

4948
GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, GetServersProcedureRunner getServersRunner )
@@ -67,7 +66,9 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
6766
{
6867
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
6968
"Failed to call '%s' procedure on server. " +
70-
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.", GET_SERVERS ), e ) );
69+
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.",
70+
getServersRunner.procedureCalled() ), e
71+
) );
7172
}
7273

7374
log.info( "Got getServers response: %s", records );
@@ -77,7 +78,8 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
7778
if ( records.size() != 1 )
7879
{
7980
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
80-
"%srecords received '%s' is too few or too many.", PROTOCOL_ERROR_MESSAGE,
81+
PROTOCOL_ERROR_MESSAGE +
82+
"records received '%s' is too few or too many.", getServersRunner.procedureCalled(),
8183
records.size() ) ) );
8284
}
8385

@@ -90,14 +92,16 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
9092
catch ( ValueException e )
9193
{
9294
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
93-
"%sunparsable record received.", PROTOCOL_ERROR_MESSAGE ), e ) );
95+
PROTOCOL_ERROR_MESSAGE +
96+
"unparsable record received.", getServersRunner.procedureCalled() ), e ) );
9497
}
9598

9699
// the cluster result is not a legal reply
97100
if ( !cluster.hasRoutersAndReaders() )
98101
{
99102
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
100-
"%sno router or reader found in response.", PROTOCOL_ERROR_MESSAGE ) ) );
103+
PROTOCOL_ERROR_MESSAGE +
104+
"no router or reader found in response.", getServersRunner.procedureCalled() ) ) );
101105
}
102106

103107
// all good

driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,49 @@
2222
import java.util.List;
2323

2424
import org.neo4j.driver.internal.NetworkSession;
25-
import org.neo4j.driver.internal.SessionResourcesHandler;
2625
import org.neo4j.driver.internal.spi.Connection;
2726
import org.neo4j.driver.v1.Record;
2827
import org.neo4j.driver.v1.Statement;
28+
import org.neo4j.driver.v1.Value;
29+
30+
import static org.neo4j.driver.internal.SessionResourcesHandler.NO_OP;
31+
import static org.neo4j.driver.internal.cluster.ServerVersion.v3_2_0;
32+
import static org.neo4j.driver.internal.cluster.ServerVersion.version;
2933

3034
public class GetServersProcedureRunner
3135
{
32-
private static final String CALL_GET_SERVERS = "CALL dbms.cluster.routing.getServers";
36+
private static final String GET_SERVERS = "dbms.cluster.routing.getServers";
37+
private static final String GET_SERVERS_V2 = "dbms.cluster.routing.getServersV2";
38+
39+
private final Value routingParameters;
40+
private Statement procedureCalled;
41+
42+
public GetServersProcedureRunner( Value parameters )
43+
{
44+
this.routingParameters = parameters;
45+
}
3346

3447
public List<Record> run( Connection connection )
3548
{
36-
return NetworkSession.run( connection, new Statement( CALL_GET_SERVERS ), SessionResourcesHandler.NO_OP ).list();
49+
if( version( connection.server().version() ).greaterThanOrEqual( v3_2_0 ) )
50+
{
51+
procedureCalled = new Statement( "CALL " + GET_SERVERS_V2, routingParameters );
52+
}
53+
else
54+
{
55+
procedureCalled = new Statement("CALL " + GET_SERVERS );
56+
}
57+
58+
return runProcedure( connection, procedureCalled );
59+
}
60+
61+
List<Record> runProcedure( Connection connection, Statement procedure )
62+
{
63+
return NetworkSession.run( connection, procedure, NO_OP ).list();
64+
}
65+
66+
Statement procedureCalled()
67+
{
68+
return procedureCalled;
3769
}
3870
}

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode )
156156
private static Rediscovery createRediscovery( BoltServerAddress initialRouter, RoutingSettings settings,
157157
Clock clock, Logger log )
158158
{
159-
ClusterCompositionProvider clusterComposition = new GetServersProcedureClusterCompositionProvider( clock, log );
159+
ClusterCompositionProvider clusterComposition =
160+
new GetServersProcedureClusterCompositionProvider( clock, log, settings );
160161
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition );
161162
}
162163
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,21 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21+
import java.util.Map;
22+
23+
import org.neo4j.driver.v1.Value;
24+
import org.neo4j.driver.v1.Values;
25+
2126
public class RoutingSettings
2227
{
2328
final int maxRoutingFailures;
2429
final long retryTimeoutDelay;
30+
final Value routingParameters;
2531

26-
public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay )
32+
public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay, Map<String, String> routingParameters )
2733
{
2834
this.maxRoutingFailures = maxRoutingFailures;
2935
this.retryTimeoutDelay = retryTimeoutDelay;
36+
this.routingParameters = Values.value( routingParameters );
3037
}
3138
}

driver/src/test/java/org/neo4j/driver/v1/util/ServerVersion.java renamed to driver/src/main/java/org/neo4j/driver/internal/cluster/ServerVersion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.v1.util;
19+
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.regex.Matcher;
2222
import java.util.regex.Pattern;
@@ -43,6 +43,7 @@ private ServerVersion( int major, int minor, int patch )
4343
}
4444
public static final ServerVersion v3_1_0 = new ServerVersion(3, 1, 0);
4545
public static final ServerVersion v3_0_0 = new ServerVersion(3, 0, 0);
46+
public static final ServerVersion v3_2_0 = new ServerVersion(3, 2, 0);
4647

4748
public static ServerVersion version( Driver driver )
4849
{

driver/src/main/java/org/neo4j/driver/internal/summary/InternalServerInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class InternalServerInfo implements ServerInfo
2626
private final BoltServerAddress address;
2727
private final String version;
2828

29-
public InternalServerInfo(BoltServerAddress address, String version)
29+
public InternalServerInfo( BoltServerAddress address, String version )
3030
{
3131
this.address = address;
3232
this.version = version;

driver/src/main/java/org/neo4j/driver/v1/Config.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.v1;
2020

2121
import java.io.File;
22+
import java.util.Map;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.logging.Level;
2425

@@ -74,6 +75,8 @@ public class Config
7475
private final int connectionTimeoutMillis;
7576
private final RetrySettings retrySettings;
7677

78+
private final Map<String, String> routingParameters;
79+
7780
private Config( ConfigBuilder builder)
7881
{
7982
this.logging = builder.logging;
@@ -88,6 +91,8 @@ private Config( ConfigBuilder builder)
8891
this.routingRetryDelayMillis = builder.routingRetryDelayMillis;
8992
this.connectionTimeoutMillis = builder.connectionTimeoutMillis;
9093
this.retrySettings = builder.retrySettings;
94+
95+
this.routingParameters = builder.routingParameters;
9196
}
9297

9398
/**
@@ -182,7 +187,7 @@ public static Config defaultConfig()
182187

183188
RoutingSettings routingSettings()
184189
{
185-
return new RoutingSettings( routingFailureLimit, routingRetryDelayMillis );
190+
return new RoutingSettings( routingFailureLimit, routingRetryDelayMillis, routingParameters );
186191
}
187192

188193
RetrySettings retrySettings()
@@ -205,6 +210,7 @@ public static class ConfigBuilder
205210
private long routingRetryDelayMillis = TimeUnit.SECONDS.toMillis( 5 );
206211
private int connectionTimeoutMillis = (int) TimeUnit.SECONDS.toMillis( 5 );
207212
private RetrySettings retrySettings = RetrySettings.DEFAULT;
213+
private Map<String,String> routingParameters = null;
208214

209215
private ConfigBuilder() {}
210216

@@ -473,6 +479,21 @@ public ConfigBuilder withMaxTransactionRetryTime( long value, TimeUnit unit )
473479
return this;
474480
}
475481

482+
/**
483+
* Specify routing parameters that would be passed to server in getServers Procedure call for customized
484+
* routing table reply.
485+
* This parameter is only valid for the routing driver, a.k.a. the driver created use bolt+routing in URI
486+
* scheme with 3.2+ Neo4j Casual Cluster servers.
487+
* @param parameters The parameters to pass to getServers Procedure
488+
* @since 1.3
489+
* @return this builder
490+
*/
491+
public ConfigBuilder withRoutingParameters( Map<String, String> parameters )
492+
{
493+
this.routingParameters = parameters;
494+
return this;
495+
}
496+
476497
/**
477498
* Create a config instance from this builder.
478499
* @return a {@link Config} instance

driver/src/test/java/org/neo4j/driver/internal/DriverFactoryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private Driver createDriver( DriverFactory driverFactory )
139139
private Driver createDriver( DriverFactory driverFactory, Config config )
140140
{
141141
AuthToken auth = AuthTokens.none();
142-
RoutingSettings routingSettings = new RoutingSettings( 42, 42 );
142+
RoutingSettings routingSettings = new RoutingSettings( 42, 42, null );
143143
return driverFactory.newInstance( uri, auth, routingSettings, RetrySettings.DEFAULT, config );
144144
}
145145

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -873,7 +873,7 @@ private static Driver newDriverWithFixedRetries( String uriString, int retries )
873873
private static Driver newDriver( String uriString, DriverFactory driverFactory )
874874
{
875875
URI uri = URI.create( uriString );
876-
RoutingSettings routingConf = new RoutingSettings( 1, 1 );
876+
RoutingSettings routingConf = new RoutingSettings( 1, 1, null );
877877
AuthToken auth = AuthTokens.none();
878878
return driverFactory.newInstance( uri, auth, routingConf, RetrySettings.DEFAULT, config );
879879
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ private final Driver driverWithServers( long ttl, Map<String,Object>... serverIn
344344

345345
private Driver driverWithPool( ConnectionPool pool )
346346
{
347-
RoutingSettings settings = new RoutingSettings( 10, 5_000 );
347+
RoutingSettings settings = new RoutingSettings( 10, 5_000, null );
348348
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, clock, logging );
349349
Config config = Config.build().withLogging( logging ).toConfig();
350350
SessionFactory sessionFactory = new NetworkSessionWithAddressFactory( connectionProvider, config );

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionProviderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public void shouldPropagateConnectionFailureExceptions() throws Exception
216216
// Given
217217
GetServersProcedureRunner mockedRunner = mock( GetServersProcedureRunner.class );
218218
ClusterCompositionProvider provider = new GetServersProcedureClusterCompositionProvider( mock( Clock.class ),
219-
DEV_NULL_LOGGER );
219+
DEV_NULL_LOGGER, mockedRunner );
220220

221221
PooledConnection mockedConn = mock( PooledConnection.class );
222222
Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import org.junit.Test;
22+
23+
import java.util.HashMap;
24+
import java.util.List;
25+
26+
import org.neo4j.driver.internal.net.BoltServerAddress;
27+
import org.neo4j.driver.internal.spi.Connection;
28+
import org.neo4j.driver.internal.summary.InternalServerInfo;
29+
import org.neo4j.driver.v1.Record;
30+
import org.neo4j.driver.v1.Statement;
31+
import org.neo4j.driver.v1.Value;
32+
33+
import static org.hamcrest.MatcherAssert.assertThat;
34+
import static org.hamcrest.core.IsEqual.equalTo;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.when;
37+
import static org.neo4j.driver.v1.Values.value;
38+
39+
public class GetServersProcedureRunnerTest
40+
{
41+
@Test
42+
public void shouldCallGetServersV2WithNull() throws Throwable
43+
{
44+
// Given
45+
GetServersProcedureRunner runner = new TestGetServersProcedureRunner( value( (Object)null ) );
46+
Connection mock = mock( Connection.class );
47+
when( mock.server() ).thenReturn(
48+
new InternalServerInfo( new BoltServerAddress( "123:45" ), "Neo4j/3.2.1" ) );
49+
// When
50+
runner.run( mock );
51+
52+
// Then
53+
assertThat( runner.procedureCalled().toString(), equalTo(
54+
"Statement{text='CALL dbms.cluster.routing.getServersV2', parameters=NULL}" ) );
55+
}
56+
57+
@Test
58+
public void shouldCallGetServersV2WithParam() throws Throwable
59+
{
60+
// Given
61+
HashMap<String,String> param = new HashMap<>();
62+
param.put( "key1", "value1" );
63+
param.put( "key2", "value2" );
64+
GetServersProcedureRunner runner = new TestGetServersProcedureRunner( value( param ) );
65+
Connection mock = mock( Connection.class );
66+
when( mock.server() ).thenReturn(
67+
new InternalServerInfo( new BoltServerAddress( "123:45" ), "Neo4j/3.2.1" ) );
68+
// When
69+
runner.run( mock );
70+
71+
// Then
72+
assertThat( runner.procedureCalled().toString(), equalTo(
73+
"Statement{text='CALL dbms.cluster.routing.getServersV2', " +
74+
"parameters={key2: \"value2\", key1: \"value1\"}}" ) );
75+
}
76+
77+
@Test
78+
public void shouldCallGetServerV1() throws Throwable
79+
{
80+
// Given
81+
HashMap<String,String> param = new HashMap<>();
82+
param.put( "key1", "value1" );
83+
param.put( "key2", "value2" );
84+
GetServersProcedureRunner runner = new TestGetServersProcedureRunner( value( param ) );
85+
Connection mock = mock( Connection.class );
86+
when( mock.server() ).thenReturn(
87+
new InternalServerInfo( new BoltServerAddress( "123:45" ), "Neo4j/3.1.8" ) );
88+
// When
89+
runner.run( mock );
90+
91+
// Then
92+
assertThat( runner.procedureCalled().toString(), equalTo(
93+
"Statement{text='CALL dbms.cluster.routing.getServers', parameters={}}" ) );
94+
}
95+
96+
private static class TestGetServersProcedureRunner extends GetServersProcedureRunner
97+
{
98+
99+
TestGetServersProcedureRunner( Value parameters )
100+
{
101+
super( parameters );
102+
}
103+
104+
@Override
105+
List<Record> runProcedure( Connection connection, Statement procedure )
106+
{
107+
// I do not want any network traffic
108+
return null;
109+
}
110+
}
111+
112+
}

0 commit comments

Comments
 (0)