Skip to content

Commit 71ec73c

Browse files
author
Zhen Li
committed
Moved useInitialRouter to routing table which is only used by a single thread.
1 parent 63d6f93 commit 71ec73c

File tree

12 files changed

+124
-90
lines changed

12 files changed

+124
-90
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,13 @@ public class ClusterRoutingTable implements RoutingTable
3535
private static final int MIN_ROUTERS = 1;
3636

3737
private final Clock clock;
38-
private volatile long expirationTimeout;
38+
private volatile long expirationTimestamp;
3939
private final AddressSet readers;
4040
private final AddressSet writers;
4141
private final AddressSet routers;
42+
4243
private final String databaseName; // specifies this routing table is the routing table of database named this.
44+
private boolean preferInitialRouter;
4345

4446
public ClusterRoutingTable( String ofDatabase, Clock clock, BoltServerAddress... routingAddresses )
4547
{
@@ -51,7 +53,8 @@ private ClusterRoutingTable( String ofDatabase, Clock clock )
5153
{
5254
this.databaseName = ofDatabase;
5355
this.clock = clock;
54-
this.expirationTimeout = clock.millis() - 1;
56+
this.expirationTimestamp = clock.millis() - 1;
57+
this.preferInitialRouter = true;
5558

5659
this.readers = new AddressSet();
5760
this.writers = new AddressSet();
@@ -61,30 +64,31 @@ private ClusterRoutingTable( String ofDatabase, Clock clock )
6164
@Override
6265
public boolean isStaleFor( AccessMode mode )
6366
{
64-
return expirationTimeout < clock.millis() ||
67+
return expirationTimestamp < clock.millis() ||
6568
routers.size() < MIN_ROUTERS ||
6669
mode == AccessMode.READ && readers.size() == 0 ||
6770
mode == AccessMode.WRITE && writers.size() == 0;
6871
}
6972

7073
@Override
71-
public boolean isStale( long staleRoutingTableTimeout )
74+
public boolean hasBeenStaleFor( long extraTime )
7275
{
73-
long expireTime = expirationTimeout + staleRoutingTableTimeout;
74-
if ( expireTime < 0 )
76+
long totalTime = expirationTimestamp + extraTime;
77+
if ( totalTime < 0 )
7578
{
76-
expireTime = Long.MAX_VALUE;
79+
totalTime = Long.MAX_VALUE;
7780
}
78-
return expireTime < clock.millis();
81+
return totalTime < clock.millis();
7982
}
8083

8184
@Override
8285
public synchronized void update( ClusterComposition cluster )
8386
{
84-
expirationTimeout = cluster.expirationTimestamp();
87+
expirationTimestamp = cluster.expirationTimestamp();
8588
readers.update( cluster.readers() );
8689
writers.update( cluster.writers() );
8790
routers.update( cluster.routers() );
91+
preferInitialRouter = !cluster.hasWriters();
8892
}
8993

9094
@Override
@@ -134,11 +138,15 @@ public void removeWriter( BoltServerAddress toRemove )
134138
writers.remove( toRemove );
135139
}
136140

141+
@Override
142+
public boolean preferInitialRouter()
143+
{
144+
return preferInitialRouter;
145+
}
137146

138147
@Override
139148
public synchronized String toString()
140149
{
141-
return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'",
142-
expirationTimeout, clock.millis(), routers, writers, readers, databaseName );
150+
return format( "Ttl %s, currentTime %s, routers %s, writers %s, readers %s, database '%s'", expirationTimestamp, clock.millis(), routers, writers, readers, databaseName );
143151
}
144152
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RediscoveryImpl.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -62,29 +62,19 @@ public class RediscoveryImpl implements Rediscovery
6262
private final ServerAddressResolver resolver;
6363
private final EventExecutorGroup eventExecutorGroup;
6464

