Skip to content

Commit 525d0b2

Browse files
committed
Remember initial routing address
Rediscovery procedure will try to contact initial router when all other routers have failed or current routing table does not have any routers.
1 parent 954946d commit 525d0b2

File tree

9 files changed

+219
-41
lines changed

9 files changed

+219
-41
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
143143
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
144144
RoutingSettings routingSettings )
145145
{
146-
return new LoadBalancer( routingSettings, connectionPool, createClock(), config.logging(), address );
146+
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging() );
147147
}
148148

149149
/**

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,26 +41,25 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au
4141
private final Rediscovery rediscovery;
4242
private final Logger log;
4343

44-
public LoadBalancer( RoutingSettings settings, ConnectionPool connections, Clock clock, Logging logging,
45-
BoltServerAddress... routingAddresses )
44+
public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
45+
Clock clock, Logging logging )
4646
{
47-
this( settings, new ClusterRoutingTable( clock, routingAddresses ), connections, clock,
47+
this( initialRouter, settings, connections, new ClusterRoutingTable( clock, initialRouter ), clock,
4848
logging.getLog( LOAD_BALANCER_LOG_NAME ) );
4949
}
5050

51-
private LoadBalancer( RoutingSettings settings, RoutingTable routingTable, ConnectionPool connections,
52-
Clock clock, Logger log )
51+
private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
52+
RoutingTable routingTable, Clock clock, Logger log )
5353
{
54-
this( routingTable, connections, new Rediscovery( settings, clock, log,
55-
new GetServersProcedureClusterCompositionProvider( clock, log ) ), log );
54+
this( connections, routingTable, createRediscovery( initialRouter, settings, clock, log ), log );
5655
}
5756

58-
LoadBalancer( RoutingTable routingTable, ConnectionPool connections, Rediscovery rediscovery, Logger log )
57+
LoadBalancer( ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery, Logger log )
5958
{
60-
this.log = log;
6159
this.connections = connections;
6260
this.routingTable = routingTable;
6361
this.rediscovery = rediscovery;
62+
this.log = log;
6463

6564
// initialize the routing table
6665
ensureRouting();
@@ -153,4 +152,11 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode )
153152
throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
154153
}
155154
}
155+
156+
private static Rediscovery createRediscovery( BoltServerAddress initialRouter, RoutingSettings settings,
157+
Clock clock, Logger log )
158+
{
159+
ClusterCompositionProvider clusterComposition = new GetServersProcedureClusterCompositionProvider( clock, log );
160+
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition );
161+
}
156162
}

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,16 @@ public class Rediscovery
3232
{
3333
private static final String NO_ROUTERS_AVAILABLE = "Could not perform discovery. No routing servers available.";
3434

35+
private final BoltServerAddress initialRouter;
3536
private final RoutingSettings settings;
3637
private final Clock clock;
3738
private final Logger logger;
3839
private final ClusterCompositionProvider provider;
3940

40-
public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, ClusterCompositionProvider provider )
41+
public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, Clock clock, Logger logger,
42+
ClusterCompositionProvider provider )
4143
{
44+
this.initialRouter = initialRouter;
4245
this.settings = settings;
4346
this.clock = clock;
4447
this.logger = logger;
@@ -49,7 +52,6 @@ public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, Cluste
4952
// cluster composition, which would be used to update the routing table and connection pool
5053
public ClusterComposition lookupClusterComposition( ConnectionPool connections, RoutingTable routingTable )
5154
{
52-
assertHasRouters( routingTable );
5355
int failures = 0;
5456

5557
for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay, delay * 2 ) )
@@ -74,13 +76,19 @@ public ClusterComposition lookupClusterComposition( ConnectionPool connections,
7476
private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPool connections,
7577
RoutingTable routingTable )
7678
{
79+
boolean triedInitialRouter = false;
7780
int size = routingTable.routerSize();
7881
for ( int i = 0; i < size; i++ )
7982
{
8083
BoltServerAddress address = routingTable.nextRouter();
8184
if ( address == null )
8285
{
83-
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
86+
break;
87+
}
88+
89+
if ( address.equals( initialRouter ) )
90+
{
91+
triedInitialRouter = true;
8492
}
8593

8694
ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable );
@@ -89,7 +97,12 @@ private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPoo
8997
return composition;
9098
}
9199
}
92-
return null;
100+
101+
if ( triedInitialRouter )
102+
{
103+
return null;
104+
}
105+
return lookupClusterCompositionOnRouter( initialRouter, connections, routingTable );
93106
}
94107

95108
private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress routerAddress,
@@ -138,12 +151,4 @@ private void sleep( long millis )
138151
}
139152
}
140153
}
141-
142-
private void assertHasRouters( RoutingTable table )
143-
{
144-
if ( table.routerSize() == 0 )
145-
{
146-
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
147-
}
148-
}
149154
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,30 @@ public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throw
833833
}
834834
}
835835

836+
@Test
837+
public void shouldUseInitialRouterForRediscoveryWhenAllOtherRoutersAreDead() throws Exception
838+
{
839+
// initial router does not have itself in the returned set of routers
840+
StubServer router = StubServer.start( "acquire_endpoints.script", 9010 );
841+
842+
try ( Driver driver = GraphDatabase.driver( "bolt+routing://127.0.0.1:9010", config ) )
843+
{
844+
try ( Session session = driver.session( AccessMode.READ ) )
845+
{
846+
// restart router on the same port with different script that contains itself as reader
847+
assertEquals( 0, router.exitStatus() );
848+
router = StubServer.start( "rediscover_using_initial_router.script", 9010 );
849+
850+
List<Record> records = session.run( "MATCH (n) RETURN n.name AS name" ).list();
851+
assertEquals( 2, records.size() );
852+
assertEquals( "Bob", records.get( 0 ).get( "name" ).asString() );
853+
assertEquals( "Alice", records.get( 1 ).get( "name" ).asString() );
854+
}
855+
}
856+
857+
assertEquals( 0, router.exitStatus() );
858+
}
859+
836860
private static Driver newDriverWithSleeplessClock( String uriString )
837861
{
838862
DriverFactory driverFactory = new DriverFactoryWithClock( new SleeplessClock() );

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ private final Driver driverWithServers( long ttl, Map<String,Object>... serverIn
345345
private Driver driverWithPool( ConnectionPool pool )
346346
{
347347
RoutingSettings settings = new RoutingSettings( 10, 5_000 );
348-
ConnectionProvider connectionProvider = new LoadBalancer( settings, pool, clock, logging, SEED );
348+
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, clock, logging );
349349
Config config = Config.build().withLogging( logging ).toConfig();
350350
SessionFactory sessionFactory = new NetworkSessionWithAddressFactory( connectionProvider, config );
351351
return new InternalDriver( insecure(), sessionFactory, logging );

driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale
7373
when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set );
7474

7575
// when
76-
LoadBalancer balancer = new LoadBalancer( routingTable, conns, rediscovery, DEV_NULL_LOGGER );
76+
LoadBalancer balancer = new LoadBalancer( conns, routingTable, rediscovery, DEV_NULL_LOGGER );
7777

7878
// then
7979
assertNotNull( balancer );
@@ -88,7 +88,7 @@ public void shouldEnsureRoutingOnInitialization() throws Exception
8888
{
8989
// given & when
9090
final AtomicInteger ensureRoutingCounter = new AtomicInteger( 0 );
91-
LoadBalancer balancer = new LoadBalancer( mock( RoutingTable.class ), mock( ConnectionPool.class ),
91+
LoadBalancer balancer = new LoadBalancer( mock( ConnectionPool.class ), mock( RoutingTable.class ),
9292
mock( Rediscovery.class ), DEV_NULL_LOGGER )
9393
{
9494
@Override
@@ -143,7 +143,7 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing
143143
RoutingTable routingTable = mock( RoutingTable.class );
144144
ConnectionPool connectionPool = mock( ConnectionPool.class );
145145
Rediscovery rediscovery = mock( Rediscovery.class );
146-
LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER );
146+
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
147147
BoltServerAddress address = new BoltServerAddress( "host", 42 );
148148

149149
PooledConnection connection = newConnectionWithFailingSync( address );
@@ -174,7 +174,7 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing
174174
PooledConnection connectionWithFailingSync = newConnectionWithFailingSync( address );
175175
when( connectionPool.acquire( any( BoltServerAddress.class ) ) ).thenReturn( connectionWithFailingSync );
176176
Rediscovery rediscovery = mock( Rediscovery.class );
177-
LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER );
177+
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
178178

179179
Session session = newSession( loadBalancer );
180180
// begin transaction to make session obtain a connection
@@ -205,7 +205,7 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne
205205
when( routingTable.readers() ).thenReturn( readerAddrs );
206206
when( routingTable.writers() ).thenReturn( writerAddrs );
207207

208-
return new LoadBalancer( routingTable, connPool, mock( Rediscovery.class ), DEV_NULL_LOGGER );
208+
return new LoadBalancer( connPool, routingTable, mock( Rediscovery.class ), DEV_NULL_LOGGER );
209209
}
210210

211211
private static Session newSession( LoadBalancer loadBalancer )

0 commit comments

Comments
 (0)