diff --git a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java index ec1de15d7f..51140a11d0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java @@ -18,8 +18,6 @@ */ package org.neo4j.driver.internal; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -378,28 +376,4 @@ private void closeCurrentConnection( boolean sync ) logger.debug( "Released connection " + connection.hashCode() ); } } - - private static List recordError( Throwable error, List errors ) - { - if ( errors == null ) - { - errors = new ArrayList<>(); - } - errors.add( error ); - return errors; - } - - private static void addSuppressed( Throwable error, List suppressedErrors ) - { - if ( suppressedErrors != null ) - { - for ( Throwable suppressedError : suppressedErrors ) - { - if ( error != suppressedError ) - { - error.addSuppressed( suppressedError ); - } - } - } - } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index a4babae078..2bfafd4805 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -25,6 +25,7 @@ import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.v1.AccessMode; import static java.lang.String.format; import static java.util.Arrays.asList; @@ -34,7 +35,7 @@ public class ClusterRoutingTable implements RoutingTable private static final int MIN_ROUTERS = 1; private final Clock clock; - private long expirationTimeout; + private volatile long expirationTimeout; private final RoundRobinAddressSet readers; private final RoundRobinAddressSet writers; private final RoundRobinAddressSet routers; @@ -56,12 +57,12 @@ private ClusterRoutingTable( Clock clock ) } @Override - public boolean isStale() + public boolean isStaleFor( AccessMode mode ) { - return expirationTimeout < clock.millis() || // the expiration timeout has been reached - routers.size() <= MIN_ROUTERS || // we need to discover more routing servers - readers.size() == 0 || // we need to discover more read servers - writers.size() == 0; // we need to discover more write servers + return expirationTimeout < clock.millis() || + routers.size() <= MIN_ROUTERS || + mode == AccessMode.READ && readers.size() == 0 || + mode == AccessMode.WRITE && writers.size() == 0; } @Override @@ -115,7 +116,7 @@ public void removeWriter( BoltServerAddress toRemove ) @Override - public String toString() + public synchronized String toString() { return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s", expirationTimeout, clock.millis(), routers, writers, readers ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java index e88b84b9ec..ef48041d29 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java @@ -29,8 +29,8 @@ import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; -import org.neo4j.driver.v1.exceptions.ProtocolException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; +import org.neo4j.driver.v1.exceptions.SessionExpiredException; public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable { @@ -61,15 +61,14 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, this.rediscovery = rediscovery; this.log = log; - // initialize the routing table - ensureRouting(); + refreshRoutingTable(); } @Override public PooledConnection acquireConnection( AccessMode mode ) { RoundRobinAddressSet addressSet = addressSetFor( mode ); - PooledConnection connection = acquireConnection( addressSet ); + PooledConnection connection = acquireConnection( mode, addressSet ); return new RoutingPooledConnection( connection, this, mode ); } @@ -91,26 +90,23 @@ public void close() throws Exception connections.close(); } - private PooledConnection acquireConnection( RoundRobinAddressSet servers ) throws ServiceUnavailableException + private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSet servers ) { - for ( ; ; ) + ensureRouting( mode ); + for ( BoltServerAddress address; (address = servers.next()) != null; ) { - // refresh the routing table if needed - ensureRouting(); - for ( BoltServerAddress address; (address = servers.next()) != null; ) + try { - try - { - return connections.acquire( address ); - } - catch ( ServiceUnavailableException e ) - { - log.error( "Failed to obtain a connection towards address " + address, e ); - forget( address ); - } + return connections.acquire( address ); + } + catch ( ServiceUnavailableException e ) + { + log.error( "Failed to obtain a connection towards address " + address, e ); + forget( address ); } - // if we get here, we failed to connect to any server, so we will rebuild the routing table } + throw new SessionExpiredException( + "Failed to obtain connection towards " + mode + " server. Known routing table is: " + routingTable ); } private synchronized void forget( BoltServerAddress address ) @@ -121,23 +117,28 @@ private synchronized void forget( BoltServerAddress address ) connections.purge( address ); } - synchronized void ensureRouting() throws ServiceUnavailableException, ProtocolException + synchronized void ensureRouting( AccessMode mode ) { - if ( routingTable.isStale() ) + if ( routingTable.isStaleFor( mode ) ) { - log.info( "Routing information is stale. %s", routingTable ); + refreshRoutingTable(); + } + } - // get a new routing table - ClusterComposition cluster = rediscovery.lookupClusterComposition( connections, routingTable ); - Set removed = routingTable.update( cluster ); - // purge connections to removed addresses - for ( BoltServerAddress address : removed ) - { - connections.purge( address ); - } + synchronized void refreshRoutingTable() + { + log.info( "Routing information is stale. %s", routingTable ); - log.info( "Refreshed routing information. %s", routingTable ); + // get a new routing table + ClusterComposition cluster = rediscovery.lookupClusterComposition( routingTable, connections ); + Set removed = routingTable.update( cluster ); + // purge connections to removed addresses + for ( BoltServerAddress address : removed ) + { + connections.purge( address ); } + + log.info( "Refreshed routing information. %s", routingTable ); } private RoundRobinAddressSet addressSetFor( AccessMode mode ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java index 2958a9b0c9..a038da4dfc 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java @@ -18,6 +18,7 @@ */ package org.neo4j.driver.internal.cluster; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -42,6 +43,8 @@ public class Rediscovery private final ClusterCompositionProvider provider; private final HostNameResolver hostNameResolver; + private boolean useInitialRouter; + public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, Clock clock, Logger logger, ClusterCompositionProvider provider, HostNameResolver hostNameResolver ) { @@ -53,9 +56,15 @@ public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, C this.hostNameResolver = hostNameResolver; } - // Given the current routing table and connection pool, use the connection composition provider to fetch a new - // cluster composition, which would be used to update the routing table and connection pool - public ClusterComposition lookupClusterComposition( ConnectionPool connections, RoutingTable routingTable ) + /** + * Given the current routing table and connection pool, use the connection composition provider to fetch a new + * cluster composition, which would be used to update the routing table and connection pool. + * + * @param routingTable current routing table. + * @param connections connection pool. + * @return new cluster composition. + */ + public ClusterComposition lookupClusterComposition( RoutingTable routingTable, ConnectionPool connections ) { int failures = 0; @@ -65,7 +74,7 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections, sleep( waitTime ); start = clock.millis(); - ClusterComposition composition = lookupClusterCompositionOnKnownRouters( connections, routingTable ); + ClusterComposition composition = lookup( routingTable, connections ); if ( composition != null ) { return composition; @@ -78,11 +87,56 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections, } } - private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPool connections, - RoutingTable routingTable ) + private ClusterComposition lookup( RoutingTable routingTable, ConnectionPool connections ) + { + ClusterComposition composition; + + if ( useInitialRouter ) + { + composition = lookupOnInitialRouterThenOnKnownRouters( routingTable, connections ); + useInitialRouter = false; + } + else + { + composition = lookupOnKnownRoutersThenOnInitialRouter( routingTable, connections ); + } + + if ( composition != null && !composition.hasWriters() ) + { + useInitialRouter = true; + } + + return composition; + } + + private ClusterComposition lookupOnKnownRoutersThenOnInitialRouter( RoutingTable routingTable, + ConnectionPool connections ) + { + Set seenServers = new HashSet<>(); + ClusterComposition composition = lookupOnKnownRouters( routingTable, connections, seenServers ); + if ( composition == null ) + { + return lookupOnInitialRouter( routingTable, connections, seenServers ); + } + return composition; + } + + private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable routingTable, + ConnectionPool connections ) + { + Set seenServers = Collections.emptySet(); + ClusterComposition composition = lookupOnInitialRouter( routingTable, connections, seenServers ); + if ( composition == null ) + { + return lookupOnKnownRouters( routingTable, connections, new HashSet() ); + } + return composition; + } + + private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections, + Set seenServers ) { int size = routingTable.routerSize(); - Set triedServers = new HashSet<>(); for ( int i = 0; i < size; i++ ) { BoltServerAddress address = routingTable.nextRouter(); @@ -91,22 +145,28 @@ private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPoo break; } - ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable ); + ClusterComposition composition = lookupOnRouter( address, routingTable, connections ); if ( composition != null ) { return composition; } else { - triedServers.add( address ); + seenServers.add( address ); } } + return null; + } + + private ClusterComposition lookupOnInitialRouter( RoutingTable routingTable, + ConnectionPool connections, Set triedServers ) + { Set ips = hostNameResolver.resolve( initialRouter ); ips.removeAll( triedServers ); for ( BoltServerAddress address : ips ) { - ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable ); + ClusterComposition composition = lookupOnRouter( address, routingTable, connections ); if ( composition != null ) { return composition; @@ -116,8 +176,8 @@ private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPoo return null; } - private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress routerAddress, - ConnectionPool connections, RoutingTable routingTable ) + private ClusterComposition lookupOnRouter( BoltServerAddress routerAddress, RoutingTable routingTable, + ConnectionPool connections ) { ClusterCompositionResponse response; try ( Connection connection = connections.acquire( routerAddress ) ) @@ -139,11 +199,7 @@ private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress r ClusterComposition cluster = response.clusterComposition(); logger.info( "Got cluster composition %s", cluster ); - if ( cluster.hasWriters() ) - { - return cluster; - } - return null; + return cluster; } private void sleep( long millis ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java index 91a6d62808..9ec32dddb0 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java @@ -21,10 +21,11 @@ import java.util.Set; import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.v1.AccessMode; public interface RoutingTable { - boolean isStale(); + boolean isStaleFor( AccessMode mode ); Set update( ClusterComposition cluster ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index 797a905771..463525ecaa 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -69,7 +69,7 @@ public SocketConnectionPool( PoolSettings poolSettings, Connector connector, Clo } @Override - public PooledConnection acquire( final BoltServerAddress address ) + public PooledConnection acquire( BoltServerAddress address ) { assertNotClosed(); BlockingPooledConnectionQueue connectionQueue = pool( address ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java index 52f61380d6..5370643800 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.net.URI; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; @@ -43,6 +43,7 @@ import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.Transaction; import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; @@ -50,9 +51,12 @@ import org.neo4j.driver.v1.util.Function; import org.neo4j.driver.v1.util.StubServer; +import static java.util.Arrays.asList; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -87,7 +91,7 @@ public String apply( Record record ) } } ); - assertThat( result, equalTo( Arrays.asList( "Bob", "Alice", "Tina" ) ) ); + assertThat( result, equalTo( asList( "Bob", "Alice", "Tina" ) ) ); } // Finally @@ -118,7 +122,7 @@ public String apply( Record record ) } } ); - assertThat( result, equalTo( Arrays.asList( "Bob", "Alice", "Tina" ) ) ); + assertThat( result, equalTo( asList( "Bob", "Alice", "Tina" ) ) ); } // Finally @@ -150,7 +154,7 @@ public String apply( Record record ) { return record.get( "n.name" ).asString(); } - } ), equalTo( Arrays.asList( "Bob", "Alice", "Tina" ) ) ); + } ), equalTo( asList( "Bob", "Alice", "Tina" ) ) ); } } } @@ -186,7 +190,7 @@ public String apply( Record record ) { return record.get( "n.name" ).asString(); } - } ), equalTo( Arrays.asList( "Bob", "Alice", "Tina" ) ) ); + } ), equalTo( asList( "Bob", "Alice", "Tina" ) ) ); } } } @@ -842,10 +846,8 @@ public void shouldUseInitialRouterForRediscoveryWhenAllOtherRoutersAreDead() thr assertEquals( 0, router.exitStatus() ); router = StubServer.start( "rediscover_using_initial_router.script", 9010 ); - List records = session.run( "MATCH (n) RETURN n.name AS name" ).list(); - assertEquals( 2, records.size() ); - assertEquals( "Bob", records.get( 0 ).get( "name" ).asString() ); - assertEquals( "Alice", records.get( 1 ).get( "name" ).asString() ); + List names = readStrings( "MATCH (n) RETURN n.name AS name", session ); + assertEquals( asList( "Bob", "Alice" ), names ); } } @@ -915,6 +917,82 @@ public void shouldIgnoreRoutingContextWhenServerDoesNotSupportIt() throws Except } } + @Test + public void shouldServeReadsButFailWritesWhenNoWritersAvailable() throws Exception + { + StubServer router1 = StubServer.start( "discover_no_writers.script", 9010 ); + StubServer router2 = StubServer.start( "discover_no_writers.script", 9004 ); + StubServer reader = StubServer.start( "read_server.script", 9003 ); + + try ( Driver driver = GraphDatabase.driver( "bolt+routing://127.0.0.1:9010", config ); + Session session = driver.session() ) + { + assertEquals( asList( "Bob", "Alice", "Tina" ), readStrings( "MATCH (n) RETURN n.name", session ) ); + + try + { + session.run( "CREATE (n {name:'Bob'})" ).consume(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( SessionExpiredException.class ) ); + } + } + finally + { + assertEquals( 0, router1.exitStatus() ); + assertEquals( 0, router2.exitStatus() ); + assertEquals( 0, reader.exitStatus() ); + } + } + + @Test + public void shouldAcceptRoutingTableWithoutWritersAndThenRediscover() throws Exception + { + // first router does not have itself in the resulting routing table so connection + // towards it will be closed after rediscovery + StubServer router1 = StubServer.start( "discover_no_writers.script", 9010 ); + StubServer router2 = null; + StubServer reader = StubServer.start( "read_server.script", 9003 ); + StubServer writer = StubServer.start( "write_server.script", 9007 ); + + try ( Driver driver = GraphDatabase.driver( "bolt+routing://127.0.0.1:9010", config ); + Session session = driver.session() ) + { + // start another router which knows about writes, use same address as the initial router + router2 = StubServer.start( "acquire_endpoints.script", 9010 ); + + List names = session.readTransaction( new TransactionWork>() + { + @Override + public List execute( Transaction tx ) + { + List records = tx.run( "MATCH (n) RETURN n.name" ).list(); + List names = new ArrayList<>( records.size() ); + for ( Record record : records ) + { + names.add( record.get( 0 ).asString() ); + } + return names; + } + } ); + + assertEquals( asList( "Bob", "Alice", "Tina" ), names ); + + StatementResult createResult = session.run( "CREATE (n {name:'Bob'})" ); + assertFalse( createResult.hasNext() ); + } + finally + { + assertEquals( 0, router1.exitStatus() ); + assertNotNull( router2 ); + assertEquals( 0, router2.exitStatus() ); + assertEquals( 0, reader.exitStatus() ); + assertEquals( 0, writer.exitStatus() ); + } + } + private static Driver newDriverWithSleeplessClock( String uriString ) { DriverFactory driverFactory = new DriverFactoryWithClock( new SleeplessClock() ); @@ -947,4 +1025,22 @@ public List execute( Transaction tx ) } }; } + + private static List readStrings( final String query, Session session ) + { + return session.readTransaction( new TransactionWork>() + { + @Override + public List execute( Transaction tx ) + { + List records = tx.run( query ).list(); + List names = new ArrayList<>( records.size() ); + for ( Record record : records ) + { + names.add( record.get( 0 ).asString() ); + } + return names; + } + } ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java index b8f754c9bb..51d41cc3fb 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterCompositionUtil.java @@ -20,7 +20,7 @@ package org.neo4j.driver.internal.cluster; import java.util.ArrayList; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -60,9 +60,9 @@ public static ClusterComposition createClusterComposition( List... servers ) { - Set routers = new HashSet<>(); - Set writers = new HashSet<>(); - Set readers = new HashSet<>(); + Set routers = new LinkedHashSet<>(); + Set writers = new LinkedHashSet<>(); + Set readers = new LinkedHashSet<>(); switch( servers.length ) { diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java index 05e912a50f..fe4e338da6 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java @@ -20,6 +20,9 @@ import org.junit.Test; +import java.util.List; + +import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.util.FakeClock; import static java.util.Arrays.asList; @@ -34,6 +37,8 @@ import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.EMPTY; import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.F; import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.createClusterComposition; +import static org.neo4j.driver.v1.AccessMode.READ; +import static org.neo4j.driver.v1.AccessMode.WRITE; public class ClusterRoutingTableTest { @@ -50,7 +55,8 @@ public void shouldReturnStaleIfTtlExpired() throws Exception clock.progress( 1234 ); // Then - assertTrue( routingTable.isStale() ); + assertTrue( routingTable.isStaleFor( READ ) ); + assertTrue( routingTable.isStaleFor( WRITE ) ); } @Test @@ -64,11 +70,12 @@ public void shouldReturnStaleIfNoRouter() throws Exception routingTable.update( createClusterComposition( EMPTY, asList( C ), asList( D, E ) ) ); // Then - assertTrue( routingTable.isStale() ); + assertTrue( routingTable.isStaleFor( READ ) ); + assertTrue( routingTable.isStaleFor( WRITE ) ); } @Test - public void shouldReturnStaleIfNoReader() throws Exception + public void shouldBeStaleForReadsButNotWritesWhenNoReaders() throws Exception { // Given FakeClock clock = new FakeClock(); @@ -78,11 +85,12 @@ public void shouldReturnStaleIfNoReader() throws Exception routingTable.update( createClusterComposition( asList( A, B ), asList( C ), EMPTY ) ); // Then - assertTrue( routingTable.isStale() ); + assertTrue( routingTable.isStaleFor( READ ) ); + assertFalse( routingTable.isStaleFor( WRITE ) ); } @Test - public void shouldReturnStatleIfNoWriter() throws Exception + public void shouldBeStaleForWritesButNotReadsWhenNoWriters() throws Exception { // Given FakeClock clock = new FakeClock(); @@ -92,11 +100,12 @@ public void shouldReturnStatleIfNoWriter() throws Exception routingTable.update( createClusterComposition( asList( A, B ), EMPTY, asList( D, E ) ) ); // Then - assertTrue( routingTable.isStale() ); + assertFalse( routingTable.isStaleFor( READ ) ); + assertTrue( routingTable.isStaleFor( WRITE ) ); } @Test - public void shouldNotStale() throws Exception + public void shouldBeNotStaleWithReadersWritersAndRouters() throws Exception { // Given FakeClock clock = new FakeClock(); @@ -106,11 +115,12 @@ public void shouldNotStale() throws Exception routingTable.update( createClusterComposition( asList( A, B ), asList( C ), asList( D, E ) ) ); // Then - assertFalse( routingTable.isStale() ); + assertFalse( routingTable.isStaleFor( READ ) ); + assertFalse( routingTable.isStaleFor( WRITE ) ); } @Test - public void shouldStaleWhenCreate() throws Throwable + public void shouldBeStaleForReadsAndWritesAfterCreation() throws Throwable { // Given FakeClock clock = new FakeClock(); @@ -119,13 +129,17 @@ public void shouldStaleWhenCreate() throws Throwable RoutingTable routingTable = new ClusterRoutingTable( clock, A ); // Then - assertTrue( routingTable.isStale() ); + assertTrue( routingTable.isStaleFor( READ ) ); + assertTrue( routingTable.isStaleFor( WRITE ) ); } @Test - public void preservesOrderingOfRouters() + public void shouldPreserveOrderingOfRouters() { - ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock(), A, C, D, F, B, E ); + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() ); + List routers = asList( A, C, D, F, B, E ); + + routingTable.update( createClusterComposition( routers, EMPTY, EMPTY ) ); assertEquals( A, routingTable.nextRouter() ); assertEquals( C, routingTable.nextRouter() ); @@ -133,5 +147,38 @@ public void preservesOrderingOfRouters() assertEquals( F, routingTable.nextRouter() ); assertEquals( B, routingTable.nextRouter() ); assertEquals( E, routingTable.nextRouter() ); + assertEquals( A, routingTable.nextRouter() ); + } + + @Test + public void shouldPreserveOrderingOfWriters() + { + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() ); + List writers = asList( D, F, A, C, E ); + + routingTable.update( createClusterComposition( EMPTY, writers, EMPTY ) ); + + assertEquals( D, routingTable.writers().next() ); + assertEquals( F, routingTable.writers().next() ); + assertEquals( A, routingTable.writers().next() ); + assertEquals( C, routingTable.writers().next() ); + assertEquals( E, routingTable.writers().next() ); + assertEquals( D, routingTable.writers().next() ); + } + + @Test + public void shouldPreserveOrderingOfReaders() + { + ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() ); + List readers = asList( B, A, F, C, D ); + + routingTable.update( createClusterComposition( EMPTY, EMPTY, readers ) ); + + assertEquals( B, routingTable.readers().next() ); + assertEquals( A, routingTable.readers().next() ); + assertEquals( F, routingTable.readers().next() ); + assertEquals( C, routingTable.readers().next() ); + assertEquals( D, routingTable.readers().next() ); + assertEquals( B, routingTable.readers().next() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index 6d62a2e175..873781b9a9 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -22,6 +22,7 @@ import org.mockito.InOrder; import java.util.Collections; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -43,10 +44,13 @@ import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; -import static org.hamcrest.MatcherAssert.assertThat; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_MOCKS; @@ -55,10 +59,12 @@ import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING; +import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT; import static org.neo4j.driver.v1.AccessMode.READ; import static org.neo4j.driver.v1.AccessMode.WRITE; @@ -71,8 +77,7 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale ConnectionPool conns = mock( ConnectionPool.class ); RoutingTable routingTable = mock( RoutingTable.class ); Rediscovery rediscovery = mock( Rediscovery.class ); - when( routingTable.isStale() ).thenReturn( true ); - Set set = Collections.singleton( new BoltServerAddress( "abc", 12 ) ); + Set set = singleton( new BoltServerAddress( "abc", 12 ) ); when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set ); // when @@ -81,29 +86,29 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale // then assertNotNull( balancer ); InOrder inOrder = inOrder( rediscovery, routingTable, conns ); - inOrder.verify( rediscovery ).lookupClusterComposition( conns, routingTable ); + inOrder.verify( rediscovery ).lookupClusterComposition( routingTable, conns ); inOrder.verify( routingTable ).update( any( ClusterComposition.class ) ); inOrder.verify( conns ).purge( new BoltServerAddress( "abc", 12 ) ); } @Test - public void shouldEnsureRoutingOnInitialization() throws Exception + public void shouldRefreshRoutingTableOnInitialization() throws Exception { // given & when - final AtomicInteger ensureRoutingCounter = new AtomicInteger( 0 ); + final AtomicInteger refreshRoutingTableCounter = new AtomicInteger( 0 ); LoadBalancer balancer = new LoadBalancer( mock( ConnectionPool.class ), mock( RoutingTable.class ), mock( Rediscovery.class ), DEV_NULL_LOGGER ) { @Override - public void ensureRouting() + synchronized void refreshRoutingTable() { - ensureRoutingCounter.incrementAndGet(); + refreshRoutingTableCounter.incrementAndGet(); } }; // then assertNotNull( balancer ); - assertThat( ensureRoutingCounter.get(), equalTo( 1 ) ); + assertThat( refreshRoutingTableCounter.get(), equalTo( 1 ) ); } @Test @@ -120,7 +125,7 @@ public void shouldEnsureRoutingWhenAcquireConn() throws Exception connection.init( "Test", Collections.emptyMap() ); // then - verify( spy ).ensureRouting(); + verify( spy ).ensureRouting( READ ); verify( readConn ).init( "Test", Collections.emptyMap() ); } @@ -189,7 +194,104 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing verify( connectionPool ).purge( address ); } + @Test + public void shouldRediscoverOnReadWhenRoutingTableIsStaleForReads() + { + testRediscoveryWhenStale( READ ); + } + + @Test + public void shouldRediscoverOnWriteWhenRoutingTableIsStaleForWrites() + { + testRediscoveryWhenStale( WRITE ); + } + + @Test + public void shouldNotRediscoverOnReadWhenRoutingTableIsStaleForWritesButNotReads() + { + testNoRediscoveryWhenNotStale( WRITE, READ ); + } + + @Test + public void shouldNotRediscoverOnWriteWhenRoutingTableIsStaleForReadsButNotWrites() + { + testNoRediscoveryWhenNotStale( READ, WRITE ); + } + + @Test + public void shouldThrowWhenRediscoveryReturnsNoSuitableServers() + { + ConnectionPool connections = mock( ConnectionPool.class ); + RoutingTable routingTable = mock( RoutingTable.class ); + when( routingTable.isStaleFor( any( AccessMode.class ) ) ).thenReturn( true ); + Rediscovery rediscovery = mock( Rediscovery.class ); + when( routingTable.readers() ).thenReturn( new RoundRobinAddressSet() ); + when( routingTable.writers() ).thenReturn( new RoundRobinAddressSet() ); + + LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER ); + + try + { + loadBalancer.acquireConnection( READ ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( SessionExpiredException.class ) ); + assertThat( e.getMessage(), startsWith( "Failed to obtain connection towards READ server" ) ); + } + + try + { + loadBalancer.acquireConnection( WRITE ); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( SessionExpiredException.class ) ); + assertThat( e.getMessage(), startsWith( "Failed to obtain connection towards WRITE server" ) ); + } + } + + private void testRediscoveryWhenStale( AccessMode mode ) + { + ConnectionPool connections = mock( ConnectionPool.class ); + when( connections.acquire( LOCAL_DEFAULT ) ).thenReturn( mock( PooledConnection.class ) ); + + RoutingTable routingTable = newStaleRoutingTableMock( mode ); + Rediscovery rediscovery = newRediscoveryMock(); + + LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER ); + verify( rediscovery ).lookupClusterComposition( routingTable, connections ); + + assertNotNull( loadBalancer.acquireConnection( mode ) ); + verify( routingTable ).isStaleFor( mode ); + verify( rediscovery, times( 2 ) ).lookupClusterComposition( routingTable, connections ); + } + + private void testNoRediscoveryWhenNotStale( AccessMode staleMode, AccessMode notStaleMode ) + { + ConnectionPool connections = mock( ConnectionPool.class ); + when( connections.acquire( LOCAL_DEFAULT ) ).thenReturn( mock( PooledConnection.class ) ); + + RoutingTable routingTable = newStaleRoutingTableMock( staleMode ); + Rediscovery rediscovery = newRediscoveryMock(); + + LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER ); + verify( rediscovery ).lookupClusterComposition( routingTable, connections ); + + assertNotNull( loadBalancer.acquireConnection( notStaleMode ) ); + verify( routingTable ).isStaleFor( notStaleMode ); + verify( rediscovery ).lookupClusterComposition( routingTable, connections ); + } + private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConnection readConn ) + { + return setupLoadBalancer( writerConn, readConn, mock( Rediscovery.class ) ); + } + + private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConnection readConn, + Rediscovery rediscovery ) { BoltServerAddress writer = mock( BoltServerAddress.class ); BoltServerAddress reader = mock( BoltServerAddress.class ); @@ -208,7 +310,7 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne when( routingTable.readers() ).thenReturn( readerAddrs ); when( routingTable.writers() ).thenReturn( writerAddrs ); - return new LoadBalancer( connPool, routingTable, mock( Rediscovery.class ), DEV_NULL_LOGGER ); + return new LoadBalancer( connPool, routingTable, rediscovery, DEV_NULL_LOGGER ); } private static Session newSession( LoadBalancer loadBalancer ) @@ -228,4 +330,27 @@ private static PooledConnection newConnectionWithFailingSync( BoltServerAddress return connection; } + private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) + { + RoutingTable routingTable = mock( RoutingTable.class ); + when( routingTable.isStaleFor( mode ) ).thenReturn( true ); + when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( new HashSet() ); + + RoundRobinAddressSet addresses = new RoundRobinAddressSet(); + addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) ), new HashSet() ); + when( routingTable.readers() ).thenReturn( addresses ); + when( routingTable.writers() ).thenReturn( addresses ); + + return routingTable; + } + + private static Rediscovery newRediscoveryMock() + { + Rediscovery rediscovery = mock( Rediscovery.class ); + Set noServers = Collections.emptySet(); + ClusterComposition clusterComposition = new ClusterComposition( 1, noServers, noServers, noServers ); + when( rediscovery.lookupClusterComposition( any( RoutingTable.class ), any( ConnectionPool.class ) ) ) + .thenReturn( clusterComposition ); + return rediscovery; + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java index 1cada6bdc9..b204335145 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -36,6 +35,7 @@ import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.spi.PooledConnection; import org.neo4j.driver.internal.util.Clock; +import org.neo4j.driver.internal.util.FakeClock; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.exceptions.ProtocolException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; @@ -59,8 +59,8 @@ import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.C; import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.D; import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.E; +import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.EMPTY; import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.F; -import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.INVALID_CLUSTER_COMPOSITION; import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.VALID_CLUSTER_COMPOSITION; import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.createClusterComposition; import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER; @@ -102,15 +102,14 @@ public void shouldTryConfiguredMaxRoutingFailures() throws Exception RoutingTable routingTable = new TestRoutingTable( A ); ClusterCompositionProvider mockedProvider = mock( ClusterCompositionProvider.class ); - when( mockedProvider.getClusterComposition( any( Connection.class ) ) ) - .thenReturn( success( INVALID_CLUSTER_COMPOSITION ) ); + when( mockedProvider.getClusterComposition( any( Connection.class ) ) ).thenThrow( new RuntimeException() ); Rediscovery rediscovery = new Rediscovery( A, settings, clock, DEV_NULL_LOGGER, mockedProvider, directMapProvider ); // when try { - rediscovery.lookupClusterComposition( mock( ConnectionPool.class ), routingTable ); + rediscovery.lookupClusterComposition( routingTable, mock( ConnectionPool.class ) ); fail("Should fail as failed to discovery"); } catch( ServiceUnavailableException e ) @@ -191,10 +190,10 @@ public static class NoWritersTest @Parameters(name = "Rediscovery result: {0}") public static Collection data() { return asList(new Object[][] { - { "([A], [C], [])", createClusterComposition( asList( A ), ClusterCompositionUtil.EMPTY, asList( C ) ) }, - { "([A], [CD], [])", createClusterComposition( asList( A ), ClusterCompositionUtil.EMPTY, asList( C, D ) ) }, - { "([AB], [C], [])", createClusterComposition( asList( A, B ), ClusterCompositionUtil.EMPTY, asList( C ) ) }, - { "([AB], [CD], [])", createClusterComposition( asList( A, B ), ClusterCompositionUtil.EMPTY, asList( C, D ) )} + {"([A], [C], [])", createClusterComposition( asList( A ), EMPTY, asList( C ) )}, + {"([A], [CD], [])", createClusterComposition( asList( A ), EMPTY, asList( C, D ) )}, + {"([AB], [C], [])", createClusterComposition( asList( A, B ), EMPTY, asList( C ) )}, + {"([AB], [CD], [])", createClusterComposition( asList( A, B ), EMPTY, asList( C, D ) )} }); } @@ -206,55 +205,54 @@ public NoWritersTest( String testName, ClusterComposition noWriters ) } @Test - public void shouldTryNextRouterWhenNoWriters() throws Throwable + public void shouldAcceptTableWithoutWriters() throws Throwable { // Given - RoutingTable routingTable = new TestRoutingTable( A, B ); + RoutingTable routingTable = new TestRoutingTable( A ); PooledConnection noWriterConn = mock( PooledConnection.class ); - PooledConnection healthyConn = mock( PooledConnection.class ); ConnectionPool mockedConnections = mock( ConnectionPool.class ); when( mockedConnections.acquire( A ) ).thenReturn( noWriterConn ); - when( mockedConnections.acquire( B ) ).thenReturn( healthyConn ); - ClusterCompositionProvider - mockedProvider = mock( ClusterCompositionProvider.class ); + ClusterCompositionProvider mockedProvider = mock( ClusterCompositionProvider.class ); when( mockedProvider.getClusterComposition( noWriterConn ) ).thenReturn( success( noWriters ) ); - when( mockedProvider.getClusterComposition( healthyConn ) ) - .thenReturn( success( VALID_CLUSTER_COMPOSITION ) ); // When ClusterComposition clusterComposition = rediscover( mockedConnections, routingTable, mockedProvider ); // Then - assertThat( clusterComposition, equalTo( VALID_CLUSTER_COMPOSITION ) ); + assertThat( clusterComposition, equalTo( noWriters ) ); } @Test - public void shouldThrowServiceUnavailableWhenNoNextRouter() throws Throwable + public void shouldUseInitialRouterWhenRediscoveringAfterNoWriters() throws Throwable { // Given - RoutingTable routingTable = new TestRoutingTable( A ); + RoutingTable routingTable = new TestRoutingTable( A, B, C ); PooledConnection noWriterConn = mock( PooledConnection.class ); + PooledConnection initialRouterConn = mock( PooledConnection.class ); ConnectionPool mockedConnections = mock( ConnectionPool.class ); when( mockedConnections.acquire( A ) ).thenReturn( noWriterConn ); + when( mockedConnections.acquire( B ) ).thenReturn( noWriterConn ); + when( mockedConnections.acquire( C ) ).thenReturn( noWriterConn ); + when( mockedConnections.acquire( F ) ).thenReturn( initialRouterConn ); - ClusterCompositionProvider - mockedProvider = mock( ClusterCompositionProvider.class ); + ClusterCompositionProvider mockedProvider = mock( ClusterCompositionProvider.class ); when( mockedProvider.getClusterComposition( noWriterConn ) ).thenReturn( success( noWriters ) ); + when( mockedProvider.getClusterComposition( initialRouterConn ) ) + .thenReturn( success( VALID_CLUSTER_COMPOSITION ) ); - // When & THen - try - { - rediscover( A, mockedConnections, routingTable, mockedProvider ); - fail( "Expecting a failure but not triggered." ); - } - catch( Exception e ) - { - assertThat( e, instanceOf( ServiceUnavailableException.class ) ); - assertThat( e.getMessage(), startsWith( "Could not perform discovery. No routing servers available." ) ); - } + Rediscovery rediscovery = new Rediscovery( F, new RoutingSettings( 1, 0 ), new FakeClock(), + DEV_NULL_LOGGER, mockedProvider, directMapProvider ); + + // first rediscovery should accept table with no writers + ClusterComposition composition1 = rediscovery.lookupClusterComposition( routingTable, mockedConnections ); + // second rediscovery should ask initial router because previous routing table had no writers + ClusterComposition composition2 = rediscovery.lookupClusterComposition( routingTable, mockedConnections ); + + assertEquals( noWriters, composition1 ); + assertEquals( VALID_CLUSTER_COMPOSITION, composition2 ); } } @@ -307,7 +305,7 @@ public void shouldUpdateRoutingTableWithTheNewOne() throws Throwable public static class IllegalResponseTest { @Test - public void shouldProtocolErrorWhenFailedToPaseClusterCompositin() throws Throwable + public void shouldProtocolErrorWhenFailedToParseClusterComposition() throws Throwable { // Given RoutingTable routingTable = new TestRoutingTable( A ); @@ -468,7 +466,7 @@ private static ClusterComposition rediscover( BoltServerAddress initialRouter, C Rediscovery rediscovery = new Rediscovery( initialRouter, settings, mockedClock, mockedLogger, provider, directMapProvider ); - return rediscovery.lookupClusterComposition( connections, routingTable ); + return rediscovery.lookupClusterComposition( routingTable, connections ); } private static class TestRoutingTable extends ClusterRoutingTable diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java index 2dc406a18d..d559060bd7 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingPooledConnectionErrorHandlingTest.java @@ -97,9 +97,10 @@ public void shouldHandleServiceUnavailableException() { ServiceUnavailableException serviceUnavailable = new ServiceUnavailableException( "Oh!" ); Connector connector = newConnectorWithThrowingConnections( serviceUnavailable ); - RoutingTable routingTable = newRoutingTable( ADDRESS1, ADDRESS2, ADDRESS3 ); + ClusterComposition clusterComposition = newClusterComposition( ADDRESS1, ADDRESS2, ADDRESS3 ); + RoutingTable routingTable = newRoutingTable( clusterComposition ); ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); - LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); + LoadBalancer loadBalancer = newLoadBalancer( clusterComposition, routingTable, connectionPool ); Connection readConnection = loadBalancer.acquireConnection( READ ); verifyServiceUnavailableHandling( readConnection, routingTable, connectionPool ); @@ -141,9 +142,10 @@ public void shouldPropagateClientExceptionWithoutErrorCode() private void testHandleFailureToWriteWithWriteConnection( ClientException error ) { Connector connector = newConnectorWithThrowingConnections( error ); - RoutingTable routingTable = newRoutingTable( ADDRESS1, ADDRESS2, ADDRESS3 ); + ClusterComposition clusterComposition = newClusterComposition( ADDRESS1, ADDRESS2, ADDRESS3 ); + RoutingTable routingTable = newRoutingTable( clusterComposition ); ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); - LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); + LoadBalancer loadBalancer = newLoadBalancer( clusterComposition, routingTable, connectionPool ); Connection readConnection = loadBalancer.acquireConnection( READ ); try @@ -169,9 +171,10 @@ private void testHandleFailureToWriteWithWriteConnection( ClientException error private void testHandleFailureToWrite( ClientException error ) { Connector connector = newConnectorWithThrowingConnections( error ); - RoutingTable routingTable = newRoutingTable( ADDRESS1, ADDRESS2, ADDRESS3 ); + ClusterComposition clusterComposition = newClusterComposition( ADDRESS1, ADDRESS2, ADDRESS3 ); + RoutingTable routingTable = newRoutingTable( clusterComposition ); ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); - LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); + LoadBalancer loadBalancer = newLoadBalancer( clusterComposition, routingTable, connectionPool ); Connection readConnection = loadBalancer.acquireConnection( WRITE ); try @@ -197,9 +200,10 @@ private void testHandleFailureToWrite( ClientException error ) private void testThrowablePropagation( Throwable error ) { Connector connector = newConnectorWithThrowingConnections( error ); - RoutingTable routingTable = newRoutingTable( ADDRESS1, ADDRESS2, ADDRESS3 ); + ClusterComposition clusterComposition = newClusterComposition( ADDRESS1, ADDRESS2, ADDRESS3 ); + RoutingTable routingTable = newRoutingTable( clusterComposition ); ConnectionPool connectionPool = newConnectionPool( connector, ADDRESS1, ADDRESS2, ADDRESS3 ); - LoadBalancer loadBalancer = newLoadBalancer( routingTable, connectionPool ); + LoadBalancer loadBalancer = newLoadBalancer( clusterComposition, routingTable, connectionPool ); Connection readConnection = loadBalancer.acquireConnection( READ ); verifyThrowablePropagation( readConnection, routingTable, connectionPool, error.getClass() ); @@ -276,17 +280,19 @@ private static Connection newConnectionMock( BoltServerAddress address ) return connection; } - private static RoutingTable newRoutingTable( BoltServerAddress... addresses ) + private static ClusterComposition newClusterComposition( BoltServerAddress... addresses ) { - ClusterComposition clusterComposition = new ClusterComposition( + return new ClusterComposition( Long.MAX_VALUE, new HashSet<>( asList( addresses ) ), new HashSet<>( asList( addresses ) ), new HashSet<>( asList( addresses ) ) ); + } + private static RoutingTable newRoutingTable( ClusterComposition clusterComposition ) + { RoutingTable routingTable = new ClusterRoutingTable( Clock.SYSTEM ); routingTable.update( clusterComposition ); - return routingTable; } @@ -313,9 +319,12 @@ private static ConnectionPool newConnectionPool( Connector connector, BoltServer return pool; } - private static LoadBalancer newLoadBalancer( RoutingTable routingTable, ConnectionPool connectionPool ) + private static LoadBalancer newLoadBalancer( ClusterComposition clusterComposition, RoutingTable routingTable, + ConnectionPool connectionPool ) { - return new LoadBalancer( connectionPool, routingTable, mock( Rediscovery.class ), DEV_NULL_LOGGER ); + Rediscovery rediscovery = mock( Rediscovery.class ); + when( rediscovery.lookupClusterComposition( routingTable, connectionPool ) ).thenReturn( clusterComposition ); + return new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER ); } private interface ConnectionMethod diff --git a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java index 5b54cfb42a..0b502985e7 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java +++ b/driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java @@ -44,7 +44,9 @@ import org.neo4j.driver.v1.Logging; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; import org.neo4j.driver.v1.Transaction; +import org.neo4j.driver.v1.TransactionWork; import org.neo4j.driver.v1.Values; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; @@ -53,6 +55,7 @@ import org.neo4j.driver.v1.util.Function; import org.neo4j.driver.v1.util.cc.Cluster; import org.neo4j.driver.v1.util.cc.ClusterMember; +import org.neo4j.driver.v1.util.cc.ClusterMemberRole; import org.neo4j.driver.v1.util.cc.ClusterRule; import static org.hamcrest.Matchers.containsString; @@ -343,6 +346,98 @@ public String apply( Session session ) } } + @Test + public void shouldNotServeWritesWhenMajorityOfCoresAreDead() throws Exception + { + Cluster cluster = clusterRule.getCluster(); + ClusterMember leader = cluster.leader(); + + try ( Driver driver = createDriver( leader.getRoutingUri() ) ) + { + for ( ClusterMember follower : cluster.followers() ) + { + cluster.kill( follower ); + } + awaitLeaderToStepDown( driver ); + + // now we should be unable to write because majority of cores is down + for ( int i = 0; i < 10; i++ ) + { + try ( Session session = driver.session( AccessMode.WRITE ) ) + { + session.run( "CREATE (p:Person {name: 'Gamora'})" ).consume(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( SessionExpiredException.class ) ); + } + } + } + } + + @Test + public void shouldServeReadsWhenMajorityOfCoresAreDead() throws Exception + { + Cluster cluster = clusterRule.getCluster(); + ClusterMember leader = cluster.leader(); + + try ( Driver driver = createDriver( leader.getRoutingUri() ) ) + { + String bookmark; + try ( Session session = driver.session() ) + { + int writeResult = session.writeTransaction( new TransactionWork() + { + @Override + public Integer execute( Transaction tx ) + { + StatementResult result = tx.run( "CREATE (:Person {name: 'Star Lord'}) RETURN 42" ); + return result.single().get( 0 ).asInt(); + } + } ); + + assertEquals( 42, writeResult ); + bookmark = session.lastBookmark(); + } + + ensureNodeVisible( cluster, "Star Lord", bookmark ); + + for ( ClusterMember follower : cluster.followers() ) + { + cluster.kill( follower ); + } + awaitLeaderToStepDown( driver ); + + // now we should be unable to write because majority of cores is down + try ( Session session = driver.session( AccessMode.WRITE ) ) + { + session.run( "CREATE (p:Person {name: 'Gamora'})" ).consume(); + fail( "Exception expected" ); + } + catch ( Exception e ) + { + assertThat( e, instanceOf( SessionExpiredException.class ) ); + } + + // but we should be able to read from the remaining core or read replicas + try ( Session session = driver.session() ) + { + int count = session.readTransaction( new TransactionWork() + { + @Override + public Integer execute( Transaction tx ) + { + StatementResult result = tx.run( "MATCH (:Person {name: 'Star Lord'}) RETURN count(*)" ); + return result.single().get( 0 ).asInt(); + } + } ); + + assertEquals( 1, count ); + } + } + } + private int executeWriteAndReadThroughBolt( ClusterMember member ) throws TimeoutException, InterruptedException { try ( Driver driver = createDriver( member.getRoutingUri() ) ) @@ -412,6 +507,73 @@ private T inExpirableSession( Driver driver, Function acquir throw new TimeoutException( "Transaction did not succeed in time" ); } + private void ensureNodeVisible( Cluster cluster, String name, String bookmark ) + { + for ( ClusterMember member : cluster.members() ) + { + int count = countNodesUsingDirectDriver( member.getBoltUri(), name, bookmark ); + assertEquals( 1, count ); + } + } + + private int countNodesUsingDirectDriver( URI boltUri, final String name, String bookmark ) + { + try ( Driver driver = createDriver( boltUri ); + Session session = driver.session( bookmark ) ) + { + return session.readTransaction( new TransactionWork() + { + @Override + public Integer execute( Transaction tx ) + { + StatementResult result = tx.run( "MATCH (:Person {name: {name}}) RETURN count(*)", + parameters( "name", name ) ); + return result.single().get( 0 ).asInt(); + } + } ); + } + } + + private void awaitLeaderToStepDown( Driver driver ) + { + int leadersCount; + int followersCount; + int readReplicasCount; + do + { + try ( Session session = driver.session() ) + { + int newLeadersCount = 0; + int newFollowersCount = 0; + int newReadReplicasCount = 0; + for ( Record record : session.run( "CALL dbms.cluster.overview" ).list() ) + { + ClusterMemberRole role = ClusterMemberRole.valueOf( record.get( "role" ).asString() ); + if ( role == ClusterMemberRole.LEADER ) + { + newLeadersCount++; + } + else if ( role == ClusterMemberRole.FOLLOWER ) + { + newFollowersCount++; + } + else if ( role == ClusterMemberRole.READ_REPLICA ) + { + newReadReplicasCount++; + } + else + { + throw new AssertionError( "Unknown role: " + role ); + } + } + leadersCount = newLeadersCount; + followersCount = newFollowersCount; + readReplicasCount = newReadReplicasCount; + } + } + while ( !(leadersCount == 0 && followersCount == 1 && readReplicasCount == 2) ); + } + private Driver createDriver( URI boltUri ) { Logging devNullLogging = new Logging() diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java index 2accfaa1c5..a578a44b7e 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java @@ -39,6 +39,7 @@ import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; +import static java.util.Collections.unmodifiableSet; import static org.neo4j.driver.internal.util.Iterables.single; import static org.neo4j.driver.v1.Config.TrustStrategy.trustAllCertificates; @@ -101,6 +102,11 @@ public ClusterMember leaderTx( Consumer tx ) return leader; } + public Set members() + { + return unmodifiableSet( members ); + } + public ClusterMember leader() { Set leaders = membersWithRole( ClusterMemberRole.LEADER ); @@ -139,7 +145,9 @@ public void start( ClusterMember member ) public void startOfflineMembers() { - for ( ClusterMember member : offlineMembers ) + // copy offline members to avoid ConcurrentModificationException + Set currentlyOfflineMembers = new HashSet<>( offlineMembers ); + for ( ClusterMember member : currentlyOfflineMembers ) { startNoWait( member ); } diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java index 58b21d2762..39ebf46316 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java @@ -87,6 +87,8 @@ protected void before() throws Throwable addShutdownHookToStopCluster(); } } + + getCluster().deleteData(); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/v1/util/cc/SharedCluster.java b/driver/src/test/java/org/neo4j/driver/v1/util/cc/SharedCluster.java index 33e7f4f711..ea48ddd11b 100644 --- a/driver/src/test/java/org/neo4j/driver/v1/util/cc/SharedCluster.java +++ b/driver/src/test/java/org/neo4j/driver/v1/util/cc/SharedCluster.java @@ -19,6 +19,7 @@ package org.neo4j.driver.v1.util.cc; import java.net.URI; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.HashSet; @@ -55,7 +56,7 @@ static boolean exists() static void install( String neo4jVersion, int cores, int readReplicas, String password, int port, Path path ) { assertClusterDoesNotExist(); - if( path.toFile().exists() ) + if ( Files.isDirectory( path ) ) { debug( "Found and using cluster installed at `%s`.", path ); } diff --git a/driver/src/test/resources/discover_no_writers.script b/driver/src/test/resources/discover_no_writers.script new file mode 100644 index 0000000000..20f69dde4e --- /dev/null +++ b/driver/src/test/resources/discover_no_writers.script @@ -0,0 +1,9 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": [],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9004","127.0.0.1:9005"], "role": "ROUTE"}]] + SUCCESS {} diff --git a/driver/src/test/resources/rediscover_using_initial_router.script b/driver/src/test/resources/rediscover_using_initial_router.script index 286e7e898f..d91d3e5101 100644 --- a/driver/src/test/resources/rediscover_using_initial_router.script +++ b/driver/src/test/resources/rediscover_using_initial_router.script @@ -1,6 +1,9 @@ !: AUTO INIT !: AUTO RESET !: AUTO PULL_ALL +!: AUTO RUN "BEGIN" {} +!: AUTO RUN "COMMIT" {} +!: AUTO RUN "ROLLBACK" {} C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL