Skip to content

Commit 049d45b

Browse files
author
Zhen Li
authored
Merge pull request #346 from lutovich/1.3-routing-ctx-uri
Routing context in URI
2 parents 30cecca + 559db5a commit 049d45b

19 files changed

+468
-108
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.security.GeneralSecurityException;
2424

2525
import org.neo4j.driver.internal.cluster.LoadBalancer;
26+
import org.neo4j.driver.internal.cluster.RoutingContext;
2627
import org.neo4j.driver.internal.cluster.RoutingSettings;
2728
import org.neo4j.driver.internal.net.BoltServerAddress;
2829
import org.neo4j.driver.internal.net.SocketConnector;
@@ -54,14 +55,14 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
5455
RetrySettings retrySettings, Config config )
5556
{
5657
BoltServerAddress address = BoltServerAddress.from( uri );
58+
RoutingSettings newRoutingSettings = routingSettings.withRoutingContext( new RoutingContext( uri ) );
5759
SecurityPlan securityPlan = createSecurityPlan( address, config );
5860
ConnectionPool connectionPool = createConnectionPool( authToken, securityPlan, config );
5961
RetryLogic retryLogic = createRetryLogic( retrySettings, config.logging() );
6062

6163
try
6264
{
63-
return createDriver( address, uri.getScheme(), connectionPool, config, routingSettings, securityPlan,
64-
retryLogic );
65+
return createDriver( uri, address, connectionPool, config, newRoutingSettings, securityPlan, retryLogic );
6566
}
6667
catch ( Throwable driverError )
6768
{
@@ -78,12 +79,15 @@ public final Driver newInstance( URI uri, AuthToken authToken, RoutingSettings r
7879
}
7980
}
8081

81-
private Driver createDriver( BoltServerAddress address, String scheme, ConnectionPool connectionPool,
82-
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, RetryLogic retryLogic )
82+
private Driver createDriver( URI uri, BoltServerAddress address, ConnectionPool connectionPool,
83+
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan,
84+
RetryLogic retryLogic )
8385
{
84-
switch ( scheme.toLowerCase() )
86+
String scheme = uri.getScheme().toLowerCase();
87+
switch ( scheme )
8588
{
8689
case "bolt":
90+
assertNoRoutingContext( uri, routingSettings );
8791
return createDirectDriver( address, connectionPool, config, securityPlan, retryLogic );
8892
case "bolt+routing":
8993
return createRoutingDriver( address, connectionPool, config, routingSettings, securityPlan, retryLogic );
@@ -260,4 +264,14 @@ private static SecurityPlan createSecurityPlanImpl( BoltServerAddress address, C
260264
return insecure();
261265
}
262266
}
267+
268+
private static void assertNoRoutingContext( URI uri, RoutingSettings routingSettings )
269+
{
270+
RoutingContext routingContext = routingSettings.routingContext();
271+
if ( routingContext.isDefined() )
272+
{
273+
throw new IllegalArgumentException(
274+
"Routing parameters are not supported with scheme 'bolt'. Given URI: '" + uri + "'" );
275+
}
276+
}
263277
}

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ private static Rediscovery createRediscovery( BoltServerAddress initialRouter, R
157157
Clock clock, Logger log )
158158
{
159159
ClusterCompositionProvider clusterComposition =
160-
new GetServersProcedureClusterCompositionProvider( clock, log, settings );
160+
new RoutingProcedureClusterCompositionProvider( clock, log, settings );
161161
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition, new DnsResolver( log ) );
162162
}
163163
}

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
5959
{
6060
int failures = 0;
6161

62-
for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay, delay * 2 ) )
62+
for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay(), delay * 2 ) )
6363
{
6464
long waitTime = start + delay - clock.millis();
6565
sleep( waitTime );
@@ -71,7 +71,7 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
7171
return composition;
7272
}
7373

