diff --git a/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java b/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java index c8945544e6..746c1f7404 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ClusterDriver.java @@ -20,12 +20,16 @@ import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; +import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.ConcurrentRoundRobinSet; import org.neo4j.driver.internal.util.Consumer; import org.neo4j.driver.v1.AccessMode; @@ -34,23 +38,26 @@ import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.Value; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.util.BiFunction; +import org.neo4j.driver.v1.util.Function; import static java.lang.String.format; public class ClusterDriver extends BaseDriver { private static final String GET_SERVERS = "dbms.cluster.routing.getServers"; + private static final long MAX_TTL = Long.MAX_VALUE / 1000L; private final static Comparator COMPARATOR = new Comparator() { @Override public int compare( BoltServerAddress o1, BoltServerAddress o2 ) { int compare = o1.host().compareTo( o2.host() ); - if (compare == 0) + if ( compare == 0 ) { compare = Integer.compare( o1.port(), o2.port() ); } @@ -58,24 +65,28 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 ) return compare; } }; - private static final int MIN_SERVERS = 2; + private static final int MIN_SERVERS = 1; private final ConnectionPool connections; - private final BiFunction sessionProvider; - - private final ConcurrentRoundRobinSet routingServers = new ConcurrentRoundRobinSet<>(COMPARATOR); - private final ConcurrentRoundRobinSet readServers = new ConcurrentRoundRobinSet<>(COMPARATOR); - private final ConcurrentRoundRobinSet writeServers = new ConcurrentRoundRobinSet<>(COMPARATOR); + private final BiFunction sessionProvider; + private final Clock clock; + private final ConcurrentRoundRobinSet routingServers = + new ConcurrentRoundRobinSet<>( COMPARATOR ); + private final ConcurrentRoundRobinSet readServers = new ConcurrentRoundRobinSet<>( COMPARATOR ); + private final ConcurrentRoundRobinSet writeServers = new ConcurrentRoundRobinSet<>( COMPARATOR ); + private final AtomicLong expires = new AtomicLong( 0L ); public ClusterDriver( BoltServerAddress seedAddress, ConnectionPool connections, SecurityPlan securityPlan, - BiFunction sessionProvider, + BiFunction sessionProvider, + Clock clock, Logging logging ) { super( securityPlan, logging ); routingServers.add( seedAddress ); this.connections = connections; this.sessionProvider = sessionProvider; + this.clock = clock; checkServers(); } @@ -83,15 +94,42 @@ private void checkServers() { synchronized ( routingServers ) { - if ( routingServers.size() < MIN_SERVERS || + if ( expires.get() < clock.millis() || + routingServers.size() < MIN_SERVERS || readServers.isEmpty() || - writeServers.isEmpty()) + writeServers.isEmpty() ) { getServers(); } } } + private Set forgetAllServers() + { + final Set seen = new HashSet<>(); + seen.addAll( routingServers ); + seen.addAll( readServers ); + seen.addAll( writeServers ); + routingServers.clear(); + readServers.clear(); + writeServers.clear(); + return seen; + } + + private long calculateNewExpiry( Record record ) + { + long ttl = record.get( "ttl" ).asLong(); + long nextExpiry = clock.millis() + 1000L * ttl; + if ( ttl < 0 || ttl >= MAX_TTL || nextExpiry < 0 ) + { + return Long.MAX_VALUE; + } + else + { + return nextExpiry; + } + } + //must be called from a synchronized block private void getServers() { @@ -99,26 +137,34 @@ private void getServers() try { boolean success = false; - while ( !routingServers.isEmpty() && !success ) + + ConcurrentRoundRobinSet routers = new ConcurrentRoundRobinSet<>( routingServers ); + final Set seen = forgetAllServers(); + while ( !routers.isEmpty() && !success ) { - address = routingServers.hop(); + address = routers.hop(); success = call( address, GET_SERVERS, new Consumer() { @Override public void accept( Record record ) { - BoltServerAddress newAddress = new BoltServerAddress( record.get( "address" ).asString() ); - switch ( record.get( "mode" ).asString().toUpperCase() ) + expires.set( calculateNewExpiry( record ) ); + List servers = servers( record ); + for ( ServerInfo server : servers ) { - case "READ": - readServers.add( newAddress ); - break; - case "WRITE": - writeServers.add( newAddress ); - break; - case "ROUTE": - routingServers.add( newAddress ); - break; + seen.removeAll( server.addresses() ); + switch ( server.role() ) + { + case "READ": + readServers.addAll( server.addresses() ); + break; + case "WRITE": + writeServers.addAll( server.addresses() ); + break; + case "ROUTE": + routingServers.addAll( server.addresses() ); + break; + } } } } ); @@ -127,6 +173,12 @@ public void accept( Record record ) { throw new ServiceUnavailableException( "Run out of servers" ); } + + //the server no longer think we should care about these + for ( BoltServerAddress remove : seen ) + { + connections.purge( remove ); + } } catch ( ClientException ex ) { @@ -137,7 +189,7 @@ public void accept( Record record ) this.close(); throw new ServiceUnavailableException( String.format( "Server %s couldn't perform discovery", - address == null ? "`UNKNOWN`" : address.toString()), ex ); + address == null ? "`UNKNOWN`" : address.toString() ), ex ); } else { @@ -146,6 +198,47 @@ public void accept( Record record ) } } + private static class ServerInfo + { + private final List addresses; + private final String role; + + public ServerInfo( List addresses, String role ) + { + this.addresses = addresses; + this.role = role; + } + + public String role() + { + return role; + } + + List addresses() + { + return addresses; + } + } + + private List servers( Record record ) + { + return record.get( "servers" ).asList( new Function() + { + @Override + public ServerInfo apply( Value value ) + { + return new ServerInfo( value.get( "addresses" ).asList( new Function() + { + @Override + public BoltServerAddress apply( Value value ) + { + return new BoltServerAddress( value.asString() ); + } + } ), value.get( "role" ).asString() ); + } + } ); + } + //must be called from a synchronized method private boolean call( BoltServerAddress address, String procedureName, Consumer recorder ) { @@ -153,7 +246,7 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer< Session session = null; try { - acquire = connections.acquire(address); + acquire = connections.acquire( address ); session = sessionProvider.apply( acquire, log ); StatementResult records = session.run( format( "CALL %s", procedureName ) ); @@ -217,19 +310,19 @@ public void onWriteFailure( BoltServerAddress address ) log ); } - private Connection acquireConnection( AccessMode mode ) + private Connection acquireConnection( AccessMode role ) { //Potentially rediscover servers if we are not happy with our current knowledge checkServers(); - switch ( mode ) + switch ( role ) { case READ: return connections.acquire( readServers.hop() ); case WRITE: return connections.acquire( writeServers.hop() ); default: - throw new ClientException( mode + " is not supported for creating new sessions" ); + throw new ClientException( role + " is not supported for creating new sessions" ); } } @@ -255,13 +348,13 @@ Set routingServers() //For testing Set readServers() { - return Collections.unmodifiableSet(readServers); + return Collections.unmodifiableSet( readServers ); } //For testing Set writeServers() { - return Collections.unmodifiableSet( writeServers); + return Collections.unmodifiableSet( writeServers ); } //For testing diff --git a/driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java index 2c3d21f504..7c69fdeec5 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ClusteredNetworkSession.java @@ -53,7 +53,7 @@ public StatementResult run( Statement statement ) } catch ( ClientException e ) { - if ( e.code().equals( "Neo.ClientError.General.ForbiddenOnFollower" ) ) + if ( e.code().equals( "Neo.ClientError.Cluster.NotALeader" ) ) { onError.onWriteFailure( connection.address() ); throw new SessionExpiredException( diff --git a/driver/src/main/java/org/neo4j/driver/internal/ClusteredStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/ClusteredStatementResult.java index af80164d70..9faa992115 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/ClusteredStatementResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/ClusteredStatementResult.java @@ -256,6 +256,6 @@ private SessionExpiredException failedWrite() private boolean isFailedToWrite( ClientException e ) { - return e.code().equals( "Neo.ClientError.General.ForbiddenOnFollower" ); + return e.code().equals( "Neo.ClientError.Cluster.NotALeader" ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java index cfbd1ff827..a4514d0c62 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java @@ -18,14 +18,11 @@ */ package org.neo4j.driver.internal.net.pooling; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.neo4j.driver.internal.spi.Collector; import org.neo4j.driver.internal.util.Consumer; -import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.util.Function; /** * The responsibility of the PooledConnectionReleaseConsumer is to release valid connections @@ -34,16 +31,15 @@ class PooledConnectionReleaseConsumer implements Consumer { private final BlockingQueue connections; - private final long minIdleBeforeConnectionTest; - private static final Map NO_PARAMETERS = new HashMap<>(); private final AtomicBoolean driverStopped; + private final Function validConnection; PooledConnectionReleaseConsumer( BlockingQueue connections, AtomicBoolean driverStopped, - PoolSettings poolSettings) + Function validConnection) { this.connections = connections; this.driverStopped = driverStopped; - this.minIdleBeforeConnectionTest = poolSettings.idleTimeBeforeConnectionTest(); + this.validConnection = validConnection; } @Override @@ -54,7 +50,7 @@ public void accept( PooledConnection pooledConnection ) // if the driver already closed, then no need to try to return to pool, just directly close this connection pooledConnection.dispose(); } - else if ( validConnection( pooledConnection ) ) + else if ( validConnection.apply( pooledConnection ) ) { boolean released = connections.offer( pooledConnection ); if( !released ) @@ -83,48 +79,4 @@ else if ( driverStopped.get() ) pooledConnection.dispose(); } } - - boolean validConnection( PooledConnection pooledConnection ) - { - // once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error - // and we should close the conn without bothering to reset the conn at all - return !pooledConnection.hasUnrecoverableErrors() && - reset(pooledConnection) && - (pooledConnection.idleTime() <= minIdleBeforeConnectionTest || ping( pooledConnection )); - } - - /** - * In case this session has an open result or transaction or something, - * make sure it's reset to a nice state before we reuse it. - * @param conn the PooledConnection - * @return true if the connection is reset successfully without any error, otherwise false. - */ - private boolean reset( PooledConnection conn ) - { - try - { - conn.reset(); - conn.sync(); - return true; - } - catch ( Throwable e ) - { - return false; - } - } - - private boolean ping( PooledConnection conn ) - { - try - { - conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, Collector.NO_OP ); - conn.pullAll( Collector.NO_OP ); - conn.sync(); - return true; - } - catch ( Throwable e ) - { - return false; - } - } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java new file mode 100644 index 0000000000..bc4cf3ad8a --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionValidator.java @@ -0,0 +1,88 @@ +/** + * Copyright (c) 2002-2016 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.net.pooling; + +import java.util.HashMap; +import java.util.Map; + +import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.v1.Value; +import org.neo4j.driver.v1.util.Function; + +class PooledConnectionValidator implements Function +{ + private final ConnectionPool pool; + private final PoolSettings poolSettings; + private static final Map NO_PARAMETERS = new HashMap<>(); + + PooledConnectionValidator( ConnectionPool pool, PoolSettings poolSettings ) + { + this.pool = pool; + this.poolSettings = poolSettings; + } + + @Override + public Boolean apply( PooledConnection pooledConnection ) + { + // once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error + // and we should close the conn without bothering to reset the conn at all + return pool.hasAddress( pooledConnection.address() ) && + !pooledConnection.hasUnrecoverableErrors() && + reset( pooledConnection ) && + (pooledConnection.idleTime() <= poolSettings.idleTimeBeforeConnectionTest() || + ping( pooledConnection )); + } + + /** + * In case this session has an open result or transaction or something, + * make sure it's reset to a nice state before we reuse it. + * + * @param conn the PooledConnection + * @return true if the connection is reset successfully without any error, otherwise false. + */ + private boolean reset( PooledConnection conn ) + { + try + { + conn.reset(); + conn.sync(); + return true; + } + catch ( Throwable e ) + { + return false; + } + } + + private boolean ping( PooledConnection conn ) + { + try + { + conn.run( "RETURN 1 // JavaDriver poll to test connection", NO_PARAMETERS, Collector.NO_OP ); + conn.pullAll( Collector.NO_OP ); + conn.sync(); + return true; + } + catch ( Throwable e ) + { + return false; + } + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java index 722f4e747d..ceb62b6814 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java +++ b/driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java @@ -113,7 +113,7 @@ public Connection acquire( BoltServerAddress address ) if ( conn == null ) { conn = new PooledConnection( connect( address ), new - PooledConnectionReleaseConsumer( connections, stopped, poolSettings ), clock ); + PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock ); } conn.updateUsageTimestamp(); return conn; diff --git a/driver/src/main/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSet.java b/driver/src/main/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSet.java index 666e00e524..ffd369c248 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSet.java +++ b/driver/src/main/java/org/neo4j/driver/internal/util/ConcurrentRoundRobinSet.java @@ -43,6 +43,12 @@ public ConcurrentRoundRobinSet( Comparator comparator ) set = new ConcurrentSkipListSet<>( comparator ); } + public ConcurrentRoundRobinSet(ConcurrentRoundRobinSet original) + { + set = new ConcurrentSkipListSet<>( original.set.comparator() ); + set.addAll( original ); + } + public T hop() { if ( current == null ) diff --git a/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java b/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java index 8362fd3d45..14d026cde0 100644 --- a/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java +++ b/driver/src/main/java/org/neo4j/driver/v1/GraphDatabase.java @@ -32,6 +32,7 @@ import org.neo4j.driver.internal.security.SecurityPlan; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; +import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.exceptions.ClientException; import org.neo4j.driver.v1.util.BiFunction; @@ -189,7 +190,7 @@ public static Driver driver( URI uri, AuthToken authToken, Config config ) case "bolt": return new DirectDriver( address, connectionPool, securityPlan, config.logging() ); case "bolt+routing": - return new ClusterDriver( address, connectionPool, securityPlan, SESSION_PROVIDER, config.logging() ); + return new ClusterDriver( address, connectionPool, securityPlan, SESSION_PROVIDER, Clock.SYSTEM, config.logging() ); default: throw new ClientException( format( "Unsupported URI scheme: %s", scheme ) ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverStubTest.java b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverStubTest.java index 28ad7b9f4f..e3856bad6f 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverStubTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverStubTest.java @@ -36,6 +36,7 @@ import org.neo4j.driver.internal.logging.ConsoleLogging; import org.neo4j.driver.internal.net.BoltServerAddress; +import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Config; import org.neo4j.driver.v1.GraphDatabase; @@ -46,13 +47,17 @@ import org.neo4j.driver.v1.util.Function; import org.neo4j.driver.v1.util.StubServer; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsNot.not; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + @Ignore public class ClusterDriverStubTest { @@ -73,10 +78,7 @@ public void shouldDiscoverServers() throws IOException, InterruptedException, St { // Then Set addresses = driver.routingServers(); - assertThat( addresses, hasSize( 3 ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) ); + assertThat( addresses, containsInAnyOrder( address(9001), address( 9002 ), address( 9003 ) ) ); } // Finally @@ -89,23 +91,21 @@ public void shouldDiscoverNewServers() throws IOException, InterruptedException, // Given StubServer server = StubServer.start( resource( "discover_new_servers.script" ), 9001 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + BoltServerAddress seed = address( 9001 ); // When try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) { // Then Set addresses = driver.routingServers(); - assertThat( addresses, hasSize( 4 ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) ) ); - assertThat( addresses, hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) ) ); + assertThat( addresses, containsInAnyOrder( address(9002), address( 9003 ), address( 9004 ) ) ); } // Finally assertThat( server.exitStatus(), equalTo( 0 ) ); } + @Test public void shouldHandleEmptyResponse() throws IOException, InterruptedException, StubServer.ForceKilled { @@ -115,8 +115,8 @@ public void shouldHandleEmptyResponse() throws IOException, InterruptedException try ( ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ) ) { Set servers = driver.routingServers(); - assertThat( servers, hasSize( 1 ) ); - assertThat( servers, hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) ) ); + assertThat( servers, hasSize( 0 ) ); + assertFalse( driver.connectionPool().hasAddress( address( 9001 ) ) ); } // Finally @@ -265,7 +265,7 @@ public void shouldRoundRobinWriteSessions() throws IOException, InterruptedExcep { for ( int i = 0; i < 2; i++ ) { - try(Session session = driver.session() ) + try ( Session session = driver.session() ) { session.run( "CREATE (n {name:'Bob'})" ); } @@ -291,15 +291,9 @@ public void shouldRememberEndpoints() throws IOException, InterruptedException, { session.run( "MATCH (n) RETURN n.name" ).consume(); - assertThat( driver.readServers(), hasSize( 2 )); - assertThat( driver.readServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9005 ) ) ); - assertThat( driver.readServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9006 ) ) ); - assertThat( driver.writeServers(), hasSize( 2 )); - assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ) ); - assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ) ); - //Make sure we don't cache acquired servers as discovery servers - assertThat( driver.routingServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9005 )))); - assertThat( driver.routingServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9006 )))); + assertThat( driver.readServers(), containsInAnyOrder( address( 9005 ), address( 9006 ) ) ); + assertThat( driver.writeServers(), containsInAnyOrder( address( 9007 ), address( 9008 ) ) ); + assertThat( driver.routingServers(), containsInAnyOrder( address( 9001 ), address( 9002 ), address( 9003 ) ) ); } // Finally assertThat( server.exitStatus(), equalTo( 0 ) ); @@ -316,21 +310,21 @@ public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedExce StubServer.start( resource( "dead_server.script" ), 9005 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); - boolean failed = false; try { Session session = driver.session( AccessMode.READ ); session.run( "MATCH (n) RETURN n.name" ).consume(); session.close(); + fail(); } catch ( SessionExpiredException e ) { - failed = true; + //ignore } - assertTrue( failed ); - assertThat( driver.readServers(), not(hasItem( new BoltServerAddress( "127.0.0.1", 9005 ) ) )); + assertThat( driver.readServers(), not( hasItem( address( 9005 ) ) ) ); assertThat( driver.writeServers(), hasSize( 2 ) ); + assertFalse( driver.connectionPool().hasAddress( address( 9005 ) ) ); driver.close(); // Finally @@ -338,33 +332,33 @@ public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedExce } @Test - public void shouldRediscoverIfNecessaryOnSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled + public void shouldRediscoverIfNecessaryOnSessionAcquisition() + throws IOException, InterruptedException, StubServer.ForceKilled { // Given StubServer server = StubServer.start( resource( "rediscover.script" ), 9001 ); URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); //START a read server - StubServer.start( resource( "read_server.script" ), 9005 ); + StubServer read = StubServer.start( resource( "empty.script" ), 9005 ); //On creation we only find ourselves ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); - assertThat( driver.routingServers(), hasSize( 1 ) ); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); + assertThat( driver.routingServers(), containsInAnyOrder( address( 9001 ) ) ); + assertTrue( driver.connectionPool().hasAddress( address( 9001 ) ) ); - //since we know about less than three servers a rediscover should be triggered + //since we have no write nor read servers we must rediscover Session session = driver.session( AccessMode.READ ); - assertThat( driver.routingServers(), hasSize( 4 ) ); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) )); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) )); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) )); - + assertThat( driver.routingServers(), containsInAnyOrder(address( 9002 ), + address( 9003 ), address( 9004 ) ) ); + //server told os to forget 9001 + assertFalse( driver.connectionPool().hasAddress( address( 9001 ) ) ); session.close(); driver.close(); // Finally assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( read.exitStatus(), equalTo( 0 ) ); } @Test @@ -375,12 +369,11 @@ public void shouldOnlyGetServersOnce() throws IOException, InterruptedException, URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); //START a read server - StubServer.start( resource( "read_server.script" ), 9005 ); + StubServer read = StubServer.start( resource( "empty.script" ), 9005 ); //On creation we only find ourselves final ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); - assertThat( driver.routingServers(), hasSize( 1 ) ); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); + assertThat( driver.routingServers(), containsInAnyOrder( address( 9001 ) ) ); ExecutorService runner = Executors.newFixedThreadPool( 10 ); for ( int i = 0; i < 10; i++ ) @@ -391,7 +384,7 @@ public void shouldOnlyGetServersOnce() throws IOException, InterruptedException, public void run() { //noinspection EmptyTryBlock - try(Session ignore = driver.session( AccessMode.READ )) + try ( Session ignore = driver.session( AccessMode.READ ) ) { //empty } @@ -401,16 +394,13 @@ public void run() } runner.awaitTermination( 10, TimeUnit.SECONDS ); //since we know about less than three servers a rediscover should be triggered - assertThat( driver.routingServers(), hasSize( 4 ) ); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9001 ) )); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9002 ) )); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9003 ) )); - assertThat( driver.routingServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9004 ) )); + assertThat( driver.routingServers(), containsInAnyOrder( address( 9002 ), address( 9003 ), address( 9004 ) ) ); driver.close(); // Finally assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( read.exitStatus(), equalTo( 0 ) ); } @Test @@ -450,23 +440,107 @@ public void shouldHandleLeaderSwitchWhenWriting() boolean failed = false; try ( Session session = driver.session( AccessMode.WRITE ) ) { - assertThat(driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) )); - assertThat(driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) )); + assertThat( driver.writeServers(), hasItem(address( 9007 ) ) ); + assertThat( driver.writeServers(), hasItem( address( 9008 ) ) ); session.run( "CREATE ()" ).consume(); } - catch (SessionExpiredException e) + catch ( SessionExpiredException e ) { failed = true; - assertThat(e.getMessage(), equalTo( "Server at 127.0.0.1:9007 no longer accepts writes" )); + assertThat( e.getMessage(), equalTo( "Server at 127.0.0.1:9007 no longer accepts writes" ) ); } assertTrue( failed ); - assertThat( driver.writeServers(), not( hasItem( new BoltServerAddress( "127.0.0.1", 9007 ) ) ) ); - assertThat( driver.writeServers(), hasItem( new BoltServerAddress( "127.0.0.1", 9008 ) ) ); - assertTrue( driver.connectionPool().hasAddress( new BoltServerAddress( "127.0.0.1", 9007 ) ) ); + assertThat( driver.writeServers(), not( hasItem( address( 9007 ) ) ) ); + assertThat( driver.writeServers(), hasItem( address( 9008 ) ) ); + assertTrue( driver.connectionPool().hasAddress( address( 9007 ) ) ); + + driver.close(); + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Test + public void shouldRediscoverOnExpiry() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "expire.script" ), 9001 ); + + //START a read server + StubServer readServer = StubServer.start( resource( "empty.script" ), 9005 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + assertThat(driver.routingServers(), contains(address( 9001 ))); + assertThat(driver.readServers(), contains(address( 9002 ))); + assertThat(driver.writeServers(), contains(address( 9003 ))); + //On acquisition we should update our view + Session session = driver.session( AccessMode.READ ); + assertThat(driver.routingServers(), contains(address( 9004 ))); + assertThat(driver.readServers(), contains(address( 9005 ))); + assertThat(driver.writeServers(), contains(address( 9006 ))); + session.close(); driver.close(); // Finally assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( readServer.exitStatus(), equalTo( 0 ) ); + } + + @Test + public void shouldNotPutBackPurgedConnection() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "not_reuse_connection.script" ), 9001 ); + + //START servers + StubServer readServer = StubServer.start( resource( "empty.script" ), 9002 ); + StubServer writeServer1 = StubServer.start( resource( "dead_server.script" ), 9003 ); + StubServer writeServer2 = StubServer.start( resource( "empty.script" ), 9006 ); + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + + ClusterDriver driver = (ClusterDriver) GraphDatabase.driver( uri, config ); + + + //Open both a read and a write session + Session readSession = driver.session( AccessMode.READ ); + Session writeSession = driver.session( AccessMode.WRITE ); + + try + { + writeSession.run( "MATCH (n) RETURN n.name" ); + writeSession.close(); + fail(); + } + catch (SessionExpiredException e) + { + //ignore + } + //We now lost all write servers + assertThat(driver.writeServers(), hasSize( 0 )); + + //reacquiring will trow out the current read server at 9002 + writeSession = driver.session( AccessMode.WRITE ); + + assertThat(driver.routingServers(), contains(address( 9004 ))); + assertThat(driver.readServers(), contains(address( 9005 ))); + assertThat(driver.writeServers(), contains(address( 9006 ))); + assertFalse(driver.connectionPool().hasAddress(address( 9002 ) )); + + // now we close the read session and the connection should not be put + // back to the pool + Connection connection = ((ClusteredNetworkSession) readSession).connection; + assertTrue( connection.isOpen() ); + readSession.close(); + assertFalse( connection.isOpen() ); + assertFalse(driver.connectionPool().hasAddress(address( 9002 ) )); + writeSession.close(); + + driver.close(); + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + assertThat( readServer.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer1.exitStatus(), equalTo( 0 ) ); + assertThat( writeServer2.exitStatus(), equalTo( 0 ) ); } String resource( String fileName ) @@ -478,4 +552,9 @@ String resource( String fileName ) } return resource.getFile(); } + + private BoltServerAddress address( int port ) + { + return new BoltServerAddress( "127.0.0.1", port ); + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java index 26a57f4177..5944dcac80 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ClusterDriverTest.java @@ -24,14 +24,14 @@ import org.junit.rules.ExpectedException; import java.util.Collections; -import java.util.Iterator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionPool; -import org.neo4j.driver.internal.value.IntegerValue; -import org.neo4j.driver.internal.value.StringValue; +import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.AccessMode; import org.neo4j.driver.v1.Logger; import org.neo4j.driver.v1.Logging; @@ -54,8 +54,10 @@ import static org.hamcrest.core.IsNot.not; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.neo4j.driver.internal.security.SecurityPlan.insecure; +import static org.neo4j.driver.v1.Values.value; public class ClusterDriverTest { @@ -64,7 +66,8 @@ public class ClusterDriverTest private static final BoltServerAddress SEED = new BoltServerAddress( "localhost", 7687 ); private static final String GET_SERVERS = "CALL dbms.cluster.routing.getServers"; - private static final List NO_ADDRESSES = Collections.emptyList(); + private static final List NO_ADDRESSES = Collections.emptyList(); + private final ConnectionPool pool = pool(); @Test public void shouldDoRoutingOnInitialization() @@ -72,16 +75,16 @@ public void shouldDoRoutingOnInitialization() // Given final Session session = mock( Session.class ); when( session.run( GET_SERVERS ) ).thenReturn( - getServers( singletonList( boltAddress( "localhost", 1111 ) ), - singletonList( boltAddress( "localhost", 2222 ) ), - singletonList( boltAddress( "localhost", 3333 ) ) ) ); + getServers( singletonList( "localhost:1111" ), + singletonList( "localhost:2222" ), + singletonList( "localhost:3333" ) ) ); // When ClusterDriver clusterDriver = forSession( session ); // Then assertThat( clusterDriver.routingServers(), - containsInAnyOrder( boltAddress( "localhost", 1111 ), SEED ) ); + containsInAnyOrder( boltAddress( "localhost", 1111 )) ); assertThat( clusterDriver.readServers(), containsInAnyOrder( boltAddress( "localhost", 2222 ) ) ); assertThat( clusterDriver.writeServers(), @@ -96,16 +99,16 @@ public void shouldDoReRoutingOnSessionAcquisitionIfNecessary() final Session session = mock( Session.class ); when( session.run( GET_SERVERS ) ) .thenReturn( - getServers( singletonList( boltAddress( "localhost", 1111 ) ), NO_ADDRESSES, NO_ADDRESSES ) ) + getServers( singletonList( "localhost:1111" ), NO_ADDRESSES, NO_ADDRESSES ) ) .thenReturn( - getServers( singletonList( boltAddress( "localhost", 1112 ) ), - singletonList( boltAddress( "localhost", 2222 ) ), - singletonList( boltAddress( "localhost", 3333 ) ) ) ); + getServers( singletonList( "localhost:1112" ), + singletonList( "localhost:2222" ), + singletonList( "localhost:3333" ) ) ); ClusterDriver clusterDriver = forSession( session ); assertThat( clusterDriver.routingServers(), - containsInAnyOrder( boltAddress( "localhost", 1111 ), SEED ) ); + containsInAnyOrder( boltAddress( "localhost", 1111 )) ); assertThat( clusterDriver.readServers(), Matchers.empty() ); assertThat( clusterDriver.writeServers(), Matchers.empty() ); @@ -115,7 +118,7 @@ public void shouldDoReRoutingOnSessionAcquisitionIfNecessary() // Then assertThat( clusterDriver.routingServers(), - containsInAnyOrder( boltAddress( "localhost", 1111 ), boltAddress( "localhost", 1112 ), SEED ) ); + containsInAnyOrder( boltAddress( "localhost", 1112 ) )); assertThat( clusterDriver.readServers(), containsInAnyOrder( boltAddress( "localhost", 2222 ) ) ); assertThat( clusterDriver.writeServers(), @@ -129,12 +132,11 @@ public void shouldNotDoReRoutingOnSessionAcquisitionIfNotNecessary() final Session session = mock( Session.class ); when( session.run( GET_SERVERS ) ) .thenReturn( - getServers( asList( boltAddress( "localhost", 1111 ), boltAddress( "localhost", 1112 ), - boltAddress( "localhost", 1113 ) ), - singletonList( boltAddress( "localhost", 2222 ) ), - singletonList( boltAddress( "localhost", 3333 ) ) ) ) + getServers( asList( "localhost:1111", "localhost:1112", "localhost:1113" ), + singletonList( "localhost:2222" ), + singletonList( "localhost:3333" ) ) ) .thenReturn( - getServers( singletonList( boltAddress( "localhost", 5555 ) ), NO_ADDRESSES, NO_ADDRESSES ) ); + getServers( singletonList( "localhost:5555" ), NO_ADDRESSES, NO_ADDRESSES ) ); ClusterDriver clusterDriver = forSession( session ); @@ -162,9 +164,93 @@ public void shouldFailIfNoRouting() forSession( session ); } + @Test + public void shouldForgetAboutServersOnRerouting() + { + // Given + final Session session = mock( Session.class ); + when( session.run( GET_SERVERS ) ) + .thenReturn( + getServers( singletonList( "localhost:1111" ), NO_ADDRESSES, NO_ADDRESSES ) ) + .thenReturn( + getServers( singletonList( "localhost:1112" ), + singletonList( "localhost:2222" ), + singletonList( "localhost:3333" ) ) ); + + ClusterDriver clusterDriver = forSession( session ); + + assertThat( clusterDriver.routingServers(), + containsInAnyOrder( boltAddress( "localhost", 1111 )) ); + + + // When + clusterDriver.session( AccessMode.READ ); + + // Then + assertThat( clusterDriver.routingServers(), + containsInAnyOrder( boltAddress( "localhost", 1112 ) )); + verify( pool ).purge( boltAddress( "localhost", 1111 ) ); + } + + @Test + public void shouldRediscoverOnTimeout() + { + // Given + final Session session = mock( Session.class ); + Clock clock = mock( Clock.class ); + when(clock.millis()).thenReturn( 0L, 11000L, 22000L ); + when( session.run( GET_SERVERS ) ) + .thenReturn( + getServers( asList( "localhost:1111", "localhost:1112", "localhost:1113" ), + singletonList( "localhost:2222" ), + singletonList( "localhost:3333" ), 10L/*seconds*/ ) ) + .thenReturn( + getServers( singletonList( "localhost:5555" ), singletonList( "localhost:5555" ), singletonList( "localhost:5555" ) ) ); + + ClusterDriver clusterDriver = forSession( session, clock ); + + // When + clusterDriver.session( AccessMode.WRITE ); + + // Then + assertThat( clusterDriver.routingServers(), containsInAnyOrder( boltAddress( "localhost", 5555 ) ) ); + assertThat( clusterDriver.readServers(), containsInAnyOrder( boltAddress( "localhost", 5555 ) ) ); + assertThat( clusterDriver.writeServers(), containsInAnyOrder( boltAddress( "localhost", 5555 ) ) ); + } + + @Test + public void shouldNotRediscoverWheNoTimeout() + { + // Given + final Session session = mock( Session.class ); + Clock clock = mock( Clock.class ); + when(clock.millis()).thenReturn( 0L, 9900L, 18800L ); + when( session.run( GET_SERVERS ) ) + .thenReturn( + getServers( asList( "localhost:1111", "localhost:1112", "localhost:1113" ), + singletonList( "localhost:2222" ), + singletonList( "localhost:3333" ), 10L/*seconds*/ ) ) + .thenReturn( + getServers( singletonList( "localhost:5555" ), singletonList( "localhost:5555" ), singletonList( "localhost:5555" ) ) ); + + ClusterDriver clusterDriver = forSession( session, clock ); + + // When + clusterDriver.session( AccessMode.WRITE ); + + // Then + assertThat( clusterDriver.routingServers(), containsInAnyOrder( boltAddress( "localhost", 1111 ), boltAddress( "localhost", 1112 ), boltAddress( "localhost", 1113 ) ) ); + assertThat( clusterDriver.readServers(), containsInAnyOrder( boltAddress( "localhost", 2222 ) ) ); + assertThat( clusterDriver.writeServers(), containsInAnyOrder( boltAddress( "localhost", 3333 ) ) ); + } + private ClusterDriver forSession( final Session session ) { - return new ClusterDriver( SEED, pool(), insecure(), + return forSession( session, Clock.SYSTEM ); + } + private ClusterDriver forSession( final Session session, Clock clock ) + { + return new ClusterDriver( SEED, pool, insecure(), new BiFunction() { @Override @@ -172,7 +258,7 @@ public Session apply( Connection connection, Logger ignore ) { return session; } - }, logging() ); + }, clock, logging() ); } private BoltServerAddress boltAddress( String host, int port ) @@ -180,59 +266,41 @@ private BoltServerAddress boltAddress( String host, int port ) return new BoltServerAddress( host, port ); } - StatementResult getServers( final List routers, final List readers, - final List writers ) - { + StatementResult getServers( final List routers, final List readers, + final List writers ) + { + return getServers( routers,readers, writers, Long.MAX_VALUE ); + } + StatementResult getServers( final List routers, final List readers, + final List writers, final long ttl ) + { return new StatementResult() { - private final int totalSize = routers.size() + readers.size() + writers.size(); - private final Iterator routeIterator = routers.iterator(); - private final Iterator readIterator = readers.iterator(); - private final Iterator writeIterator = writers.iterator(); private int counter = 0; @Override public List keys() { - return asList( "address", "mode", "expires" ); + return asList( "ttl", "servers" ); } @Override public boolean hasNext() { - return counter++ < totalSize; + return counter++ < 1; } @Override public Record next() { - if ( routeIterator.hasNext() ) - { - return new InternalRecord( asList( "address", "mode", "expires" ), - new Value[]{new StringValue( routeIterator.next().toString() ), - new StringValue( "ROUTE" ), - new IntegerValue( Long.MAX_VALUE )} ); - } - else if ( readIterator.hasNext() ) - { - return new InternalRecord( asList( "address", "mode", "expires" ), - new Value[]{new StringValue( readIterator.next().toString() ), - new StringValue( "READ" ), - new IntegerValue( Long.MAX_VALUE )} ); - } - else if ( writeIterator.hasNext() ) - { - return new InternalRecord( asList( "address", "mode", "expires" ), - new Value[]{new StringValue( writeIterator.next().toString() ), - new StringValue( "WRITE" ), - new IntegerValue( Long.MAX_VALUE )} ); - } - else - { - return Collections.emptyIterator().next(); - } + return new InternalRecord( asList( "ttl", "servers" ), + new Value[]{ + value( ttl ), + value( asList( serverInfo( "ROUTE", routers ), serverInfo( "WRITE", writers ), + serverInfo( "READ", readers ) ) ) + } ); } @Override @@ -268,11 +336,20 @@ public ResultSummary consume() @Override public void remove() { - throw new UnsupportedOperationException( ); + throw new UnsupportedOperationException(); } }; } + private Map serverInfo( String role, List addresses ) + { + Map map = new HashMap<>(); + map.put( "role", role ); + map.put( "addresses", addresses ); + + return map; + } + private ConnectionPool pool() { ConnectionPool pool = mock( ConnectionPool.class ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/ClusteredNetworkSessionTest.java b/driver/src/test/java/org/neo4j/driver/internal/ClusteredNetworkSessionTest.java index 563b541548..4b33f11987 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ClusteredNetworkSessionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ClusteredNetworkSessionTest.java @@ -88,7 +88,7 @@ public void shouldHandleConnectionFailures() public void shouldHandleWriteFailures() { // Given - doThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ). + doThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ). when( connection ).run( anyString(), any( Map.class ), any( Collector.class ) ); ClusteredNetworkSession session = new ClusteredNetworkSession( connection, onError, mock( Logger.class ) ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/ClusteredStatementResultTest.java b/driver/src/test/java/org/neo4j/driver/internal/ClusteredStatementResultTest.java index 228b722f79..5959796a01 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/ClusteredStatementResultTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/ClusteredStatementResultTest.java @@ -68,7 +68,7 @@ public void shouldHandleWriteFailureOnConsume() { // Given when( delegate.consume() ) - .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); ClusteredStatementResult result = new ClusteredStatementResult( delegate, LOCALHOST, onError ); @@ -117,7 +117,7 @@ public void shouldHandleWriteFailureOnHasNext() { // Given when( delegate.hasNext() ) - .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); ClusteredStatementResult result = new ClusteredStatementResult( delegate, LOCALHOST, onError ); @@ -166,7 +166,7 @@ public void shouldHandleWriteFailureOnKeys() { // Given when( delegate.keys() ) - .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); ClusteredStatementResult result = new ClusteredStatementResult( delegate, LOCALHOST, onError ); @@ -215,7 +215,7 @@ public void shouldHandleWriteFailureOnList() { // Given when( delegate.list() ) - .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); ClusteredStatementResult result = new ClusteredStatementResult( delegate, LOCALHOST, onError ); @@ -264,7 +264,7 @@ public void shouldHandleWriteFailureOnNext() { // Given when( delegate.next() ) - .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); ClusteredStatementResult result = new ClusteredStatementResult( delegate, LOCALHOST, onError ); @@ -313,7 +313,7 @@ public void shouldHandleWriteFailureOnPeek() { // Given when( delegate.peek() ) - .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); ClusteredStatementResult result = new ClusteredStatementResult( delegate, LOCALHOST, onError ); @@ -362,7 +362,7 @@ public void shouldHandleWriteFailureOnSingle() { // Given when( delegate.single() ) - .thenThrow( new ClientException( "Neo.ClientError.General.ForbiddenOnFollower", "oh no!" ) ); + .thenThrow( new ClientException( "Neo.ClientError.Cluster.NotALeader", "oh no!" ) ); ClusteredStatementResult result = new ClusteredStatementResult( delegate, LOCALHOST, onError ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java index a57b0b1c4a..b602d85554 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/ConnectionInvalidationTest.java @@ -26,8 +26,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.Collector; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ConnectionPool; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.Consumers; import org.neo4j.driver.v1.Config; @@ -72,8 +74,11 @@ public void shouldInvalidateConnectionThatIsOld() throws Throwable // When/Then BlockingQueue queue = mock( BlockingQueue.class ); + PooledConnectionValidator validator = + new PooledConnectionValidator( pool( true ), poolSettings ); + PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), poolSettings ); + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator); consumer.accept( conn ); verify( queue, never() ).add( conn ); @@ -90,11 +95,13 @@ public void shouldNotInvalidateConnectionThatIsNotOld() throws Throwable PoolSettings poolSettings = PoolSettings.defaultSettings(); when( clock.millis() ).thenReturn( 0L, poolSettings.idleTimeBeforeConnectionTest() - 1L ); PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); + PooledConnectionValidator validator = + new PooledConnectionValidator( pool( true ), poolSettings ); // When/Then BlockingQueue queue = mock( BlockingQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), poolSettings ); + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ),validator ); consumer.accept( conn ); verify( queue ).offer( conn ); @@ -107,11 +114,12 @@ public void shouldInvalidConnectionIfFailedToReset() throws Throwable Mockito.doThrow( new ClientException( "That didn't work" ) ).when( delegate ).reset(); PoolSettings poolSettings = PoolSettings.defaultSettings(); PooledConnection conn = new PooledConnection( delegate, Consumers.noOp(), clock ); - + PooledConnectionValidator validator = + new PooledConnectionValidator( pool( true ), poolSettings ); // When/Then BlockingQueue queue = mock( BlockingQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), poolSettings ); + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); consumer.accept( conn ); verify( queue, never() ).add( conn ); @@ -143,25 +151,27 @@ public void shouldInvalidateOnProtocolViolationExceptions() throws Throwable private void assertUnrecoverable( Neo4jException exception ) { doThrow( exception ).when( delegate ) - .run( eq("assert unrecoverable"), anyMap(), any( Collector.class ) ); + .run( eq( "assert unrecoverable" ), anyMap(), any( Collector.class ) ); // When try { - conn.run( "assert unrecoverable", new HashMap( ), Collector.NO_OP ); + conn.run( "assert unrecoverable", new HashMap(), Collector.NO_OP ); fail( "Should've rethrown exception" ); } catch ( Neo4jException e ) { assertThat( e, equalTo( exception ) ); } + PoolSettings poolSettings = PoolSettings.defaultSettings(); + PooledConnectionValidator validator = + new PooledConnectionValidator( pool( true ), poolSettings ); // Then assertTrue( conn.hasUnrecoverableErrors() ); - PoolSettings poolSettings = PoolSettings.defaultSettings(); BlockingQueue queue = mock( BlockingQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), poolSettings ); + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); consumer.accept( conn ); verify( queue, never() ).offer( conn ); @@ -170,12 +180,12 @@ private void assertUnrecoverable( Neo4jException exception ) @SuppressWarnings( "unchecked" ) private void assertRecoverable( Neo4jException exception ) { - doThrow( exception ).when( delegate ).run( eq("assert recoverable"), anyMap(), any( Collector.class ) ); + doThrow( exception ).when( delegate ).run( eq( "assert recoverable" ), anyMap(), any( Collector.class ) ); // When try { - conn.run( "assert recoverable", new HashMap( ), Collector.NO_OP ); + conn.run( "assert recoverable", new HashMap(), Collector.NO_OP ); fail( "Should've rethrown exception" ); } catch ( Neo4jException e ) @@ -186,11 +196,20 @@ private void assertRecoverable( Neo4jException exception ) // Then assertFalse( conn.hasUnrecoverableErrors() ); PoolSettings poolSettings = PoolSettings.defaultSettings(); + PooledConnectionValidator validator = + new PooledConnectionValidator( pool( true ), poolSettings ); BlockingQueue queue = mock( BlockingQueue.class ); PooledConnectionReleaseConsumer consumer = - new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), poolSettings ); + new PooledConnectionReleaseConsumer( queue, new AtomicBoolean( false ), validator ); consumer.accept( conn ); verify( queue ).offer( conn ); } + + private ConnectionPool pool( boolean hasAddress ) + { + ConnectionPool pool = mock( ConnectionPool.class ); + when( pool.hasAddress( any( BoltServerAddress.class ) ) ).thenReturn( hasAddress ); + return pool; + } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java index 91ed41ad16..be4bdd40b8 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/net/pooling/PooledConnectionTest.java @@ -23,9 +23,12 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; + import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.v1.exceptions.ClientException; +import org.neo4j.driver.v1.util.Function; + import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.hasItem; import static org.hamcrest.MatcherAssert.assertThat; @@ -37,6 +40,26 @@ public class PooledConnectionTest { + + private static final Function + VALID_CONNECTION = new Function() + { + @Override + public Boolean apply( PooledConnection pooledConnection ) + { + return true; + } + }; + private static final Function + INVALID_CONNECTION = new Function() + { + @Override + public Boolean apply( PooledConnection pooledConnection ) + { + return false; + } + }; + @Test public void shouldDisposeConnectionIfNotValidConnection() throws Throwable { @@ -47,14 +70,8 @@ public void shouldDisposeConnectionIfNotValidConnection() throws Throwable Connection conn = mock( Connection.class ); PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( false ), PoolSettings.defaultSettings() /*Does not matter what config for this test*/ ) - { - @Override - boolean validConnection( PooledConnection conn ) - { - return false; - } - }; + new AtomicBoolean( false ), INVALID_CONNECTION ); + PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { @@ -83,16 +100,9 @@ public void shouldReturnToThePoolIfIsValidConnectionAndIdlePoolIsNotFull() throw Connection conn = mock( Connection.class ); PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( false ), PoolSettings.defaultSettings() /*Does not matter what config for this test*/ ) - { - @Override - boolean validConnection( PooledConnection conn ) - { - return true; - } - }; + new AtomicBoolean( false ), VALID_CONNECTION ); - PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) + PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { @Override public void dispose() @@ -120,14 +130,7 @@ public void shouldDisposeConnectionIfValidConnectionAndIdlePoolIsFull() throws T Connection conn = mock( Connection.class ); PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( false ), PoolSettings.defaultSettings() /*Does not matter what config for this test*/ ) - { - @Override - boolean validConnection( PooledConnection conn ) - { - return true; - } - }; + new AtomicBoolean( false ), VALID_CONNECTION); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ); PooledConnection shouldBeClosedConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) @@ -164,7 +167,7 @@ public void shouldDisposeConnectionIfPoolAlreadyClosed() throws Throwable Connection conn = mock( Connection.class ); PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - new AtomicBoolean( true ), PoolSettings.defaultSettings() /*Does not matter what config for this test*/ ); + new AtomicBoolean( true ), VALID_CONNECTION); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { @@ -204,7 +207,7 @@ public boolean offer(PooledConnection conn) Connection conn = mock( Connection.class ); PooledConnectionReleaseConsumer releaseConsumer = new PooledConnectionReleaseConsumer( pool, - stopped , PoolSettings.defaultSettings() /*Does not matter what config for this test*/ ); + stopped , VALID_CONNECTION); PooledConnection pooledConnection = new PooledConnection( conn, releaseConsumer, Clock.SYSTEM ) { diff --git a/driver/src/test/resources/acquire_endpoints.script b/driver/src/test/resources/acquire_endpoints.script index c34f2f9b33..293e0097f5 100644 --- a/driver/src/test/resources/acquire_endpoints.script +++ b/driver/src/test/resources/acquire_endpoints.script @@ -5,12 +5,6 @@ C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL -S: SUCCESS {"fields": ["address", "mode", "expires"]} - RECORD ["127.0.0.1:9007", "WRITE",9223372036854775807] - RECORD ["127.0.0.1:9008", "WRITE",9223372036854775807] - RECORD ["127.0.0.1:9005", "READ",9223372036854775807] - RECORD ["127.0.0.1:9006", "READ",9223372036854775807] - RECORD ["127.0.0.1:9001", "ROUTE",9223372036854775807] - RECORD ["127.0.0.1:9002", "ROUTE",9223372036854775807] - RECORD ["127.0.0.1:9003", "ROUTE",9223372036854775807] - SUCCESS {} \ No newline at end of file +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9007","127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] + SUCCESS {} \ No newline at end of file diff --git a/driver/src/test/resources/discover_invalid_server.script b/driver/src/test/resources/discover_invalid_server.script deleted file mode 100644 index ad6080e9ba..0000000000 --- a/driver/src/test/resources/discover_invalid_server.script +++ /dev/null @@ -1,12 +0,0 @@ -!: AUTO INIT -!: AUTO RESET -!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} -!: AUTO PULL_ALL - -C: RUN "CALL dbms.cluster.discoverEndpointAcquisitionServers" {} - PULL_ALL -S: SUCCESS {"fields": ["address"]} - RECORD ["127.0.0.1:9001"] - SUCCESS {} -C: RUN "CALL dbms.cluster.discoverEndpointAcquisitionServers" {} - PULL_ALL diff --git a/driver/src/test/resources/discover_new_servers.script b/driver/src/test/resources/discover_new_servers.script index 14cf676417..26fe29ff64 100644 --- a/driver/src/test/resources/discover_new_servers.script +++ b/driver/src/test/resources/discover_new_servers.script @@ -5,11 +5,6 @@ C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL -S: SUCCESS {"fields": ["address", "mode", "expires"]} - RECORD ["127.0.0.1:9002", "WRITE",9223372036854775807] - RECORD ["127.0.0.1:9005", "READ",9223372036854775807] - RECORD ["127.0.0.1:9003", "READ",9223372036854775807] - RECORD ["127.0.0.1:9004", "ROUTE",9223372036854775807] - RECORD ["127.0.0.1:9002", "ROUTE",9223372036854775807] - RECORD ["127.0.0.1:9003", "ROUTE",9223372036854775807] +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9002"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9004","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] SUCCESS {} diff --git a/driver/src/test/resources/discover_servers.script b/driver/src/test/resources/discover_servers.script index 8f98264424..50b3d815d1 100644 --- a/driver/src/test/resources/discover_servers.script +++ b/driver/src/test/resources/discover_servers.script @@ -5,11 +5,6 @@ C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL -S: SUCCESS {"fields": ["address", "mode", "expires"]} - RECORD ["127.0.0.1:9001", "WRITE",9223372036854775807] - RECORD ["127.0.0.1:9002", "READ",9223372036854775807] - RECORD ["127.0.0.1:9003", "READ",9223372036854775807] - RECORD ["127.0.0.1:9001", "ROUTE",9223372036854775807] - RECORD ["127.0.0.1:9002", "ROUTE",9223372036854775807] - RECORD ["127.0.0.1:9003", "ROUTE",9223372036854775807] +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9002","127.0.0.1:9003"], "role": "READ"},{"addresses": ["127.0.0.1:9001","127.0.0.1:9002","127.0.0.1:9003"], "role": "ROUTE"}]] SUCCESS {} diff --git a/driver/src/test/resources/empty.script b/driver/src/test/resources/empty.script new file mode 100644 index 0000000000..b6944ed5e2 --- /dev/null +++ b/driver/src/test/resources/empty.script @@ -0,0 +1,4 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL \ No newline at end of file diff --git a/driver/src/test/resources/expire.script b/driver/src/test/resources/expire.script new file mode 100644 index 0000000000..f25380c795 --- /dev/null +++ b/driver/src/test/resources/expire.script @@ -0,0 +1,15 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [0, [{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"},{"addresses": ["127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9003"], "role": "WRITE"}]] + SUCCESS {} +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"], "role": "ROUTE"},{"addresses": ["127.0.0.1:9005"], "role": "READ"},{"addresses": ["127.0.0.1:9006"], "role": "WRITE"}]] + SUCCESS {} diff --git a/driver/src/test/resources/handle_empty_response.script b/driver/src/test/resources/handle_empty_response.script index 8f64f8853e..cd8bf9da34 100644 --- a/driver/src/test/resources/handle_empty_response.script +++ b/driver/src/test/resources/handle_empty_response.script @@ -5,5 +5,5 @@ C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL -S: SUCCESS {"fields": ["address", "mode", "expires"]} +S: SUCCESS {"fields": ["ttl", "servers"]} SUCCESS {} diff --git a/driver/src/test/resources/not_able_to_write_server.script b/driver/src/test/resources/not_able_to_write_server.script index a3ea552e1b..6a97dd41d9 100644 --- a/driver/src/test/resources/not_able_to_write_server.script +++ b/driver/src/test/resources/not_able_to_write_server.script @@ -5,7 +5,7 @@ C: RUN "CREATE ()" {} C: PULL_ALL -S: FAILURE {"code": "Neo.ClientError.General.ForbiddenOnFollower", "message": "blabla"} +S: FAILURE {"code": "Neo.ClientError.Cluster.NotALeader", "message": "blabla"} S: IGNORED C: ACK_FAILURE S: SUCCESS {} diff --git a/driver/src/test/resources/not_reuse_connection.script b/driver/src/test/resources/not_reuse_connection.script new file mode 100644 index 0000000000..75bac8e654 --- /dev/null +++ b/driver/src/test/resources/not_reuse_connection.script @@ -0,0 +1,15 @@ +!: AUTO INIT +!: AUTO RESET +!: AUTO RUN "RETURN 1 // JavaDriver poll to test connection" {} +!: AUTO PULL_ALL + +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001","127.0.0.1:9003"], "role": "ROUTE"},{"addresses": ["127.0.0.1:9002"], "role": "READ"},{"addresses": ["127.0.0.1:9003"], "role": "WRITE"}]] + SUCCESS {} +C: RUN "CALL dbms.cluster.routing.getServers" {} + PULL_ALL +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"], "role": "ROUTE"},{"addresses": ["127.0.0.1:9005"], "role": "READ"},{"addresses": ["127.0.0.1:9006"], "role": "WRITE"}]] + SUCCESS {} diff --git a/driver/src/test/resources/rediscover.script b/driver/src/test/resources/rediscover.script index 8ef0f3cabe..af54cbcd92 100644 --- a/driver/src/test/resources/rediscover.script +++ b/driver/src/test/resources/rediscover.script @@ -5,16 +5,11 @@ C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL -S: SUCCESS {"fields": ["address", "mode", "expires"]} - RECORD ["127.0.0.1:9001", "ROUTE",9223372036854775807] +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]] SUCCESS {} C: RUN "CALL dbms.cluster.routing.getServers" {} PULL_ALL -S: SUCCESS {"fields": ["address", "mode", "expires"]} - RECORD ["127.0.0.1:9004", "WRITE",9223372036854775807] - RECORD ["127.0.0.1:9005", "READ",9223372036854775807] - RECORD ["127.0.0.1:9001", "ROUTE",9223372036854775807] - RECORD ["127.0.0.1:9002", "ROUTE",9223372036854775807] - RECORD ["127.0.0.1:9003", "ROUTE",9223372036854775807] - RECORD ["127.0.0.1:9004", "ROUTE",9223372036854775807] +S: SUCCESS {"fields": ["ttl", "servers"]} + RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005"], "role": "READ"},{"addresses": ["127.0.0.1:9002","127.0.0.1:9003","127.0.0.1:9004"], "role": "ROUTE"}]] SUCCESS {}