Skip to content

Adding address to routing context in routing procedure call #704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import java.util.HashMap;
import java.util.Map;

import org.neo4j.driver.internal.BoltServerAddress;

import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;

public class RoutingContext
{
public static final RoutingContext EMPTY = new RoutingContext();
private static final String ROUTING_ADDRESS_KEY = "address";

private final Map<String,String> context;

Expand All @@ -43,7 +46,7 @@ public RoutingContext( URI uri )

public boolean isDefined()
{
return !context.isEmpty();
return context.size() > 1;
}

public Map<String,String> asMap()
Expand All @@ -60,13 +63,25 @@ public String toString()
private static Map<String,String> parseParameters( URI uri )
{
String query = uri.getQuery();
String address;

if ( query == null || query.isEmpty() )
if ( uri.getPort() == -1 )
{
return emptyMap();
address = String.format( "%s:%s", uri.getHost(), BoltServerAddress.DEFAULT_PORT );
}
else
{
address = String.format( "%s:%s", uri.getHost(), uri.getPort() );
}

Map<String,String> parameters = new HashMap<>();
parameters.put( ROUTING_ADDRESS_KEY, address );

if ( query == null || query.isEmpty() )
{
return parameters;
}

String[] pairs = query.split( "&" );
for ( String pair : pairs )
{
Expand All @@ -77,19 +92,29 @@ private static Map<String,String> parseParameters( URI uri )
"Invalid parameters: '" + pair + "' in URI '" + uri + "'" );
}

String key = trimAndVerify( keyValue[0], "key", uri );
String value = trimAndVerify( keyValue[1], "value", uri );

String previousValue = parameters.put( key, value );
String previousValue = parameters.put( trimAndVerifyKey( keyValue[0], "key", uri ),
trimAndVerify( keyValue[1], "value", uri ) );
if ( previousValue != null )
{
throw new IllegalArgumentException(
"Duplicated query parameters with key '" + key + "' in URI '" + uri + "'" );
"Duplicated query parameters with key '" + previousValue + "' in URI '" + uri + "'" );
}
}
return parameters;
}

private static String trimAndVerifyKey( String s, String key, URI uri )
{
String trimmed = trimAndVerify( s, key, uri );

if (trimmed.equals( ROUTING_ADDRESS_KEY ))
{
throw new IllegalArgumentException( "The key 'address' is reserved for routing context.");
}

return trimmed;
}

