Skip to content

Commit 2badccc

Browse files
committed
Remove server when failing on session acquisition
We were only checking for connection failures on running and consuming the results, however there might also be a connection failure on session acquisition. By not properly removing the server on session-acquisition failures we end up in a scenario where we constantly retry to connect to that endpoint.
1 parent 68ff674 commit 2badccc

File tree

2 files changed

+58
-6
lines changed

2 files changed

+58
-6
lines changed

driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 )
6666
}
6767
};
6868
private static final int MIN_SERVERS = 1;
69+
private static final int CONNECTION_RETRIES = 3;
6970
private final ConnectionPool connections;
7071
private final BiFunction<Connection,Logger,Session> sessionProvider;
7172
private final Clock clock;
@@ -249,7 +250,7 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer<
249250
recorder.accept( records.next() );
250251
}
251252
}
252-
catch ( ConnectionFailureException e )
253+
catch ( Throwable e )
253254
{
254255
forget( address );
255256
return false;
@@ -306,18 +307,36 @@ public void onWriteFailure( BoltServerAddress address )
306307

307308
private Connection acquireConnection( AccessMode role )
308309
{
309-
//Potentially rediscover servers if we are not happy with our current knowledge
310-
checkServers();
311-
310+
ConcurrentRoundRobinSet<BoltServerAddress> servers;
312311
switch ( role )
313312
{
314313
case READ:
315-
return connections.acquire( readServers.hop() );
314+
servers = readServers;
315+
break;
316316
case WRITE:
317-
return connections.acquire( writeServers.hop() );
317+
servers = writeServers;
318+
break;
318319
default:
319320
throw new ClientException( role + " is not supported for creating new sessions" );
320321
}
322+
323+
//Potentially rediscover servers if we are not happy with our current knowledge
324+
checkServers();
325+
int numberOfServers = servers.size();
326+
for ( int i = 0; i < numberOfServers; i++ )
327+
{
328+
BoltServerAddress address = servers.hop();
329+
try
330+
{
331+
return connections.acquire( address );
332+
}
333+
catch ( ConnectionFailureException e )
334+
{
335+
forget( address );
336+
}
337+
}
338+
339+
throw new ConnectionFailureException( "Failed to connect to any servers" );
321340
}
322341

323342
@Override

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,15 @@
4242
import org.neo4j.driver.v1.GraphDatabase;
4343
import org.neo4j.driver.v1.Record;
4444
import org.neo4j.driver.v1.Session;
45+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
4546
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4647
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
4748
import org.neo4j.driver.v1.util.Function;
4849
import org.neo4j.driver.v1.util.StubServer;
4950

5051
import static org.hamcrest.Matchers.contains;
5152
import static org.hamcrest.Matchers.containsInAnyOrder;
53+
import static org.hamcrest.Matchers.empty;
5254
import static org.hamcrest.Matchers.hasItem;
5355
import static org.hamcrest.Matchers.hasSize;
5456
import static org.hamcrest.core.IsEqual.equalTo;
@@ -331,6 +333,37 @@ public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedExce
331333
assertThat( server.exitStatus(), equalTo( 0 ) );
332334
}
333335

336+
@Test
337+
public void shouldForgetEndpointsOnFailedSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled
338+
{
339+
// Given
340+
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );
341+
342+
//no read servers
343+
344+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
345+
RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config );
346+
try
347+
{
348+
driver.session( AccessMode.READ );
349+
fail();
350+
}
351+
catch ( ConnectionFailureException e )
352+
{
353+
//ignore
354+
}
355+
356+
assertThat( driver.readServers(), empty() );
357+
assertThat( driver.writeServers(), hasSize( 2 ) );
358+
assertFalse( driver.connectionPool().hasAddress( address( 9005 ) ) );
359+
assertFalse( driver.connectionPool().hasAddress( address( 9006 ) ) );
360+
driver.close();
361+
362+
// Finally
363+
assertThat( server.exitStatus(), equalTo( 0 ) );
364+
}
365+
366+
334367
@Test
335368
public void shouldRediscoverIfNecessaryOnSessionAcquisition()
336369
throws IOException, InterruptedException, StubServer.ForceKilled

0 commit comments

Comments
 (0)