Skip to content

Commit 33996f5

Browse files
authored
Merge pull request #686 from zhenlineo/4.0-refactor
Pulled RoutingTableHandler interface out
2 parents ba3f244 + 5a180c1 commit 33996f5

File tree

10 files changed

+193
-152
lines changed

10 files changed

+193
-152
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/ConnectionContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121
import org.neo4j.driver.AccessMode;
2222
import org.neo4j.driver.Bookmark;
2323
import org.neo4j.driver.internal.DatabaseName;
24+
import org.neo4j.driver.internal.spi.ConnectionProvider;
2425

26+
/**
27+
* Describes what kind of connection to return by {@link ConnectionProvider}
28+
*/
2529
public interface ConnectionContext
2630
{
2731
DatabaseName databaseName();

driver/src/main/java/org/neo4j/driver/internal/async/NetworkSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ private void ensureSessionIsOpen()
345345
}
346346

347347
/**
348-
* A {@link Connection} shall fulfil this {@link ImmutableConnectionContext} when acquired from a connection provider.
348+
* The {@link NetworkSessionConnectionContext#mode} can be mutable for a session connection context
349349
*/
350350
private static class NetworkSessionConnectionContext implements ConnectionContext
351351
{

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableHandler.java

Lines changed: 5 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -19,137 +19,19 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.Set;
22-
import java.util.concurrent.CompletableFuture;
2322
import java.util.concurrent.CompletionStage;
2423

25-
import org.neo4j.driver.Logger;
2624
import org.neo4j.driver.internal.BoltServerAddress;
27-
import org.neo4j.driver.internal.DatabaseName;
2825
import org.neo4j.driver.internal.RoutingErrorHandler;
2926
import org.neo4j.driver.internal.async.ConnectionContext;
30-
import org.neo4j.driver.internal.spi.ConnectionPool;
31-
import org.neo4j.driver.internal.util.Futures;
3227

33-
import static java.util.concurrent.CompletableFuture.completedFuture;
34-
35-
public class RoutingTableHandler implements RoutingErrorHandler
28+
public interface RoutingTableHandler extends RoutingErrorHandler
3629
{
37-
private final RoutingTable routingTable;
38-
private final DatabaseName databaseName;
39-
private final RoutingTableRegistry routingTableRegistry;
40-
private volatile CompletableFuture<RoutingTable> refreshRoutingTableFuture;
41-
private final ConnectionPool connectionPool;
42-
private final Rediscovery rediscovery;
43-
private final Logger log;
44-
private final long routingTablePurgeDelayMs;
45-
46-
public RoutingTableHandler( RoutingTable routingTable, Rediscovery rediscovery, ConnectionPool connectionPool, RoutingTableRegistry routingTableRegistry,
47-
Logger log, long routingTablePurgeDelayMs )
48-
{
49-
this.routingTable = routingTable;
50-
this.databaseName = routingTable.database();
51-
this.rediscovery = rediscovery;
52-
this.connectionPool = connectionPool;
53-
this.routingTableRegistry = routingTableRegistry;
54-
this.log = log;
55-
this.routingTablePurgeDelayMs = routingTablePurgeDelayMs;
56-
}
57-
58-
@Override
59-
public void onConnectionFailure( BoltServerAddress address )
60-
{
61-
// remove server from the routing table, to prevent concurrent threads from making connections to this address
62-
routingTable.forget( address );
63-
}
64-
65-
@Override
66-
public void onWriteFailure( BoltServerAddress address )
67-
{
68-
routingTable.forgetWriter( address );
69-
}
70-
71-
synchronized CompletionStage<RoutingTable> refreshRoutingTable( ConnectionContext context )
72-
{
73-
if ( refreshRoutingTableFuture != null )
74-
{
75-
// refresh is already happening concurrently, just use it's result
76-
return refreshRoutingTableFuture;
77-
}
78-
else if ( routingTable.isStaleFor( context.mode() ) )
79-
{
80-
// existing routing table is not fresh and should be updated
81-
log.debug( "Routing table for database '%s' is stale. %s", databaseName.description(), routingTable );
82-
83-
CompletableFuture<RoutingTable> resultFuture = new CompletableFuture<>();
84-
refreshRoutingTableFuture = resultFuture;
85-
86-
rediscovery.lookupClusterComposition( routingTable, connectionPool, context.rediscoveryBookmark() )
87-
.whenComplete( ( composition, completionError ) ->
88-
{
89-
Throwable error = Futures.completionExceptionCause( completionError );
90-
if ( error != null )
91-
{
92-
clusterCompositionLookupFailed( error );
93-
}
94-
else
95-
{
96-
freshClusterCompositionFetched( composition );
97-
}
98-
} );
99-
100-
return resultFuture;
101-
}
102-
else
103-
{
104-
// existing routing table is fresh, use it
105-
return completedFuture( routingTable );
106-
}
107-
}
108-
109-
private synchronized void freshClusterCompositionFetched( ClusterComposition composition )
110-
{
111-
try
112-
{
113-
routingTable.update( composition );
114-
routingTableRegistry.removeAged();
115-
connectionPool.retainAll( routingTableRegistry.allServers() );
116-
117-
log.debug( "Updated routing table for database '%s'. %s", databaseName.description(), routingTable );
118-
119-
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
120-
refreshRoutingTableFuture = null;
121-
routingTableFuture.complete( routingTable );
122-
}
123-
catch ( Throwable error )
124-
{
125-
clusterCompositionLookupFailed( error );
126-
}
127-
}
128-
129-
private synchronized void clusterCompositionLookupFailed( Throwable error )
130-
{
131-
log.error( String.format( "Failed to update routing table for database '%s'. Current routing table: %s.", databaseName.description(), routingTable ), error );
132-
routingTableRegistry.remove( databaseName );
133-
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
134-
refreshRoutingTableFuture = null;
135-
routingTableFuture.completeExceptionally( error );
136-
}
30+
Set<BoltServerAddress> servers();
13731

138-
// This method cannot be synchronized as it will be visited by all routing table handler's threads concurrently
139-
public Set<BoltServerAddress> servers()
140-
{
141-
return routingTable.servers();
142-
}
32+
boolean isRoutingTableAged();
14333

144-
// This method cannot be synchronized as it will be visited by all routing table handler's threads concurrently
145-
public boolean isRoutingTableAged()
146-
{
147-
return refreshRoutingTableFuture == null && routingTable.hasBeenStaleFor( routingTablePurgeDelayMs );
148-
}
34+
CompletionStage<RoutingTable> ensureRoutingTable( ConnectionContext context );
14935

150-
// for testing only
151-
public RoutingTable routingTable()
152-
{
153-
return routingTable;
154-
}
36+
RoutingTable routingTable();
15537
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/*
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.util.Set;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.CompletionStage;
24+
25+
import org.neo4j.driver.Logger;
26+
import org.neo4j.driver.internal.BoltServerAddress;
27+
import org.neo4j.driver.internal.DatabaseName;
28+
import org.neo4j.driver.internal.async.ConnectionContext;
29+
import org.neo4j.driver.internal.spi.ConnectionPool;
30+
import org.neo4j.driver.internal.util.Futures;
31+
32+
import static java.util.concurrent.CompletableFuture.completedFuture;
33+
34+
public class RoutingTableHandlerImpl implements RoutingTableHandler
35+
{
36+
private final RoutingTable routingTable;
37+
private final DatabaseName databaseName;
38+
private final RoutingTableRegistry routingTableRegistry;
39+
private volatile CompletableFuture<RoutingTable> refreshRoutingTableFuture;
40+
private final ConnectionPool connectionPool;
41+
private final Rediscovery rediscovery;
42+
private final Logger log;
43+
private final long routingTablePurgeDelayMs;
44+
45+
public RoutingTableHandlerImpl( RoutingTable routingTable, Rediscovery rediscovery, ConnectionPool connectionPool, RoutingTableRegistry routingTableRegistry,
46+
Logger log, long routingTablePurgeDelayMs )
47+
{
48+
this.routingTable = routingTable;
49+
this.databaseName = routingTable.database();
50+
this.rediscovery = rediscovery;
51+
this.connectionPool = connectionPool;
52+
this.routingTableRegistry = routingTableRegistry;
53+
this.log = log;
54+
this.routingTablePurgeDelayMs = routingTablePurgeDelayMs;
55+
}
56+
57+
@Override
58+
public void onConnectionFailure( BoltServerAddress address )
59+
{
60+
// remove server from the routing table, to prevent concurrent threads from making connections to this address
61+
routingTable.forget( address );
62+
}
63+
64+
@Override
65+
public void onWriteFailure( BoltServerAddress address )
66+
{
67+
routingTable.forgetWriter( address );
68+
}
69+
70+
public synchronized CompletionStage<RoutingTable> ensureRoutingTable( ConnectionContext context )
71+
{
72+
if ( refreshRoutingTableFuture != null )
73+
{
74+
// refresh is already happening concurrently, just use it's result
75+
return refreshRoutingTableFuture;
76+
}
77+
else if ( routingTable.isStaleFor( context.mode() ) )
78+
{
79+
// existing routing table is not fresh and should be updated
80+
log.debug( "Routing table for database '%s' is stale. %s", databaseName.description(), routingTable );
81+
82+
CompletableFuture<RoutingTable> resultFuture = new CompletableFuture<>();
83+
refreshRoutingTableFuture = resultFuture;
84+
85+
rediscovery.lookupClusterComposition( routingTable, connectionPool, context.rediscoveryBookmark() )
86+
.whenComplete( ( composition, completionError ) ->
87+
{
88+
Throwable error = Futures.completionExceptionCause( completionError );
89+
if ( error != null )
90+
{
91+
clusterCompositionLookupFailed( error );
92+
}
93+
else
94+
{
95+
freshClusterCompositionFetched( composition );
96+
}
97+
} );
98+
99+
return resultFuture;
100+
}
101+
else
102+
{
103+
// existing routing table is fresh, use it
104+
return completedFuture( routingTable );
105+
}
106+
}
107+
108+
private synchronized void freshClusterCompositionFetched( ClusterComposition composition )
109+
{
110+
try
111+
{
112+
routingTable.update( composition );
113+
routingTableRegistry.removeAged();
114+
connectionPool.retainAll( routingTableRegistry.allServers() );
115+
116+
log.debug( "Updated routing table for database '%s'. %s", databaseName.description(), routingTable );
117+
118+
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
119+
refreshRoutingTableFuture = null;
120+
routingTableFuture.complete( routingTable );
121+
}
122+
catch ( Throwable error )
123+
{
124+
clusterCompositionLookupFailed( error );
125+
}
126+
}
127+
128+
private synchronized void clusterCompositionLookupFailed( Throwable error )
129+
{
130+
log.error( String.format( "Failed to update routing table for database '%s'. Current routing table: %s.", databaseName.description(), routingTable ), error );
131+
routingTableRegistry.remove( databaseName );
132+
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
133+
refreshRoutingTableFuture = null;
134+
routingTableFuture.completeExceptionally( error );
135+
}
136+
137+
// This method cannot be synchronized as it will be visited by all routing table handler's threads concurrently
138+
@Override
139+
public Set<BoltServerAddress> servers()
140+
{
141+
return routingTable.servers();
142+
}
143+
144+
// This method cannot be synchronized as it will be visited by all routing table handler's threads concurrently
145+
@Override
146+
public boolean isRoutingTableAged()
147+
{
148+
return refreshRoutingTableFuture == null && routingTable.hasBeenStaleFor( routingTablePurgeDelayMs );
149+
}
150+
151+
public RoutingTable routingTable()
152+
{
153+
return routingTable;
154+
}
155+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@
3232
public interface RoutingTableRegistry
3333
{
3434
/**
35-
* Fresh the routing table for the database with given access mode.
35+
* Ensures the routing table for the database with given access mode.
3636
* For server version lower than 4.0, the database name will be ignored while refreshing routing table.
3737
* @return The future of a new routing table handler.
3838
*/
39-
CompletionStage<RoutingTableHandler> refreshRoutingTable( ConnectionContext context );
39+
CompletionStage<RoutingTableHandler> ensureRoutingTable( ConnectionContext context );
4040

4141
/**
4242
* @return all servers in the registry

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTableRegistryImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ public RoutingTableRegistryImpl( ConnectionPool connectionPool, Rediscovery redi
5050
}
5151

5252
@Override
53-
public CompletionStage<RoutingTableHandler> refreshRoutingTable( ConnectionContext context )
53+
public CompletionStage<RoutingTableHandler> ensureRoutingTable( ConnectionContext context )
5454
{
5555
RoutingTableHandler handler = getOrCreate( context.databaseName() );
56-
return handler.refreshRoutingTable( context ).thenApply( ignored -> handler );
56+
return handler.ensureRoutingTable( context ).thenApply( ignored -> handler );
5757
}
5858

5959
@Override
@@ -124,7 +124,7 @@ static class RoutingTableHandlerFactory
124124
RoutingTableHandler newInstance( DatabaseName databaseName, RoutingTableRegistry allTables )
125125
{
126126
ClusterRoutingTable routingTable = new ClusterRoutingTable( databaseName, clock );
127-
return new RoutingTableHandler( routingTable, rediscovery, connectionPool, allTables, log, routingTablePurgeDelayMs );
127+
return new RoutingTableHandlerImpl( routingTable, rediscovery, connectionPool, allTables, log, routingTablePurgeDelayMs );
128128
}
129129
}
130130
}

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,15 @@ private LoadBalancer( ConnectionPool connectionPool, Rediscovery rediscovery, Ro
9696
@Override
9797
public CompletionStage<Connection> acquireConnection( ConnectionContext context )
9898
{
99-
return routingTables.refreshRoutingTable( context )
99+
return routingTables.ensureRoutingTable( context )
100100
.thenCompose( handler -> acquire( context.mode(), handler.routingTable() )
101101
.thenApply( connection -> new RoutingConnection( connection, context.databaseName(), context.mode(), handler ) ) );
102102
}
103103

104104
@Override
105105
public CompletionStage<Void> verifyConnectivity()
106106
{
107-
return this.supportsMultiDb().thenCompose( supports -> routingTables.refreshRoutingTable( simple( supports ) ) ).handle( ( ignored, error ) -> {
107+
return this.supportsMultiDb().thenCompose( supports -> routingTables.ensureRoutingTable( simple( supports ) ) ).handle( ( ignored, error ) -> {
108108
if ( error != null )
109109
{
110110
Throwable cause = completionExceptionCause( error );

0 commit comments

Comments
 (0)