65-
private volatile boolean useInitialRouter; // TODO thread safe?
66-
6765
public RediscoveryImpl( BoltServerAddress initialRouter, RoutingSettings settings, ClusterCompositionProvider provider,
6866
EventExecutorGroup eventExecutorGroup, ServerAddressResolver resolver, Logger logger )
69-
{
70-
this( initialRouter, settings, provider, resolver, eventExecutorGroup, logger, true );
71-
}
72-
73-
// Test-only constructor
74-
RediscoveryImpl( BoltServerAddress initialRouter, RoutingSettings settings, ClusterCompositionProvider provider,
75-
ServerAddressResolver resolver, EventExecutorGroup eventExecutorGroup, Logger logger, boolean useInitialRouter )
7667
{
7768
this.initialRouter = initialRouter;
7869
this.settings = settings;
7970
this.logger = logger;
8071
this.provider = provider;
8172
this.resolver = resolver;
8273
this.eventExecutorGroup = eventExecutorGroup;
83-
this.useInitialRouter = useInitialRouter;
8474
}
8575

8676
/**
87-
* Given a database and its current routing table, and the global connection pool, use the connection composition provider to fetch a new
77+
* Given a database and its current routing table, and the global connection pool, use the global cluster composition provider to fetch a new
8878
* cluster composition, which would be used to update the routing table of the given database and global connection pool.
8979
*
9080
* @param routingTable current routing table of the given database.
@@ -137,23 +127,16 @@ private CompletionStage<ClusterComposition> lookup( RoutingTable routingTable, C
137127
{
138128
CompletionStage<ClusterComposition> compositionStage;
139129

140-
if ( useInitialRouter )
130+
if ( routingTable.preferInitialRouter() )
141131
{
142132
compositionStage = lookupOnInitialRouterThenOnKnownRouters( routingTable, connectionPool );
143-
useInitialRouter = false;
144133
}
145134
else
146135
{
147136
compositionStage = lookupOnKnownRoutersThenOnInitialRouter( routingTable, connectionPool );
148137
}
149138

150-
return compositionStage.whenComplete( ( composition, error ) ->
151-
{
152-
if ( composition != null && !composition.hasWriters() )
153-
{
154-
useInitialRouter = true;
155-
}
156-
} );
139+
return compositionStage;
157140
}
158141

159142
private CompletionStage<ClusterComposition> lookupOnKnownRoutersThenOnInitialRouter( RoutingTable routingTable,

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public interface RoutingTable
2727
{
2828
boolean isStaleFor( AccessMode mode );
2929

30-
boolean isStale( long staleRoutingTableTimeout );
30+
boolean hasBeenStaleFor( long staleRoutingTableTimeout );
3131

3232
void update( ClusterComposition cluster );
3333

@@ -44,4 +44,6 @@ public interface RoutingTable
4444
String database();
4545

4646
void removeWriter( BoltServerAddress toRemove );
47+
48+
boolean preferInitialRouter();
4749
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private synchronized void freshClusterCompositionFetched( ClusterComposition com
113113
try
114114
{
115115
routingTable.update( composition );
116-
routingTables.removeStale();
116+
routingTables.purgeAged();
117117
connectionPool.retainAll( routingTables.allServers() );
118118

119119
log.info( "Updated routing table for database '%s'. %s", databaseName, routingTable );
@@ -144,9 +144,9 @@ public Set<BoltServerAddress> servers()
144144
}
145145

146146
// This method cannot be synchronized as it will be visited by all routing table handler's threads concurrently
147-
public boolean isRoutingTableStale()
147+
public boolean isRoutingTableAged()
148148
{
149-
return refreshRoutingTableFuture == null && routingTable.isStale( STALE_ROUTING_TABLE_PURGE_TIMEOUT.toMillis() );
149+
return refreshRoutingTableFuture == null && routingTable.hasBeenStaleFor( STALE_ROUTING_TABLE_PURGE_TIMEOUT.toMillis() );
150150
}
151151

152152
// for testing only

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTables.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public interface RoutingTables
4848
void remove( String databaseName );
4949

5050
/**
51-
* Removes all stale routing tables.
51+
* Removes all routing tables that has been not used for a long time.
5252
*/
53-
void removeStale();
53+
void purgeAged();
5454
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTablesImpl.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ public class RoutingTablesImpl implements RoutingTables
3636
private final RoutingTableHandlerFactory factory;
3737
private final Logger logger;
3838

