Skip to content

Commit 6dc7fba

Browse files
author
Zhen Li
authored
Merge pull request #326 from lutovich/1.2-seed-uri-fallback
Remember initial routing address
2 parents c47aa01 + 525d0b2 commit 6dc7fba

File tree

9 files changed

+298
-91
lines changed

9 files changed

+298
-91
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
143143
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
144144
RoutingSettings routingSettings )
145145
{
146-
return new LoadBalancer( routingSettings, connectionPool, createClock(), config.logging(), address );
146+
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging() );
147147
}
148148

149149
/**

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,26 +41,25 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au
4141
private final Rediscovery rediscovery;
4242
private final Logger log;
4343

44-
public LoadBalancer( RoutingSettings settings, ConnectionPool connections, Clock clock, Logging logging,
45-
BoltServerAddress... routingAddresses )
44+
public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
45+
Clock clock, Logging logging )
4646
{
47-
this( settings, new ClusterRoutingTable( clock, routingAddresses ), connections, clock,
47+
this( initialRouter, settings, connections, new ClusterRoutingTable( clock, initialRouter ), clock,
4848
logging.getLog( LOAD_BALANCER_LOG_NAME ) );
4949
}
5050

51-
private LoadBalancer( RoutingSettings settings, RoutingTable routingTable, ConnectionPool connections,
52-
Clock clock, Logger log )
51+
private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections,
52+
RoutingTable routingTable, Clock clock, Logger log )
5353
{
54-
this( routingTable, connections, new Rediscovery( settings, clock, log,
55-
new GetServersProcedureClusterCompositionProvider( clock, log ) ), log );
54+
this( connections, routingTable, createRediscovery( initialRouter, settings, clock, log ), log );
5655
}
5756

58-
LoadBalancer( RoutingTable routingTable, ConnectionPool connections, Rediscovery rediscovery, Logger log )
57+
LoadBalancer( ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery, Logger log )
5958
{
60-
this.log = log;
6159
this.connections = connections;
6260
this.routingTable = routingTable;
6361
this.rediscovery = rediscovery;
62+
this.log = log;
6463

6564
// initialize the routing table
6665
ensureRouting();
@@ -127,23 +126,17 @@ synchronized void ensureRouting() throws ServiceUnavailableException, ProtocolEx
127126
if ( routingTable.isStale() )
128127
{
129128
log.info( "Routing information is stale. %s", routingTable );
130-
try
131-
{
132-
// get a new routing table
133-
ClusterComposition cluster = rediscovery.lookupRoutingTable( connections, routingTable );
134-
Set<BoltServerAddress> removed = routingTable.update( cluster );
135-
// purge connections to removed addresses
136-
for ( BoltServerAddress address : removed )
137-
{
138-
connections.purge( address );
139-
}
140129

141-
log.info( "Refreshed routing information. %s", routingTable );
142-
}
143-
catch ( InterruptedException e )
130+
// get a new routing table
131+
ClusterComposition cluster = rediscovery.lookupClusterComposition( connections, routingTable );
132+
Set<BoltServerAddress> removed = routingTable.update( cluster );
133+
// purge connections to removed addresses
134+
for ( BoltServerAddress address : removed )
144135
{
145-
throw new ServiceUnavailableException( "Thread was interrupted while establishing connection.", e );
136+
connections.purge( address );
146137
}
138+
139+
log.info( "Refreshed routing information. %s", routingTable );
147140
}
148141
}
149142

@@ -159,4 +152,11 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode )
159152
throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" );
160153
}
161154
}
155+
156+
private static Rediscovery createRediscovery( BoltServerAddress initialRouter, RoutingSettings settings,
157+
Clock clock, Logger log )
158+
{
159+
ClusterCompositionProvider clusterComposition = new GetServersProcedureClusterCompositionProvider( clock, log );
160+
return new Rediscovery( initialRouter, settings, clock, log, clusterComposition );
161+
}
162162
}

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 85 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,25 @@
2323
import org.neo4j.driver.internal.spi.ConnectionPool;
2424
import org.neo4j.driver.internal.util.Clock;
2525
import org.neo4j.driver.v1.Logger;
26-
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
2726
import org.neo4j.driver.v1.exceptions.SecurityException;
27+
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
2828

2929
import static java.lang.String.format;
3030

3131
public class Rediscovery
3232
{
3333
private static final String NO_ROUTERS_AVAILABLE = "Could not perform discovery. No routing servers available.";
3434

35+
private final BoltServerAddress initialRouter;
3536
private final RoutingSettings settings;
3637
private final Clock clock;
3738
private final Logger logger;
3839
private final ClusterCompositionProvider provider;
3940

40-
public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, ClusterCompositionProvider provider )
41+
public Rediscovery( BoltServerAddress initialRouter, RoutingSettings settings, Clock clock, Logger logger,
42+
ClusterCompositionProvider provider )
4143
{
44+
this.initialRouter = initialRouter;
4245
this.settings = settings;
4346
this.clock = clock;
4447
this.logger = logger;
@@ -47,68 +50,105 @@ public Rediscovery( RoutingSettings settings, Clock clock, Logger logger, Cluste
4750

4851
// Given the current routing table and connection pool, use the cluster composition provider to fetch a new
4952
// cluster composition, which would be used to update the routing table and connection pool
50-
public ClusterComposition lookupRoutingTable( ConnectionPool connections, RoutingTable routingTable )
51-
throws InterruptedException
53+
public ClusterComposition lookupClusterComposition( ConnectionPool connections, RoutingTable routingTable )
5254
{
53-
assertHasRouters( routingTable );
5455
int failures = 0;
5556

5657
for ( long start = clock.millis(), delay = 0; ; delay = Math.max( settings.retryTimeoutDelay, delay * 2 ) )
5758
{
5859
long waitTime = start + delay - clock.millis();
59-
if ( waitTime > 0 )
60-
{
61-
clock.sleep( waitTime );
62-
}
60+
sleep( waitTime );
6361
start = clock.millis();
6462

65-
int size = routingTable.routerSize();
66-
for ( int i = 0; i < size; i++ )
63+
ClusterComposition composition = lookupClusterCompositionOnKnownRouters( connections, routingTable );
64+
if ( composition != null )
6765
{
68-
BoltServerAddress address = routingTable.nextRouter();
69-
if ( address == null )
70-
{
71-
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
72-
}
73-
74-
ClusterCompositionResponse response = null;
75-
try ( Connection connection = connections.acquire( address ) )
76-
{
77-
response = provider.getClusterComposition( connection );
78-
}
79-
catch( SecurityException e )
80-
{
81-
throw e; // terminate the discovery immediately
82-
}
83-
catch ( Exception e )
84-
{
85-
// the connection breaks
86-
logger.error( format( "Failed to connect to routing server '%s'.", address ), e );
87-
routingTable.removeRouter( address );
88-
89-
assertHasRouters( routingTable );
90-
continue;
91-
}
92-
93-
ClusterComposition cluster = response.clusterComposition();
94-
logger.info( "Got cluster composition %s", cluster );
95-
if ( cluster.hasWriters() )
96-
{
97-
return cluster;
98-
}
66+
return composition;
9967
}
68+
10069
if ( ++failures >= settings.maxRoutingFailures )
10170
{
10271
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
10372
}
10473
}
10574
}
10675

107-
private void assertHasRouters( RoutingTable table )
76+
private ClusterComposition lookupClusterCompositionOnKnownRouters( ConnectionPool connections,
77+
RoutingTable routingTable )
10878
{
109-
if ( table.routerSize() == 0 )
79+
boolean triedInitialRouter = false;
80+
int size = routingTable.routerSize();
81+
for ( int i = 0; i < size; i++ )
11082
{
111-
throw new ServiceUnavailableException( NO_ROUTERS_AVAILABLE );
83+
BoltServerAddress address = routingTable.nextRouter();
84+
if ( address == null )
85+
{
86+
break;
87+
}
88+
89+
if ( address.equals( initialRouter ) )
90+
{
91+
triedInitialRouter = true;
92+
}
93+
94+
ClusterComposition composition = lookupClusterCompositionOnRouter( address, connections, routingTable );
95+
if ( composition != null )
96+
{
97+
return composition;
98+
}
99+
}
100+
101+
if ( triedInitialRouter )
102+
{
103+
return null;
104+
}
105+
return lookupClusterCompositionOnRouter( initialRouter, connections, routingTable );
106+
}
107+
108+
private ClusterComposition lookupClusterCompositionOnRouter( BoltServerAddress routerAddress,
109+
ConnectionPool connections, RoutingTable routingTable )
110+
{
111+
ClusterCompositionResponse response;
112+
try ( Connection connection = connections.acquire( routerAddress ) )
113+
{
114+
response = provider.getClusterComposition( connection );
115+
}
116+
catch ( SecurityException e )
117+
{
118+
// auth error happened, terminate the discovery procedure immediately
119+
throw e;
120+
}
121+
catch ( Throwable t )
122+
{
123+
// connection turned out to be broken
124+
logger.error( format( "Failed to connect to routing server '%s'.", routerAddress ), t );
125+
routingTable.removeRouter( routerAddress );
126+
return null;
127+
}
128+
129+
ClusterComposition cluster = response.clusterComposition();
130+
logger.info( "Got cluster composition %s", cluster );
131+
if ( cluster.hasWriters() )
132+
{
133+
return cluster;
134+
}
135+
return null;
136+
}
137+
138+
private void sleep( long millis )
139+
{
140+
if ( millis > 0 )
141+
{
142+
try
143+
{
144+
clock.sleep( millis );
145+
}
146+
catch ( InterruptedException e )
147+
{
148+
// restore the interrupted status
149+
Thread.currentThread().interrupt();
150+
throw new ServiceUnavailableException( "Thread was interrupted while performing discovery", e );
151+
}
112152
}
113153
}
114154
}

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,30 @@ public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throw
833833
}
834834
}
835835

836+
@Test
837+
public void shouldUseInitialRouterForRediscoveryWhenAllOtherRoutersAreDead() throws Exception
838+
{
839+
// initial router does not have itself in the returned set of routers
840+
StubServer router = StubServer.start( "acquire_endpoints.script", 9010 );
841+
842+
try ( Driver driver = GraphDatabase.driver( "bolt+routing://127.0.0.1:9010", config ) )
843+
{
844+
try ( Session session = driver.session( AccessMode.READ ) )
845+
{
846+
// restart the router on the same port with a different script that lists the router itself as a reader
847+
assertEquals( 0, router.exitStatus() );
848+
router = StubServer.start( "rediscover_using_initial_router.script", 9010 );
849+
850+
List<Record> records = session.run( "MATCH (n) RETURN n.name AS name" ).list();
851+
assertEquals( 2, records.size() );
852+
assertEquals( "Bob", records.get( 0 ).get( "name" ).asString() );
853+
assertEquals( "Alice", records.get( 1 ).get( "name" ).asString() );
854+
}
855+
}
856+
857+
assertEquals( 0, router.exitStatus() );
858+
}
859+
836860
private static Driver newDriverWithSleeplessClock( String uriString )
837861
{
838862
DriverFactory driverFactory = new DriverFactoryWithClock( new SleeplessClock() );

driver/src/test/java/org/neo4j/driver/internal/RoutingDriverTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ private final Driver driverWithServers( long ttl, Map<String,Object>... serverIn
345345
private Driver driverWithPool( ConnectionPool pool )
346346
{
347347
RoutingSettings settings = new RoutingSettings( 10, 5_000 );
348-
ConnectionProvider connectionProvider = new LoadBalancer( settings, pool, clock, logging, SEED );
348+
ConnectionProvider connectionProvider = new LoadBalancer( SEED, settings, pool, clock, logging );
349349
Config config = Config.build().withLogging( logging ).toConfig();
350350
SessionFactory sessionFactory = new NetworkSessionWithAddressFactory( connectionProvider, config );
351351
return new InternalDriver( insecure(), sessionFactory, logging );

driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,12 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale
7373
when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set );
7474

7575
// when
76-
LoadBalancer balancer = new LoadBalancer( routingTable, conns, rediscovery, DEV_NULL_LOGGER );
76+
LoadBalancer balancer = new LoadBalancer( conns, routingTable, rediscovery, DEV_NULL_LOGGER );
7777

7878
// then
7979
assertNotNull( balancer );
8080
InOrder inOrder = inOrder( rediscovery, routingTable, conns );
81-
inOrder.verify( rediscovery ).lookupRoutingTable( conns, routingTable );
81+
inOrder.verify( rediscovery ).lookupClusterComposition( conns, routingTable );
8282
inOrder.verify( routingTable ).update( any( ClusterComposition.class ) );
8383
inOrder.verify( conns ).purge( new BoltServerAddress( "abc", 12 ) );
8484
}
@@ -88,7 +88,7 @@ public void shouldEnsureRoutingOnInitialization() throws Exception
8888
{
8989
// given & when
9090
final AtomicInteger ensureRoutingCounter = new AtomicInteger( 0 );
91-
LoadBalancer balancer = new LoadBalancer( mock( RoutingTable.class ), mock( ConnectionPool.class ),
91+
LoadBalancer balancer = new LoadBalancer( mock( ConnectionPool.class ), mock( RoutingTable.class ),
9292
mock( Rediscovery.class ), DEV_NULL_LOGGER )
9393
{
9494
@Override
@@ -143,7 +143,7 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing
143143
RoutingTable routingTable = mock( RoutingTable.class );
144144
ConnectionPool connectionPool = mock( ConnectionPool.class );
145145
Rediscovery rediscovery = mock( Rediscovery.class );
146-
LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER );
146+
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
147147
BoltServerAddress address = new BoltServerAddress( "host", 42 );
148148

149149
PooledConnection connection = newConnectionWithFailingSync( address );
@@ -174,7 +174,7 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing
174174
PooledConnection connectionWithFailingSync = newConnectionWithFailingSync( address );
175175
when( connectionPool.acquire( any( BoltServerAddress.class ) ) ).thenReturn( connectionWithFailingSync );
176176
Rediscovery rediscovery = mock( Rediscovery.class );
177-
LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER );
177+
LoadBalancer loadBalancer = new LoadBalancer( connectionPool, routingTable, rediscovery, DEV_NULL_LOGGER );
178178

179179
Session session = newSession( loadBalancer );
180180
// begin transaction to make session obtain a connection
@@ -205,7 +205,7 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne
205205
when( routingTable.readers() ).thenReturn( readerAddrs );
206206
when( routingTable.writers() ).thenReturn( writerAddrs );
207207

208-
return new LoadBalancer( routingTable, connPool, mock( Rediscovery.class ), DEV_NULL_LOGGER );
208+
return new LoadBalancer( connPool, routingTable, mock( Rediscovery.class ), DEV_NULL_LOGGER );
209209
}
210210

211211
private static Session newSession( LoadBalancer loadBalancer )

0 commit comments

Comments
 (0)