private static String trimAndVerify( String string, String name, URI uri )
{
String result = string.trim();
Expand Down
4 changes: 2 additions & 2 deletions driver/src/test/java/org/neo4j/driver/GraphDatabaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ void throwsWhenBoltSchemeUsedWithRoutingParams()
@Test
void shouldLogWhenUnableToCreateRoutingDriver() throws Exception
{
StubServer server1 = StubServer.start( "discover_not_supported.script", 9001 );
StubServer server2 = StubServer.start( "discover_not_supported.script", 9002 );
StubServer server1 = StubServer.start( "discover_not_supported_9001.script", 9001 );
StubServer server2 = StubServer.start( "discover_not_supported_9002.script", 9002 );

Logging logging = mock( Logging.class );
Logger logger = mock( Logger.class );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ void shouldRoundRobinWriteSessionsInTransaction() throws Exception
void shouldFailOnNonDiscoverableServer() throws IOException, InterruptedException
{
// Given
StubServer.start( "discover_not_supported.script", 9001 );
StubServer.start( "discover_not_supported_9001.script", 9001 );
URI uri = URI.create( "neo4j://127.0.0.1:9001" );
final Driver driver = GraphDatabase.driver( uri, INSECURE_CONFIG );

Expand Down Expand Up @@ -778,10 +778,10 @@ void shouldRetryWriteTransactionUntilFailure() throws Exception
@Test
void shouldRetryReadTransactionAndPerformRediscoveryUntilSuccess() throws Exception
{
StubServer router1 = StubServer.start( "acquire_endpoints_v3.script", 9010 );
StubServer router1 = StubServer.start( "acquire_endpoints_v3_9010.script", 9010 );
StubServer brokenReader1 = StubServer.start( "dead_read_server_tx.script", 9005 );
StubServer brokenReader2 = StubServer.start( "dead_read_server_tx.script", 9006 );
StubServer router2 = StubServer.start( "discover_servers.script", 9003 );
StubServer router2 = StubServer.start( "discover_servers_9010.script", 9003 );
StubServer reader = StubServer.start( "read_server_v3_read_tx.script", 9004 );

try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9010" ); Session session = driver.session() )
Expand All @@ -805,9 +805,9 @@ void shouldRetryReadTransactionAndPerformRediscoveryUntilSuccess() throws Except
@Test
void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throws Exception
{
StubServer router1 = StubServer.start( "discover_servers.script", 9010 );
StubServer router1 = StubServer.start( "discover_servers_9010.script", 9010 );
StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9001 );
StubServer router2 = StubServer.start( "acquire_endpoints_v3.script", 9002 );
StubServer router2 = StubServer.start( "acquire_endpoints_v3_9010.script", 9002 );
StubServer brokenWriter2 = StubServer.start( "dead_write_server.script", 9008 );
StubServer writer = StubServer.start( "write_server_v3_write_tx.script", 9007 );

Expand All @@ -833,16 +833,16 @@ void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throws Excep
void shouldUseInitialRouterForRediscoveryWhenAllOtherRoutersAreDead() throws Exception
{
// initial router does not have itself in the returned set of routers
StubServer router = StubServer.start( "acquire_endpoints_v3.script", 9010 );
StubServer router = StubServer.start( "acquire_endpoints_v3.script", 9001 );

try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9010", INSECURE_CONFIG ) )
try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9001", INSECURE_CONFIG ) )
{
driver.verifyConnectivity();
try ( Session session = driver.session( builder().withDefaultAccessMode( AccessMode.READ ).build() ) )
{
// restart router on the same port with different script that contains itself as reader
assertEquals( 0, router.exitStatus() );
router = StubServer.start( "rediscover_using_initial_router.script", 9010 );
router = StubServer.start( "rediscover_using_initial_router.script", 9001 );

List<String> names = readStrings( "MATCH (n) RETURN n.name AS name", session );
assertEquals( asList( "Bob", "Alice" ), names );
Expand Down Expand Up @@ -895,8 +895,8 @@ void shouldSendRoutingContextToServer() throws Exception
@Test
void shouldServeReadsButFailWritesWhenNoWritersAvailable() throws Exception
{
StubServer router1 = StubServer.start( "discover_no_writers.script", 9010 );
StubServer router2 = StubServer.start( "discover_no_writers.script", 9004 );
StubServer router1 = StubServer.start( "discover_no_writers_9010.script", 9010 );
StubServer router2 = StubServer.start( "discover_no_writers_9010.script", 9004 );
StubServer reader = StubServer.start( "read_server_v3_read_tx.script", 9003 );

try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9010", INSECURE_CONFIG );
Expand All @@ -919,7 +919,7 @@ void shouldAcceptRoutingTableWithoutWritersAndThenRediscover() throws Exception
{
// first router does not have itself in the resulting routing table so connection
// towards it will be closed after rediscovery
StubServer router1 = StubServer.start( "discover_no_writers.script", 9010 );
StubServer router1 = StubServer.start( "discover_no_writers_9010.script", 9010 );
StubServer router2 = null;
StubServer reader = StubServer.start( "read_server_v3_read_tx.script", 9003 );
StubServer writer = StubServer.start( "write_with_bookmarks.script", 9007 );
Expand All @@ -930,7 +930,7 @@ void shouldAcceptRoutingTableWithoutWritersAndThenRediscover() throws Exception
try ( Session session = driver.session() )
{
// start another router which knows about writes, use same address as the initial router
router2 = StubServer.start( "acquire_endpoints_v3.script", 9010 );
router2 = StubServer.start( "acquire_endpoints_v3_9010.script", 9010 );

assertEquals( asList( "Bob", "Alice", "Tina" ), readStrings( "MATCH (n) RETURN n.name", session ) );

Expand Down Expand Up @@ -983,7 +983,7 @@ void shouldSendMultipleBookmarks() throws Exception
StubServer router = StubServer.start( "acquire_endpoints_v3.script", 9001 );
StubServer writer = StubServer.start( "multiple_bookmarks.script", 9007 );

try ( Driver driver = GraphDatabase.driver( "neo4j://localhost:9001", INSECURE_CONFIG );
try ( Driver driver = GraphDatabase.driver( "neo4j://127.0.0.1:9001", INSECURE_CONFIG );
Session session = driver.session( builder().withBookmarks( InternalBookmark.parse(
asOrderedSet( "neo4j:bookmark:v1:tx5", "neo4j:bookmark:v1:tx29", "neo4j:bookmark:v1:tx94", "neo4j:bookmark:v1:tx56",
"neo4j:bookmark:v1:tx16", "neo4j:bookmark:v1:tx68" ) ) ).build() ) )
Expand All @@ -1007,16 +1007,16 @@ void shouldSendMultipleBookmarks() throws Exception
void shouldForgetAddressOnDatabaseUnavailableError() throws Exception
{
// perform initial discovery using router1
StubServer router1 = StubServer.start( "discover_servers.script", 9010 );
StubServer router1 = StubServer.start( "discover_servers_9010.script", 9010 );
// attempt to write using writer1 which fails with 'Neo.TransientError.General.DatabaseUnavailable'
// it should then be forgotten and trigger new rediscovery
StubServer writer1 = StubServer.start( "writer_unavailable.script", 9001 );
// perform rediscovery using router2, it should return a valid writer2
StubServer router2 = StubServer.start( "acquire_endpoints_v3.script", 9002 );
StubServer router2 = StubServer.start( "acquire_endpoints_v3_9010.script", 9002 );
// write on writer2 should be successful
StubServer writer2 = StubServer.start( "write_server_v3_write_tx.script", 9007 );

try ( Driver driver = newDriverWithSleeplessClock( "neo4j://localhost:9010" ); Session session = driver.session() )
try ( Driver driver = newDriverWithSleeplessClock( "neo4j://127.0.0.1:9010" ); Session session = driver.session() )
{
AtomicInteger invocations = new AtomicInteger();
List<Record> records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void shouldRetryOnEmptyDiscoveryResult() throws IOException, InterruptedExceptio
};

StubServer emptyRouter = StubServer.start( "acquire_endpoints_v4_empty.script", 9001 );
StubServer realRouter = StubServer.start( "acquire_endpoints_v4.script", 9002 );
StubServer realRouter = StubServer.start( "acquire_endpoints_v4_virtual_host.script", 9002 );
StubServer reader = StubServer.start( "read_server_v4_read.script", 9005 );

Config config = insecureBuilder().withResolver( resolver ).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ void uriWithQueryIsParsed()
expectedMap.put( "key1", "value1" );
expectedMap.put( "key2", "value2" );
expectedMap.put( "key3", "value3" );
expectedMap.put( "address", "localhost:7687" );
assertEquals( expectedMap, context.asMap() );
}

Expand Down Expand Up @@ -101,10 +102,32 @@ void mapRepresentationIsUnmodifiable()
URI uri = URI.create( "neo4j://localhost:7687/?key1=value1" );
RoutingContext context = new RoutingContext( uri );

assertEquals( singletonMap( "key1", "value1" ), context.asMap() );
Map<String,String> expectedMap = new HashMap<>();
expectedMap.put( "key1", "value1" );
expectedMap.put( "address", "localhost:7687" );

assertEquals( expectedMap, context.asMap() );

assertThrows( UnsupportedOperationException.class, () -> context.asMap().put( "key2", "value2" ) );
assertEquals( singletonMap( "key1", "value1" ), context.asMap() );
assertEquals( expectedMap, context.asMap() );
}

@Test
void populateAddressWithDefaultPort()
{
URI uri = URI.create( "neo4j://localhost/" );
RoutingContext context = new RoutingContext( uri );

assertEquals( singletonMap( "address", "localhost:7687" ), context.asMap() );
}

@Test
void throwsExceptionIfAddressIsUsedInContext()
{
URI uri = URI.create( "neo4j://localhost:7687/?key1=value1&address=someaddress:9010" );

IllegalArgumentException e = assertThrows( IllegalArgumentException.class, () -> new RoutingContext( uri ) );
assertEquals( "The key 'address' is reserved for routing context.", e.getMessage() );
}

private static void testIllegalUri( URI uri )
Expand All @@ -116,7 +139,10 @@ private static void testEmptyRoutingContext( URI uri )
{
RoutingContext context = new RoutingContext( uri );

Map<String,String> expectedMap = new HashMap<>();
expectedMap.put( "address", "localhost:7687" );

assertFalse( context.isDefined() );
assertTrue( context.asMap().isEmpty() );
assertEquals( singletonMap( "address", "localhost:7687" ), context.asMap() );
}
}
4 changes: 2 additions & 2 deletions driver/src/test/resources/acquire_endpoints_v3.script
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
!: AUTO HELLO
!: AUTO GOODBYE

C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"address": "127.0.0.1:9001"}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007","127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007","127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9004","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
SUCCESS {}
10 changes: 10 additions & 0 deletions driver/src/test/resources/acquire_endpoints_v3_9010.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
!: BOLT 3
!: AUTO RESET
!: AUTO HELLO
!: AUTO GOODBYE

C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"address": "127.0.0.1:9010"}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007","127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
SUCCESS {}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
!: AUTO HELLO
!: AUTO GOODBYE

C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"address": "my.virtual.host:8080"}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, []]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
!: AUTO HELLO
!: AUTO GOODBYE

C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"address": "127.0.0.1:9001"}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
SUCCESS {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"address": "127.0.0.1:9001"}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
SUCCESS {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"address": "127.0.0.1:9001"}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": [],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
SUCCESS {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"address": "127.0.0.1:9001"}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
!: AUTO HELLO
!: AUTO GOODBYE

C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"address": "my.virtual.host:8080"}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9010"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9011"], "role": "READ"},{"addresses": ["127.0.0.1:9004"], "role": "ROUTE"}]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
!: AUTO HELLO
!: AUTO GOODBYE