39-
public RoutingTablesImpl( ConnectionPool connectionPool, Rediscovery rediscovery, BoltServerAddress initialRouter, Clock clock, Logger logger )
39+
public RoutingTablesImpl( ConnectionPool connectionPool, Rediscovery rediscovery, Clock clock, Logger logger )
4040
{
41-
this( new ConcurrentHashMap<>(), new RoutingTableHandlerFactory( connectionPool, rediscovery, initialRouter, clock, logger ), logger );
41+
this( new ConcurrentHashMap<>(), new RoutingTableHandlerFactory( connectionPool, rediscovery, clock, logger ), logger );
4242
}
4343

4444
RoutingTablesImpl( ConcurrentMap<String,RoutingTableHandler> routingTables, RoutingTableHandlerFactory factory, Logger logger )
@@ -76,12 +76,12 @@ public void remove( String databaseName )
7676
}
7777

7878
@Override
79-
public void removeStale()
79+
public void purgeAged()
8080
{
8181
routingTables.forEach( ( databaseName, handler ) -> {
82-
if ( handler.isRoutingTableStale() )
82+
if ( handler.isRoutingTableAged() )
8383
{
84-
logger.info( "Routing table handler for database '%s' is removed because it is not used for too long. Routing table: %s",
84+
logger.info( "Routing table handler for database '%s' is removed because it has not been used for a long time. Routing table: %s",
8585
databaseName, handler.routingTable() );
8686
routingTables.remove( databaseName );
8787
}
@@ -108,21 +108,19 @@ static class RoutingTableHandlerFactory
108108
private final ConnectionPool connectionPool;
109109
private final Rediscovery rediscovery;
110110
private final Logger log;
111-
private final BoltServerAddress initialRouter;
112111
private final Clock clock;
113112

114-
RoutingTableHandlerFactory( ConnectionPool connectionPool, Rediscovery rediscovery, BoltServerAddress initialRouter, Clock clock, Logger log )
113+
RoutingTableHandlerFactory( ConnectionPool connectionPool, Rediscovery rediscovery, Clock clock, Logger log )
115114
{
116115
this.connectionPool = connectionPool;
117116
this.rediscovery = rediscovery;
118-
this.initialRouter = initialRouter;
119117
this.clock = clock;
120118
this.log = log;
121119
}
122120

123121
RoutingTableHandler newInstance( String databaseName, RoutingTables allTables )
124122
{
125-
ClusterRoutingTable routingTable = new ClusterRoutingTable( databaseName, clock, initialRouter );
123+
ClusterRoutingTable routingTable = new ClusterRoutingTable( databaseName, clock );
126124
return new RoutingTableHandler( routingTable, rediscovery, connectionPool, allTables, log );
127125
}
128126
}

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ private static RoutingTables createRoutingTables( ConnectionPool connectionPool,
173173
{
174174
Logger log = loadBalancerLogger( logging );
175175
Rediscovery rediscovery = createRediscovery( eventExecutorGroup, initialRouter, resolver, settings, clock, log );
176-
return new RoutingTablesImpl( connectionPool, rediscovery, initialRouter, clock, log );
176+
return new RoutingTablesImpl( connectionPool, rediscovery, clock, log );
177177
}
178178

179179
private static Rediscovery createRediscovery( EventExecutorGroup eventExecutorGroup, BoltServerAddress initialRouter, ServerAddressResolver resolver,

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ void shouldReturnStaleIfTtlExpired()
7070
void shouldReturnStaleIfNoRouter()
7171
{
7272
// Given
73-
FakeClock clock = new FakeClock();
74-
RoutingTable routingTable = newRoutingTable( clock );
73+
RoutingTable routingTable = newRoutingTable();
7574

7675
// When
7776
routingTable.update( createClusterComposition( EMPTY, asList( C ), asList( D, E ) ) );
@@ -85,8 +84,7 @@ void shouldReturnStaleIfNoRouter()
8584
void shouldBeStaleForReadsButNotWritesWhenNoReaders()
8685
{
8786
// Given
88-
FakeClock clock = new FakeClock();
89-
RoutingTable routingTable = newRoutingTable( clock );
87+
RoutingTable routingTable = newRoutingTable();
9088

9189
// When
9290
routingTable.update( createClusterComposition( asList( A, B ), asList( C ), EMPTY ) );
@@ -100,8 +98,7 @@ void shouldBeStaleForReadsButNotWritesWhenNoReaders()
10098
void shouldBeStaleForWritesButNotReadsWhenNoWriters()
10199
{
102100
// Given
103-
FakeClock clock = new FakeClock();
104-
RoutingTable routingTable = newRoutingTable( clock );
101+
RoutingTable routingTable = newRoutingTable();
105102

106103
// When
107104
routingTable.update( createClusterComposition( asList( A, B ), EMPTY, asList( D, E ) ) );
@@ -115,8 +112,7 @@ void shouldBeStaleForWritesButNotReadsWhenNoWriters()
115112
void shouldBeNotStaleWithReadersWritersAndRouters()
116113
{
117114
// Given
118-
FakeClock clock = new FakeClock();
119-
RoutingTable routingTable = newRoutingTable( clock );
115+
RoutingTable routingTable = newRoutingTable();
120116

121117
// When
122118
routingTable.update( createClusterComposition( asList( A, B ), asList( C ), asList( D, E ) ) );
@@ -172,7 +168,7 @@ void shouldContainInitialRouters()
172168
@Test
173169
void shouldPreserveOrderingOfRouters()
174170
{
175-
ClusterRoutingTable routingTable = newRoutingTable( new FakeClock() );
171+
ClusterRoutingTable routingTable = newRoutingTable();
176172
List<BoltServerAddress> routers = asList( A, C, D, F, B, E );
177173

178174
routingTable.update( createClusterComposition( routers, EMPTY, EMPTY ) );
@@ -183,7 +179,7 @@ void shouldPreserveOrderingOfRouters()
183179
@Test
184180
void shouldPreserveOrderingOfWriters()
185181
{
186-
ClusterRoutingTable routingTable = newRoutingTable( new FakeClock() );
182+
ClusterRoutingTable routingTable = newRoutingTable();
187183
List<BoltServerAddress> writers = asList( D, F, A, C, E );
188184

189185
routingTable.update( createClusterComposition( EMPTY, writers, EMPTY ) );
@@ -194,7 +190,7 @@ void shouldPreserveOrderingOfWriters()
194190
@Test
195191
void shouldPreserveOrderingOfReaders()
196192
{
197-
ClusterRoutingTable routingTable = newRoutingTable( new FakeClock() );
193+
ClusterRoutingTable routingTable = newRoutingTable();
198194
List<BoltServerAddress> readers = asList( B, A, F, C, D );
199195

200196
routingTable.update( createClusterComposition( EMPTY, EMPTY, readers ) );
@@ -205,7 +201,7 @@ void shouldPreserveOrderingOfReaders()
205201
@Test
206202
void shouldTreatOneRouterAsValid()
207203
{
208-
ClusterRoutingTable routingTable = newRoutingTable( new FakeClock() );
204+
ClusterRoutingTable routingTable = newRoutingTable();
209205

210206
List<BoltServerAddress> routers = singletonList( A );
211207
List<BoltServerAddress> writers = asList( B, C );
@@ -218,17 +214,69 @@ void shouldTreatOneRouterAsValid()
218214
}
219215

220216
@Test
221-
void shouldBeStaleForExpiredTime() throws Throwable
217+
void shouldHaveBeStaleForExpiredTime() throws Throwable
222218
{
223219
ClusterRoutingTable routingTable = newRoutingTable( Clock.SYSTEM );
224-
assertTrue( routingTable.isStale( 0 ) );
220+
assertTrue( routingTable.hasBeenStaleFor( 0 ) );
225221
}
226222

227223
@Test
228-
void shouldNotBeStaleForExpiredTime() throws Throwable
224+
void shouldNotHaveBeStaleForUnexpiredTime() throws Throwable
229225
{
230226
ClusterRoutingTable routingTable = newRoutingTable( Clock.SYSTEM );
231-
assertFalse( routingTable.isStale( Duration.ofSeconds( 30 ).toMillis() ) );
227+
assertFalse( routingTable.hasBeenStaleFor( Duration.ofSeconds( 30 ).toMillis() ) );
228+
}
229+
230+
@Test
231+
void shouldDefaultToPreferInitialRouter() throws Throwable
232+
{
233+
ClusterRoutingTable routingTable = newRoutingTable();
234+
assertTrue( routingTable.preferInitialRouter() );
235+
}
236+
237+
@Test
238+
void shouldPreferInitialRouterIfNoWriter() throws Throwable
239+
{
240+
ClusterRoutingTable routingTable = newRoutingTable();
241+
routingTable.update( createClusterComposition( EMPTY, EMPTY, EMPTY ) );
242+
assertTrue( routingTable.preferInitialRouter() );
243+
244+
routingTable.update( createClusterComposition( singletonList( A ), EMPTY, singletonList( A ) ) );
245+
assertTrue( routingTable.preferInitialRouter() );
246+
247+
routingTable.update( createClusterComposition( asList( A, B ), EMPTY, asList( A, B ) ) );
248+
assertTrue( routingTable.preferInitialRouter() );
249+
250+
routingTable.update( createClusterComposition( EMPTY, EMPTY, singletonList( A ) ) );
251+
assertTrue( routingTable.preferInitialRouter() );
252+
253+
routingTable.update( createClusterComposition( singletonList( A ), EMPTY, EMPTY ) );
254+
assertTrue( routingTable.preferInitialRouter() );
255+
}
256+
257+
@Test
258+
void shouldNotPreferInitialRouterIfHasWriter() throws Throwable
259+
{
260+
ClusterRoutingTable routingTable = newRoutingTable();
261+
routingTable.update( createClusterComposition( EMPTY, singletonList( A ), EMPTY ) );
262+
assertFalse( routingTable.preferInitialRouter() );
263+
264+
routingTable.update( createClusterComposition( singletonList( A ), singletonList( A ), singletonList( A ) ) );
265+
assertFalse( routingTable.preferInitialRouter() );
266+
267+
routingTable.update( createClusterComposition( asList( A, B ), singletonList( A ), asList( A, B ) ) );
268+
assertFalse( routingTable.preferInitialRouter() );
269+
270+
routingTable.update( createClusterComposition( EMPTY, singletonList( A ), singletonList( A ) ) );
271+
assertFalse( routingTable.preferInitialRouter() );
272+
273+
routingTable.update( createClusterComposition( singletonList( A ), singletonList( A ), EMPTY ) );
274+
assertFalse( routingTable.preferInitialRouter() );
275+
}
276+
277+
private ClusterRoutingTable newRoutingTable()
278+
{
279+
return new ClusterRoutingTable( ABSENT_DB_NAME, new FakeClock() );
232280
}
233281

234282
private ClusterRoutingTable newRoutingTable( Clock clock )

0 commit comments

Comments
 (0)