74-
if ( ++failures >= settings.maxRoutingFailures )
74+
if ( ++failures >= settings.maxRoutingFailures() )
7575
{
7676
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
7777
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.net.URI;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import static java.util.Collections.emptyMap;
26+
import static java.util.Collections.unmodifiableMap;
27+
28+
public class RoutingContext
29+
{
30+
public static final RoutingContext EMPTY = new RoutingContext();
31+
32+
private final Map<String,String> context;
33+
34+
private RoutingContext()
35+
{
36+
this.context = emptyMap();
37+
}
38+
39+
public RoutingContext( URI uri )
40+
{
41+
this.context = unmodifiableMap( parseParameters( uri ) );
42+
}
43+
44+
public boolean isDefined()
45+
{
46+
return !context.isEmpty();
47+
}
48+
49+
public Map<String,String> asMap()
50+
{
51+
return context;
52+
}
53+
54+
private static Map<String,String> parseParameters( URI uri )
55+
{
56+
String query = uri.getQuery();
57+
58+
if ( query == null || query.isEmpty() )
59+
{
60+
return emptyMap();
61+
}
62+
63+
Map<String,String> parameters = new HashMap<>();
64+
String[] pairs = query.split( "&" );
65+
for ( String pair : pairs )
66+
{
67+
String[] keyValue = pair.split( "=" );
68+
if ( keyValue.length != 2 )
69+
{
70+
throw new IllegalArgumentException(
71+
"Invalid parameters: '" + pair + "' in URI '" + uri + "'" );
72+
}
73+
74+
String key = keyValue[0];
75+
String value = keyValue[1];
76+
String previousValue = parameters.put( key, value );
77+
78+
if ( previousValue != null )
79+
{
80+
throw new IllegalArgumentException(
81+
"Duplicated query parameters with key '" + key + "' in URI '" + uri + "'" );
82+
}
83+
}
84+
return parameters;
85+
}
86+
}
Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,28 +24,28 @@
2424
import org.neo4j.driver.internal.util.Clock;
2525
import org.neo4j.driver.v1.Logger;
2626
import org.neo4j.driver.v1.Record;
27+
import org.neo4j.driver.v1.Statement;
2728
import org.neo4j.driver.v1.exceptions.ClientException;
2829
import org.neo4j.driver.v1.exceptions.ProtocolException;
2930
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3031
import org.neo4j.driver.v1.exceptions.value.ValueException;
3132

3233
import static java.lang.String.format;
3334

34-
public class GetServersProcedureClusterCompositionProvider implements ClusterCompositionProvider
35+
public class RoutingProcedureClusterCompositionProvider implements ClusterCompositionProvider
3536
{
36-
37-
private final String PROTOCOL_ERROR_MESSAGE = "Failed to parse `%s' result received from server due to ";
37+
private static final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '%s' result received from server due to ";
3838

3939
private final Clock clock;
4040
private final Logger log;
41-
private final GetServersProcedureRunner getServersRunner;
41+
private final RoutingProcedureRunner getServersRunner;
4242

43-
public GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingSettings settings )
43+
public RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingSettings settings )
4444
{
45-
this( clock, log, new GetServersProcedureRunner( settings.routingParameters ) );
45+
this( clock, log, new RoutingProcedureRunner( settings.routingContext() ) );
4646
}
4747

48-
GetServersProcedureClusterCompositionProvider( Clock clock, Logger log, GetServersProcedureRunner getServersRunner )
48+
RoutingProcedureClusterCompositionProvider( Clock clock, Logger log, RoutingProcedureRunner getServersRunner )
4949
{
5050
this.clock = clock;
5151
this.log = log;
@@ -67,7 +67,7 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
6767
return new ClusterCompositionResponse.Failure( new ServiceUnavailableException( format(
6868
"Failed to run '%s' on server. " +
6969
"Please make sure that there is a Neo4j 3.1+ causal cluster up running.",
70-
getServersRunner.procedureCalled() ), e
70+
invokedProcedureString() ), e
7171
) );
7272
}
7373

@@ -78,9 +78,8 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
7878
if ( records.size() != 1 )
7979
{
8080
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
81-
PROTOCOL_ERROR_MESSAGE +
82-
"records received '%s' is too few or too many.", getServersRunner.procedureCalled(),
83-
records.size() ) ) );
81+
PROTOCOL_ERROR_MESSAGE + "records received '%s' is too few or too many.",
82+
invokedProcedureString(), records.size() ) ) );
8483
}
8584

8685
// failed to parse the record
@@ -92,19 +91,25 @@ public ClusterCompositionResponse getClusterComposition( Connection connection )
9291
catch ( ValueException e )
9392
{
9493
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
95-
PROTOCOL_ERROR_MESSAGE +
96-
"unparsable record received.", getServersRunner.procedureCalled() ), e ) );
94+
PROTOCOL_ERROR_MESSAGE + "unparsable record received.",
95+
invokedProcedureString() ), e ) );
9796
}
9897

9998
// the cluster result is not a legal reply
10099
if ( !cluster.hasRoutersAndReaders() )
101100
{
102101
return new ClusterCompositionResponse.Failure( new ProtocolException( format(
103-
PROTOCOL_ERROR_MESSAGE +
104-
"no router or reader found in response.", getServersRunner.procedureCalled() ) ) );
102+
PROTOCOL_ERROR_MESSAGE + "no router or reader found in response.",
103+
invokedProcedureString() ) ) );
105104
}
106105

