From 9db607612e897062b52528413cd64dcf619245e8 Mon Sep 17 00:00:00 2001 From: Pontus Melke Date: Wed, 5 Oct 2016 12:44:31 +0200 Subject: [PATCH 1/3] Remove server when failing on session acquisition We were only checking for connection failures on running and consuming the results; however, there might also be a connection failure on session acquisition. By not properly removing the server on session-acquisition failures, we end up in a scenario where we constantly retry to connect to that endpoint. --- .../neo4j/driver/internal/RoutingDriver.java | 47 ++++++++++++++----- .../internal/RoutingDriverStubTest.java | 33 +++++++++++++ 2 files changed, 69 insertions(+), 11 deletions(-) diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java index 5256f56fa6..613c76797d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java +++ b/driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java @@ -66,6 +66,7 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 ) } }; private static final int MIN_SERVERS = 1; + private static final int CONNECTION_RETRIES = 3; private final ConnectionPool connections; private final BiFunction sessionProvider; private final Clock clock; @@ -110,7 +111,6 @@ private Set forgetAllServers() seen.addAll( routingServers ); seen.addAll( readServers ); seen.addAll( writeServers ); - routingServers.clear(); readServers.clear(); writeServers.clear(); return seen; @@ -138,11 +138,11 @@ private void getServers() { boolean success = false; - ConcurrentRoundRobinSet routers = new ConcurrentRoundRobinSet<>( routingServers ); + final Set newRouters = new HashSet<>( ); final Set seen = forgetAllServers(); - while ( !routers.isEmpty() && !success ) + while ( !routingServers.isEmpty() && !success ) { - address = routers.hop(); + address = routingServers.hop(); success = call( address, GET_SERVERS, new Consumer() { @Override @@ -162,12
+162,19 @@ public void accept( Record record ) writeServers.addAll( server.addresses() ); break; case "ROUTE": - routingServers.addAll( server.addresses() ); + newRouters.addAll( server.addresses() ); break; } } } } ); + //We got through but the server gave us an empty list of routers + if (success && newRouters.isEmpty()) { + success = false; + } else if (success) { + routingServers.clear(); + routingServers.addAll( newRouters ); + } } if ( !success ) { @@ -249,7 +256,7 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer< recorder.accept( records.next() ); } } - catch ( ConnectionFailureException e ) + catch ( Throwable e ) { forget( address ); return false; @@ -306,18 +313,36 @@ public void onWriteFailure( BoltServerAddress address ) private Connection acquireConnection( AccessMode role ) { - //Potentially rediscover servers if we are not happy with our current knowledge - checkServers(); - + ConcurrentRoundRobinSet servers; switch ( role ) { case READ: - return connections.acquire( readServers.hop() ); + servers = readServers; + break; case WRITE: - return connections.acquire( writeServers.hop() ); + servers = writeServers; + break; default: throw new ClientException( role + " is not supported for creating new sessions" ); } + + //Potentially rediscover servers if we are not happy with our current knowledge + checkServers(); + int numberOfServers = servers.size(); + for ( int i = 0; i < numberOfServers; i++ ) + { + BoltServerAddress address = servers.hop(); + try + { + return connections.acquire( address ); + } + catch ( ConnectionFailureException e ) + { + forget( address ); + } + } + + throw new ConnectionFailureException( "Failed to connect to any servers" ); } @Override diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java index 2c0a69a2fc..5bcbb0c938 100644 ---
a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java @@ -42,6 +42,7 @@ import org.neo4j.driver.v1.GraphDatabase; import org.neo4j.driver.v1.Record; import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.exceptions.ConnectionFailureException; import org.neo4j.driver.v1.exceptions.ServiceUnavailableException; import org.neo4j.driver.v1.exceptions.SessionExpiredException; import org.neo4j.driver.v1.util.Function; @@ -49,6 +50,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.core.IsEqual.equalTo; @@ -331,6 +333,37 @@ public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedExce assertThat( server.exitStatus(), equalTo( 0 ) ); } + @Test + public void shouldForgetEndpointsOnFailedSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled + { + // Given + StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 ); + + //no read servers + + URI uri = URI.create( "bolt+routing://127.0.0.1:9001" ); + RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config ); + try + { + driver.session( AccessMode.READ ); + fail(); + } + catch ( ConnectionFailureException e ) + { + //ignore + } + + assertThat( driver.readServers(), empty() ); + assertThat( driver.writeServers(), hasSize( 2 ) ); + assertFalse( driver.connectionPool().hasAddress( address( 9005 ) ) ); + assertFalse( driver.connectionPool().hasAddress( address( 9006 ) ) ); + driver.close(); + + // Finally + assertThat( server.exitStatus(), equalTo( 0 ) ); + } + + @Test public void shouldRediscoverIfNecessaryOnSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled From 
492d60c929a9d425ce690d391224bd99a1bd188f Mon Sep 17 00:00:00 2001 From: Pontus Melke Date: Wed, 5 Oct 2016 13:17:54 +0200 Subject: [PATCH 2/3] Added getter for address on RoutingNetworkSession --- .../org/neo4j/driver/internal/RoutingNetworkSession.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingNetworkSession.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingNetworkSession.java index 30db4d2d20..1637cf18bb 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingNetworkSession.java +++ b/driver/src/main/java/org/neo4j/driver/internal/RoutingNetworkSession.java @@ -75,6 +75,11 @@ public void close() } } + public BoltServerAddress address() + { + return connection.address(); + } + static Neo4jException filterFailureToWrite( ClientException e, AccessMode mode, RoutingErrorHandler onError, BoltServerAddress address ) { From b4885f6a6121808c70e92ea99f3bfd73a5aa0450 Mon Sep 17 00:00:00 2001 From: Pontus Melke Date: Wed, 5 Oct 2016 13:18:37 +0200 Subject: [PATCH 3/3] Added getter for address on RoutingStatementResult --- .../org/neo4j/driver/internal/RoutingStatementResult.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/driver/src/main/java/org/neo4j/driver/internal/RoutingStatementResult.java b/driver/src/main/java/org/neo4j/driver/internal/RoutingStatementResult.java index 3037cb7696..e459a7238d 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/RoutingStatementResult.java +++ b/driver/src/main/java/org/neo4j/driver/internal/RoutingStatementResult.java @@ -192,4 +192,8 @@ public ResultSummary consume() } } + public BoltServerAddress address() + { + return address; + } }