Skip to content

Commit 71ff948

Browse files
authored
Merge pull request #588 from zhenlineo/1.7-retry-no-writer
Retry tx on no writer error
2 parents 812db7e + bb038cf commit 71ff948

File tree

3 files changed

+82
-5
lines changed

3 files changed

+82
-5
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.neo4j.driver.v1.exceptions.SessionExpiredException;
4848
import org.neo4j.driver.v1.net.ServerAddressResolver;
4949

50+
import static java.lang.String.format;
5051
import static java.util.concurrent.CompletableFuture.completedFuture;
5152

5253
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler
@@ -221,7 +222,8 @@ private void acquire( AccessMode mode, AddressSet addresses, CompletableFuture<C
221222
{
222223
if ( error instanceof ServiceUnavailableException )
223224
{
224-
log.error( "Failed to obtain a connection towards address " + address, error );
225+
SessionExpiredException errorToLog = new SessionExpiredException( format( "Server at %s is no longer available", address ), error );
226+
log.warn( "Failed to obtain a connection towards address " + address, errorToLog );
225227
forget( address );
226228
eventExecutorGroup.next().execute( () -> acquire( mode, addresses, result ) );
227229
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import org.neo4j.driver.v1.Config;
4040
import org.neo4j.driver.v1.Driver;
4141
import org.neo4j.driver.v1.GraphDatabase;
42+
import org.neo4j.driver.v1.Logger;
43+
import org.neo4j.driver.v1.Logging;
4244
import org.neo4j.driver.v1.Record;
4345
import org.neo4j.driver.v1.Session;
4446
import org.neo4j.driver.v1.StatementResult;
@@ -61,7 +63,9 @@
6163
import static org.junit.jupiter.api.Assertions.assertThrows;
6264
import static org.junit.jupiter.api.Assertions.assertTrue;
6365
import static org.mockito.ArgumentMatchers.any;
66+
import static org.mockito.ArgumentMatchers.startsWith;
6467
import static org.mockito.Mockito.mock;
68+
import static org.mockito.Mockito.times;
6569
import static org.mockito.Mockito.verify;
6670
import static org.mockito.Mockito.when;
6771
import static org.neo4j.driver.v1.Logging.none;
@@ -726,6 +730,40 @@ void shouldRetryWriteTransactionUntilSuccess() throws Exception
726730
}
727731
}
728732

733+
@Test
734+
void shouldRetryWriteTransactionUntilSuccessWithWhenLeaderIsRemoved() throws Exception
735+
{
736+
// This test simulates a router in a cluster when a leader is removed.
737+
// The router first returns a RT with a writer inside.
738+
// However this writer is killed while the driver is running a tx with it.
739+
// Then at the second time the router returns the same RT with the killed writer inside.
740+
// At the third round, the router removes the the writer server from RT reply.
741+
// Finally, the router returns a RT with a reachable writer.
742+
StubServer router = StubServer.start( "acquire_endpoints_v3_leader_killed.script", 9001 );
743+
StubServer brokenWriter = StubServer.start( "dead_write_server.script", 9004 );
744+
StubServer writer = StubServer.start( "write_server.script", 9008 );
745+
746+
Logger logger = mock( Logger.class );
747+
Config config = Config.builder().withoutEncryption().withLogging( mockedLogging( logger ) ).build();
748+
try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9001", config );
749+
Session session = driver.session() )
750+
{
751+
AtomicInteger invocations = new AtomicInteger();
752+
List<Record> records = session.writeTransaction( queryWork( "CREATE (n {name:'Bob'})", invocations ) );
753+
754+
assertEquals( 0, records.size() );
755+
assertEquals( 2, invocations.get() );
756+
}
757+
finally
758+
{
759+
assertEquals( 0, router.exitStatus() );
760+
assertEquals( 0, brokenWriter.exitStatus() );
761+
assertEquals( 0, writer.exitStatus() );
762+
}
763+
verify( logger, times( 3 ) ).warn( startsWith( "Transaction failed and will be retried in" ), any( SessionExpiredException.class ) );
764+
verify( logger ).warn( startsWith( "Failed to obtain a connection towards address 127.0.0.1:9004" ), any( SessionExpiredException.class ) );
765+
}
766+
729767
@Test
730768
void shouldRetryReadTransactionUntilFailure() throws Exception
731769
{
@@ -1159,19 +1197,24 @@ void useSessionAfterDriverIsClosed() throws Exception
11591197
}
11601198
}
11611199

1162-
private static Driver newDriverWithSleeplessClock( String uriString )
1200+
private static Driver newDriverWithSleeplessClock( String uriString, Config config )
11631201
{
11641202
DriverFactory driverFactory = new DriverFactoryWithClock( new SleeplessClock() );
1165-
return newDriver( uriString, driverFactory );
1203+
return newDriver( uriString, driverFactory, config );
1204+
}
1205+
1206+
private static Driver newDriverWithSleeplessClock( String uriString )
1207+
{
1208+
return newDriverWithSleeplessClock( uriString, config );
11661209
}
11671210

11681211
private static Driver newDriverWithFixedRetries( String uriString, int retries )
11691212
{
11701213
DriverFactory driverFactory = new DriverFactoryWithFixedRetryLogic( retries );
1171-
return newDriver( uriString, driverFactory );
1214+
return newDriver( uriString, driverFactory, config );
11721215
}
11731216

1174-
private static Driver newDriver( String uriString, DriverFactory driverFactory )
1217+
private static Driver newDriver( String uriString, DriverFactory driverFactory, Config config )
11751218
{
11761219
URI uri = URI.create( uriString );
11771220
RoutingSettings routingConf = new RoutingSettings( 1, 1, null );
@@ -1201,4 +1244,11 @@ private static List<String> readStrings( final String query, Session session )
12011244
return names;
12021245
} );
12031246
}
1247+
1248+
private static Logging mockedLogging( Logger logger )
1249+
{
1250+
Logging logging = mock( Logging.class );
1251+
when( logging.getLog( any() ) ).thenReturn( logger );
1252+
return logging;
1253+
}
12041254
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
!: BOLT 3
2+
!: AUTO RESET
3+
4+
C: HELLO {"scheme": "none", "user_agent": "neo4j-java/dev"}
5+
S: SUCCESS {"server": "Neo4j/9.9.9", "connection_id": "bolt-123456789"}
6+
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
7+
PULL_ALL
8+
S: SUCCESS {"fields": ["ttl", "servers"]}
9+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
10+
SUCCESS {}
11+
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
12+
PULL_ALL
13+
S: SUCCESS {"fields": ["ttl", "servers"]}
14+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9004"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
15+
SUCCESS {}
16+
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
17+
PULL_ALL
18+
S: SUCCESS {"fields": ["ttl", "servers"]}
19+
RECORD [9223372036854775807, [{"addresses": [],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
20+
SUCCESS {}
21+
C: RUN "CALL dbms.cluster.routing.getRoutingTable({context})" {"context": {}} {}
22+
PULL_ALL
23+
S: SUCCESS {"fields": ["ttl", "servers"]}
24+
RECORD [9223372036854775807, [{"addresses": ["127.0.0.1:9008"],"role": "WRITE"}, {"addresses": ["127.0.0.1:9005","127.0.0.1:9006"], "role": "READ"},{"addresses": ["127.0.0.1:9001"], "role": "ROUTE"}]]
25+
SUCCESS {}

0 commit comments

Comments
 (0)