107106
// all good
108107
return new ClusterCompositionResponse.Success( cluster );
109108
}
109+
110+
private String invokedProcedureString()
111+
{
112+
Statement statement = getServersRunner.invokedProcedure();
113+
return statement.text() + " " + statement.parameters();
114+
}
110115
}

driver/src/main/java/org/neo4j/driver/internal/cluster/GetServersProcedureRunner.java renamed to driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
package org.neo4j.driver.internal.cluster;
2121

2222
import java.util.List;
23-
import java.util.Map;
2423

2524
import org.neo4j.driver.internal.NetworkSession;
2625
import org.neo4j.driver.internal.spi.Connection;
@@ -32,42 +31,42 @@
3231
import static org.neo4j.driver.internal.util.ServerVersion.version;
3332
import static org.neo4j.driver.v1.Values.parameters;
3433

35-
public class GetServersProcedureRunner
34+
public class RoutingProcedureRunner
3635
{
3736
static final String GET_SERVERS = "dbms.cluster.routing.getServers";
3837
static final String GET_ROUTING_TABLE_PARAM = "context";
3938
static final String GET_ROUTING_TABLE = "dbms.cluster.routing.getRoutingTable({" + GET_ROUTING_TABLE_PARAM + "})";
4039

41-
private final Map<String, String> routingContext;
42-
private Statement procedureCalled;
40+
private final RoutingContext context;
41+
private Statement invokedProcedure;
4342

44-
public GetServersProcedureRunner( Map<String, String> context )
43+
public RoutingProcedureRunner( RoutingContext context )
4544
{
46-
this.routingContext = context;
45+
this.context = context;
4746
}
4847

4948
public List<Record> run( Connection connection )
5049
{
5150
if( version( connection.server().version() ).greaterThanOrEqual( v3_2_0 ) )
5251
{
53-
procedureCalled = new Statement( "CALL " + GET_ROUTING_TABLE,
54-
parameters(GET_ROUTING_TABLE_PARAM, routingContext ) );
52+
invokedProcedure = new Statement( "CALL " + GET_ROUTING_TABLE,
53+
parameters( GET_ROUTING_TABLE_PARAM, context.asMap() ) );
5554
}
5655
else
5756
{
58-
procedureCalled = new Statement("CALL " + GET_SERVERS );
57+
invokedProcedure = new Statement( "CALL " + GET_SERVERS );
5958
}
6059

61-
return runProcedure( connection, procedureCalled );
60+
return runProcedure( connection, invokedProcedure );
6261
}
6362

6463
List<Record> runProcedure( Connection connection, Statement procedure )
6564
{
6665
return NetworkSession.run( connection, procedure, NO_OP ).list();
6766
}
6867

69-
Statement procedureCalled()
68+
Statement invokedProcedure()
7069
{
71-
return procedureCalled;
70+
return invokedProcedure;
7271
}
7372
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingSettings.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,41 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.Map;
22-
2321
public class RoutingSettings
2422
{
25-
final int maxRoutingFailures;
26-
final long retryTimeoutDelay;
27-
final Map<String, String> routingParameters;
23+
private final int maxRoutingFailures;
24+
private final long retryTimeoutDelay;
25+
private final RoutingContext routingContext;
26+
27+
public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay )
28+
{
29+
this( maxRoutingFailures, retryTimeoutDelay, RoutingContext.EMPTY );
30+
}
2831

29-
public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay, Map<String, String> routingParameters )
32+
public RoutingSettings( int maxRoutingFailures, long retryTimeoutDelay, RoutingContext routingContext )
3033
{
3134
this.maxRoutingFailures = maxRoutingFailures;
3235
this.retryTimeoutDelay = retryTimeoutDelay;
33-
this.routingParameters = routingParameters;
36+
this.routingContext = routingContext;
37+
}
38+
39+
public RoutingSettings withRoutingContext( RoutingContext newRoutingContext )
40+
{
41+
return new RoutingSettings( maxRoutingFailures, retryTimeoutDelay, newRoutingContext );
42+
}
43+
44+
public int maxRoutingFailures()
45+
{
46+
return maxRoutingFailures;
47+
}
48+
49+
public long retryTimeoutDelay()
50+
{
51+
return retryTimeoutDelay;
52+
}
53+
54+
public RoutingContext routingContext()
55+
{
56+
return routingContext;
3457
}
3558
}

0 commit comments

Comments
 (0)