Skip to content

Commit bb038cf

Browse files
author
Zhen Li
committed
Should retry on connection error when acquiring connection in routing table.
1 parent 0812fab commit bb038cf

File tree

3 files changed

+37
-9
lines changed

3 files changed

+37
-9
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
4848
import org.neo4j.driver.v1.net.ServerAddressResolver;
4949

50+
import static java.lang.String.format;
5051
import static java.util.concurrent.CompletableFuture.completedFuture;
5152

5253
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler
@@ -221,7 +222,8 @@ private void acquire( AccessMode mode, AddressSet addresses, CompletableFuture<C
221222
{
222223
if ( error instanceof ServiceUnavailableException )
223224
{
224-
log.error( "Failed to obtain a connection towards address " + address, error );
225+
SessionExpiredException errorToLog = new SessionExpiredException( format( "Server at %s is no longer available", address ), error );
226+
log.warn( "Failed to obtain a connection towards address " + address, errorToLog );
225227
forget( address );
226228
eventExecutorGroup.next().execute( () -> acquire( mode, addresses, result ) );
227229
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.neo4j.driver.v1.Config;
4040
import org.neo4j.driver.v1.Driver;
4141
import org.neo4j.driver.v1.GraphDatabase;
42+
import org.neo4j.driver.v1.Logger;
43+
import org.neo4j.driver.v1.Logging;
4244
import org.neo4j.driver.v1.Record;
4345
import org.neo4j.driver.v1.Session;
4446
import org.neo4j.driver.v1.StatementResult;
@@ -61,7 +63,9 @@
6163
import static org.junit.jupiter.api.Assertions.assertThrows;
6264
import static org.junit.jupiter.api.Assertions.assertTrue;
6365
import static org.mockito.ArgumentMatchers.any;
66+
import static org.mockito.ArgumentMatchers.startsWith;
6467
import static org.mockito.Mockito.mock;
68+
import static org.mockito.Mockito.times;
6569
import static org.mockito.Mockito.verify;
6670
import static org.mockito.Mockito.when;
6771
import static org.neo4j.driver.v1.Logging.none;
@@ -729,16 +733,19 @@ void shouldRetryWriteTransactionUntilSuccess() throws Exception
729733
@Test
730734
void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exception
731735
{
732-
// This test simulates a router in a cluster that a leader is removed.
736+
// This test simulates a router in a cluster when a leader is removed.
733737
// The router first returns a RT with a writer inside.
734738
// However this writer is killed while the driver is running a tx with it.
735-
// As a result, the writer server is removed from RT in the router's second reply.
736-
// Finally, the router will return a RT with a reachable writer.
739+
// Then at the second time the router returns the same RT with the killed writer inside.
740+
// At the third round, the router removes the the writer server from RT reply.
741+
// Finally, the router returns a RT with a reachable writer.
737742
StubServer router = StubServer.start( "acquire_endpoints_v3_leader_killed.script", 9001 );
738743
StubServer brokenWriter = StubServer.start( "dead_write_server.script", 9004 );
739744
StubServer writer = StubServer.start( "write_server.script", 9008 );
740745

741-
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001" );
746+
Logger logger = mock( Logger.class );
747+
Config config = Config.builder().withoutEncryption().withLogging( mockedLogging( logger ) ).build();
748+
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001", config );
742749
Session session = driver.session() )
743750
{
744751
AtomicInteger invocations = new AtomicInteger();
@@ -753,6 +760,8 @@ void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exc
753760
assertEquals( 0, brokenWriter.exitStatus() );
754761
assertEquals( 0, writer.exitStatus() );
755762
}
763+
verify( logger, times( 3 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( SessionExpiredException.class ) );
764+
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
756765
}
757766

758767
@Test
@@ -1188,19 +1197,24 @@ void useSessionAfterDriverIsClosed() throws Exception
11881197
}
11891198
}
11901199

1191-
private static Driver newDriverWithSleeplessClock( String uriString )
1200+
private static Driver newDriverWithSleeplessClock( String uriString, Config config )
11921201
{
11931202
DriverFactory driverFactory = new DriverFactoryWithClock( new SleeplessClock() );
1194-
return newDriver( uriString, driverFactory );
1203+
return newDriver( uriString, driverFactory, config );
1204+
}
1205+
1206+
private static Driver newDriverWithSleeplessClock( String uriString )
1207+
{
1208+
return newDriverWithSleeplessClock( uriString, config );
11951209
}
11961210

11971211
private static Driver newDriverWithFixedRetries( String uriString, int retries )
11981212
{
11991213
DriverFactory driverFactory = new DriverFactoryWithFixedRetryLogic( retries );
1200-
return newDriver( uriString, driverFactory );
1214+
return newDriver( uriString, driverFactory, config );
12011215
}
12021216

1203-
private static Driver newDriver( String uriString, DriverFactory driverFactory )
1217+
private static Driver newDriver( String uriString, DriverFactory driverFactory, Config config )
12041218
{
12051219
URI uri = URI.create( uriString );
12061220
RoutingSettings routingConf = new RoutingSettings( 1, 1, null );
@@ -1230,4 +1244,11 @@ private static List<String> readStrings( final String query, Session session )
12301244
return names;
12311245
} );
12321246
}
1247+
1248+
private static Logging mockedLogging( Logger logger )
1249+
{
1250+
Logging logging = mock( Logging.class );
1251+
when( logging.getLog( any() ) ).thenReturn( logger );
1252+
return logging;
1253+
}
12331254
}

driver/src/test/resources/acquire_endpoints_v3_leader_killed.script

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ S: SUCCESS {"fields": ["ttl", "servers"]}
1010
SUCCESS {}
1111
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
1212
PULL_ALL
13+
S: SUCCESS {"fields": ["ttl", "servers"]}
14+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
15+
SUCCESS {}
16+
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
17+
PULL_ALL
1318
S: SUCCESS {"fields": ["ttl", "servers"]}
1419
RECORD [9223372036854775807, [{"addresses": [],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
1520
SUCCESS {}

0 commit comments

Comments
 (0)