Skip to content

Commit f0045ee

Browse files
committed
Remove server when failing on session acquisition
We were only checking for connection failures on running and consuming the results; however, there might also be a connection failure on session acquisition. By not properly removing the server on session-acquisition failures, we end up in a scenario where we constantly retry to connect to that endpoint.
1 parent 68ff674 commit f0045ee

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

driver/src/main/java/org/neo4j/driver/internal/RoutingDriver.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public int compare( BoltServerAddress o1, BoltServerAddress o2 )
6666
}
6767
};
6868
private static final int MIN_SERVERS = 1;
69+
private static final int CONNECTION_RETRIES = 3;
6970
private final ConnectionPool connections;
7071
private final BiFunction<Connection,Logger,Session> sessionProvider;
7172
private final Clock clock;
@@ -138,11 +139,11 @@ private void getServers()
138139
{
139140
boolean success = false;
140141

141-
ConcurrentRoundRobinSet<BoltServerAddress> routers = new ConcurrentRoundRobinSet<>( routingServers );
142+
final ConcurrentRoundRobinSet<BoltServerAddress> newRouters = new ConcurrentRoundRobinSet<>( );
142143
final Set<BoltServerAddress> seen = forgetAllServers();
143-
while ( !routers.isEmpty() && !success )
144+
while ( !routingServers.isEmpty() && !success )
144145
{
145-
address = routers.hop();
146+
address = routingServers.hop();
146147
success = call( address, GET_SERVERS, new Consumer<Record>()
147148
{
148149
@Override
@@ -162,12 +163,19 @@ public void accept( Record record )
162163
writeServers.addAll( server.addresses() );
163164
break;
164165
case "ROUTE":
165-
routingServers.addAll( server.addresses() );
166+
newRouters.addAll( server.addresses() );
166167
break;
167168
}
168169
}
169170
}
170171
} );
172+
//We got through but the server gave us an empty list of routers
173+
if (success && newRouters.isEmpty()) {
174+
success = false;
175+
} else if (success) {
176+
routingServers.clear();
177+
routingServers.addAll( newRouters );
178+
}
171179
}
172180
if ( !success )
173181
{
@@ -249,7 +257,7 @@ private boolean call( BoltServerAddress address, String procedureName, Consumer<
249257
recorder.accept( records.next() );
250258
}
251259
}
252-
catch ( ConnectionFailureException e )
260+
catch ( Throwable e )
253261
{
254262
forget( address );
255263
return false;
@@ -306,18 +314,36 @@ public void onWriteFailure( BoltServerAddress address )
306314

307315
private Connection acquireConnection( AccessMode role )
308316
{
309-
//Potentially rediscover servers if we are not happy with our current knowledge
310-
checkServers();
311-
317+
ConcurrentRoundRobinSet<BoltServerAddress> servers;
312318
switch ( role )
313319
{
314320
case READ:
315-
return connections.acquire( readServers.hop() );
321+
servers = readServers;
322+
break;
316323
case WRITE:
317-
return connections.acquire( writeServers.hop() );
324+
servers = writeServers;
325+
break;
318326
default:
319327
throw new ClientException( role + " is not supported for creating new sessions" );
320328
}
329+
330+
//Potentially rediscover servers if we are not happy with our current knowledge
331+
checkServers();
332+
int numberOfServers = servers.size();
333+
for ( int i = 0; i < numberOfServers; i++ )
334+
{
335+
BoltServerAddress address = servers.hop();
336+
try
337+
{
338+
return connections.acquire( address );
339+
}
340+
catch ( ConnectionFailureException e )
341+
{
342+
forget( address );
343+
}
344+
}
345+
346+
throw new ConnectionFailureException( "Failed to connect to any servers" );
321347
}
322348

323349
@Override

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverStubTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,15 @@
4242
import org.neo4j.driver.v1.GraphDatabase;
4343
import org.neo4j.driver.v1.Record;
4444
import org.neo4j.driver.v1.Session;
45+
import org.neo4j.driver.v1.exceptions.ConnectionFailureException;
4546
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
4647
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
4748
import org.neo4j.driver.v1.util.Function;
4849
import org.neo4j.driver.v1.util.StubServer;
4950

5051
import static org.hamcrest.Matchers.contains;
5152
import static org.hamcrest.Matchers.containsInAnyOrder;
53+
import static org.hamcrest.Matchers.empty;
5254
import static org.hamcrest.Matchers.hasItem;
5355
import static org.hamcrest.Matchers.hasSize;
5456
import static org.hamcrest.core.IsEqual.equalTo;
@@ -331,6 +333,37 @@ public void shouldForgetEndpointsOnFailure() throws IOException, InterruptedExce
331333
assertThat( server.exitStatus(), equalTo( 0 ) );
332334
}
333335

336+
@Test
337+
public void shouldForgetEndpointsOnFailedSessionAcquisition() throws IOException, InterruptedException, StubServer.ForceKilled
338+
{
339+
// Given
340+
StubServer server = StubServer.start( resource( "acquire_endpoints.script" ), 9001 );
341+
342+
//no read servers
343+
344+
URI uri = URI.create( "bolt+routing://127.0.0.1:9001" );
345+
RoutingDriver driver = (RoutingDriver) GraphDatabase.driver( uri, config );
346+
try
347+
{
348+
driver.session( AccessMode.READ );
349+
fail();
350+
}
351+
catch ( ConnectionFailureException e )
352+
{
353+
//ignore
354+
}
355+
356+
assertThat( driver.readServers(), empty() );
357+
assertThat( driver.writeServers(), hasSize( 2 ) );
358+
assertFalse( driver.connectionPool().hasAddress( address( 9005 ) ) );
359+
assertFalse( driver.connectionPool().hasAddress( address( 9006 ) ) );
360+
driver.close();
361+
362+
// Finally
363+
assertThat( server.exitStatus(), equalTo( 0 ) );
364+
}
365+
366+
334367
@Test
335368
public void shouldRediscoverIfNecessaryOnSessionAcquisition()
336369
throws IOException, InterruptedException, StubServer.ForceKilled

0 commit comments

Comments
 (0)