C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {}} {}
C: RUN "CALL dbms.cluster.routing.getRoutingTable($context)" {"context": {"address": "my.virtual.host:8080"}} {}
PULL_ALL
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
Expand Down
2 changes: 1 addition & 1 deletion driver/src/test/resources/acquire_endpoints_v4.script
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
!: AUTO HELLO
!: AUTO GOODBYE

C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": {}, "database": "mydatabase"} {"mode": "r", "db": "system"}
C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": { "address": "127.0.0.1:9001"}, "database": "mydatabase"} {"mode": "r", "db": "system"}
PULL {"n": -1}
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007","127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
!: AUTO HELLO
!: AUTO GOODBYE

C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": {}, "database": "mydatabase"} {"mode": "r", "db": "system"}
C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": {"address": "127.0.0.1:9001" }, "database": "mydatabase"} {"mode": "r", "db": "system"}
PULL {"n": -1}
S: FAILURE {"code": "Neo.ClientError.Database.DatabaseNotFound", "message": "wut!"}
IGNORED
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
!: AUTO HELLO
!: AUTO GOODBYE

C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": {}, "database": "mydatabase"} {"mode": "r", "db": "system"}
C: RUN "CALL dbms.routing.getRoutingTable($context, $database)" {"context": { "address": "my.virtual.host:8080" }, "database": "mydatabase"} {"mode": "r", "db": "system"}
PULL {"n": -1}
S: SUCCESS {"fields": ["ttl", "servers"]}
RECORD [9223372036854775807, []]
Expand Down
Loading