Skip to content

Commit 0c57215

Browse files
authored
Merge pull request #436 from lutovich/1.5-no-purge
Relax connection termination policy in routing driver
2 parents e705db1 + bf6b57b commit 0c57215

20 files changed

+649
-298
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/NettyConnection.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
import io.netty.channel.Channel;
2222
import io.netty.channel.pool.ChannelPool;
23-
import io.netty.util.concurrent.Promise;
2423

2524
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
2626
import java.util.concurrent.CompletionStage;
2727
import java.util.concurrent.atomic.AtomicBoolean;
2828

@@ -39,16 +39,14 @@
3939
import org.neo4j.driver.internal.util.ServerVersion;
4040
import org.neo4j.driver.v1.Value;
4141

42-
import static java.util.concurrent.CompletableFuture.completedFuture;
43-
import static org.neo4j.driver.internal.util.Futures.asCompletionStage;
44-
4542
public class NettyConnection implements Connection
4643
{
4744
private final Channel channel;
4845
private final InboundMessageDispatcher messageDispatcher;
4946
private final BoltServerAddress serverAddress;
5047
private final ServerVersion serverVersion;
5148
private final ChannelPool channelPool;
49+
private final CompletableFuture<Void> releaseFuture;
5250
private final Clock clock;
5351

5452
private final AtomicBoolean open = new AtomicBoolean( true );
@@ -61,6 +59,7 @@ public NettyConnection( Channel channel, ChannelPool channelPool, Clock clock )
6159
this.serverAddress = ChannelAttributes.serverAddress( channel );
6260
this.serverVersion = ChannelAttributes.serverVersion( channel );
6361
this.channelPool = channelPool;
62+
this.releaseFuture = new CompletableFuture<>();
6463
this.clock = clock;
6564
}
6665

@@ -111,14 +110,9 @@ public CompletionStage<Void> release()
111110
{
112111
if ( open.compareAndSet( true, false ) )
113112
{
114-
Promise<Void> releasePromise = channel.eventLoop().newPromise();
115-
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releasePromise ) );
116-
return asCompletionStage( releasePromise );
117-
}
118-
else
119-
{
120-
return completedFuture( null );
113+
reset( new ResetResponseHandler( channel, channelPool, messageDispatcher, clock, releaseFuture ) );
121114
}
115+
return releaseFuture;
122116
}
123117

124118
@Override

driver/src/main/java/org/neo4j/driver/internal/async/pool/ConnectionPoolImpl.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.netty.util.concurrent.Future;
2626

2727
import java.util.Map;
28+
import java.util.Set;
2829
import java.util.concurrent.CompletionException;
2930
import java.util.concurrent.CompletionStage;
3031
import java.util.concurrent.ConcurrentHashMap;
@@ -58,10 +59,16 @@ public class ConnectionPoolImpl implements ConnectionPool
5859

5960
public ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, PoolSettings settings,
6061
Logging logging, Clock clock )
62+
{
63+
this( connector, bootstrap, new ActiveChannelTracker( logging ), settings, logging, clock );
64+
}
65+
66+
ConnectionPoolImpl( ChannelConnector connector, Bootstrap bootstrap, ActiveChannelTracker activeChannelTracker,
67+
PoolSettings settings, Logging logging, Clock clock )
6168
{
6269
this.connector = connector;
6370
this.bootstrap = bootstrap;
64-
this.activeChannelTracker = new ActiveChannelTracker( logging );
71+
this.activeChannelTracker = activeChannelTracker;
6572
this.channelHealthChecker = new NettyChannelHealthChecker( settings, clock, logging );
6673
this.settings = settings;
6774
this.clock = clock;
@@ -86,27 +93,30 @@ public CompletionStage<Connection> acquire( BoltServerAddress address )
8693
}
8794

8895
@Override
89-
public void purge( BoltServerAddress address )
96+
public void retainAll( Set<BoltServerAddress> addressesToRetain )
9097
{
91-
log.info( "Purging connections towards %s", address );
92-
93-
// purge active connections
94-
activeChannelTracker.purge( address );
95-
96-
// purge idle connections in the pool and pool itself
97-
ChannelPool pool = pools.remove( address );
98-
if ( pool != null )
98+
for ( BoltServerAddress address : pools.keySet() )
9999
{
100-
pool.close();
100+
if ( !addressesToRetain.contains( address ) )
101+
{
102+
int activeChannels = activeChannelTracker.activeChannelCount( address );
103+
if ( activeChannels == 0 )
104+
{
105+
// address is not present in updated routing table and has no active connections
106+
// it's now safe to terminate corresponding connection pool and forget about it
107+
108+
ChannelPool pool = pools.remove( address );
109+
if ( pool != null )
110+
{
111+
log.info( "Closing connection pool towards %s, it has no active connections " +
112+
"and is not in the routing table", address );
113+
pool.close();
114+
}
115+
}
116+
}
101117
}
102118
}
103119

104-
@Override
105-
public boolean hasAddress( BoltServerAddress address )
106-
{
107-
return pools.containsKey( address );
108-
}
109-
110120
@Override
111121
public int activeConnections( BoltServerAddress address )
112122
{
@@ -157,7 +167,7 @@ private ChannelPool getOrCreatePool( BoltServerAddress address )
157167
return pool;
158168
}
159169

160-
private NettyChannelPool newPool( BoltServerAddress address )
170+
ChannelPool newPool( BoltServerAddress address )
161171
{
162172
return new NettyChannelPool( address, connector, bootstrap, activeChannelTracker, channelHealthChecker,
163173
settings.connectionAcquisitionTimeout(), settings.maxConnectionPoolSize() );

driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java

Lines changed: 2 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -39,54 +39,9 @@ public int size()
3939
return addresses.length;
4040
}
4141

42-
public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
42+
public synchronized void update( Set<BoltServerAddress> addresses )
4343
{
44-
BoltServerAddress[] prev = this.addresses;
45-
if ( addresses.isEmpty() )
46-
{
47-
this.addresses = NONE;
48-
return;
49-
}
50-
if ( prev.length == 0 )
51-
{
52-
this.addresses = addresses.toArray( NONE );
53-
return;
54-
}
55-
BoltServerAddress[] copy = null;
56-
if ( addresses.size() != prev.length )
57-
{
58-
copy = new BoltServerAddress[addresses.size()];
59-
}
60-
int j = 0;
61-
for ( int i = 0; i < prev.length; i++ )
62-
{
63-
if ( addresses.remove( prev[i] ) )
64-
{
65-
if ( copy != null )
66-
{
67-
copy[j++] = prev[i];
68-
}
69-
}
70-
else
71-
{
72-
removed.add( prev[i] );
73-
if ( copy == null )
74-
{
75-
copy = new BoltServerAddress[prev.length];
76-
System.arraycopy( prev, 0, copy, 0, i );
77-
j = i;
78-
}
79-
}
80-
}
81-
if ( copy == null )
82-
{
83-
return;
84-
}
85-
for ( BoltServerAddress address : addresses )
86-
{
87-
copy[j++] = address;
88-
}
89-
this.addresses = copy;
44+
this.addresses = addresses.toArray( NONE );
9045
}
9146

9247
public synchronized void remove( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.neo4j.driver.internal.cluster;
2121

22+
import java.util.Collections;
2223
import java.util.HashSet;
2324
import java.util.LinkedHashSet;
2425
import java.util.Set;
@@ -43,7 +44,7 @@ public class ClusterRoutingTable implements RoutingTable
4344
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
4445
{
4546
this( clock );
46-
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>() );
47+
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ) );
4748
}
4849

4950
private ClusterRoutingTable( Clock clock )
@@ -66,14 +67,12 @@ public boolean isStaleFor( AccessMode mode )
6667
}
6768

6869
@Override
69-
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
70+
public synchronized void update( ClusterComposition cluster )
7071
{
7172
expirationTimeout = cluster.expirationTimestamp();
72-
Set<BoltServerAddress> removed = new HashSet<>();
73-
readers.update( cluster.readers(), removed );
74-
writers.update( cluster.writers(), removed );
75-
routers.update( cluster.routers(), removed );
76-
return removed;
73+
readers.update( cluster.readers() );
74+
writers.update( cluster.writers() );
75+
routers.update( cluster.routers() );
7776
}
7877

7978
@Override
@@ -102,6 +101,16 @@ public AddressSet routers()
102101
return routers;
103102
}
104103

104+
@Override
105+
public Set<BoltServerAddress> servers()
106+
{
107+
Set<BoltServerAddress> servers = new HashSet<>();
108+
Collections.addAll( servers, readers.toArray() );
109+
Collections.addAll( servers, writers.toArray() );
110+
Collections.addAll( servers, routers.toArray() );
111+
return servers;
112+
}
113+
105114
@Override
106115
public void removeWriter( BoltServerAddress toRemove )
107116
{

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,18 +53,9 @@ public CompletionStage<RoutingProcedureResponse> run( CompletionStage<Connection
5353
return connectionStage.thenCompose( connection ->
5454
{
5555
Statement procedure = procedureStatement( connection.serverVersion() );
56-
return runProcedure( connection, procedure ).handle( ( records, error ) ->
57-
{
58-
Throwable cause = Futures.completionErrorCause( error );
59-
if ( cause != null )
60-
{
61-
return handleError( procedure, cause );
62-
}
63-
else
64-
{
65-
return new RoutingProcedureResponse( procedure, records );
66-
}
67-
} );
56+
return runProcedure( connection, procedure )
57+
.thenCompose( records -> releaseConnection( connection, records ) )
58+
.handle( ( records, error ) -> processProcedureResponse( procedure, records, error ) );
6859
} );
6960
}
7061

@@ -87,6 +78,30 @@ private Statement procedureStatement( ServerVersion serverVersion )
8778
}
8879
}
8980

81+
private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )
82+
{
83+
// It is not strictly required to release connection after routing procedure invocation because it'll
84+
// be released by the PULL_ALL response handler after result is fully fetched. Such release will happen
85+
// in background. However, releasing it early as part of whole chain makes it easier to reason about
86+
// rediscovery in stub server tests. Some of them assume connections to instances not present in new
87+
// routing table will be closed immediately.
88+
return connection.release().thenApply( ignore -> records );
89+
}
90+
91+
private RoutingProcedureResponse processProcedureResponse( Statement procedure, List<Record> records,
92+
Throwable error )
93+
{
94+
Throwable cause = Futures.completionErrorCause( error );
95+
if ( cause != null )
96+
{
97+
return handleError( procedure, cause );
98+
}
99+
else
100+
{
101+
return new RoutingProcedureResponse( procedure, records );
102+
}
103+
}
104+
90105
private RoutingProcedureResponse handleError( Statement procedure, Throwable error )
91106
{
92107
if ( error instanceof ClientException )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public interface RoutingTable
2727
{
2828
boolean isStaleFor( AccessMode mode );
2929

30-
Set<BoltServerAddress> update( ClusterComposition cluster );
30+
void update( ClusterComposition cluster );
3131

3232
void forget( BoltServerAddress address );
3333

@@ -37,5 +37,7 @@ public interface RoutingTable
3737

3838
AddressSet routers();
3939

40+
Set<BoltServerAddress> servers();
41+
4042
void removeWriter( BoltServerAddress toRemove );
4143
}

driver/src/main/java/org/neo4j/driver/internal/cluster/loadbalancing/LoadBalancer.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import io.netty.util.concurrent.EventExecutorGroup;
2222

23-
import java.util.Set;
2423
import java.util.concurrent.CompletableFuture;
2524
import java.util.concurrent.CompletionStage;
2625

@@ -125,10 +124,8 @@ public CompletionStage<Void> close()
125124

126125
private synchronized void forget( BoltServerAddress address )
127126
{
128-
// First remove from the load balancer, to prevent concurrent threads from making connections to them.
127+
// remove from the routing table, to prevent concurrent threads from making connections to this address
129128
routingTable.forget( address );
130-
// drop all current connections to the address
131-
connectionPool.purge( address );
132129
}
133130

134131
private synchronized CompletionStage<RoutingTable> freshRoutingTable( AccessMode mode )
@@ -171,18 +168,21 @@ else if ( routingTable.isStaleFor( mode ) )
171168

172169
private synchronized void freshClusterCompositionFetched( ClusterComposition composition )
173170
{
174-
Set<BoltServerAddress> removed = routingTable.update( composition );
175-
176-
for ( BoltServerAddress address : removed )
171+
try
177172
{
178-
connectionPool.purge( address );
179-
}
173+
routingTable.update( composition );
174+
connectionPool.retainAll( routingTable.servers() );
180175

181-
log.info( "Refreshed routing information. %s", routingTable );
176+
log.info( "Refreshed routing information. %s", routingTable );
182177

183-
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
184-
refreshRoutingTableFuture = null;
185-
routingTableFuture.complete( routingTable );
178+
CompletableFuture<RoutingTable> routingTableFuture = refreshRoutingTableFuture;
179+
refreshRoutingTableFuture = null;
180+
routingTableFuture.complete( routingTable );
181+
}
182+
catch ( Throwable error )
183+
{
184+
clusterCompositionLookupFailed( error );
185+
}
186186
}
187187

188188
private synchronized void clusterCompositionLookupFailed( Throwable error )

0 commit comments

Comments
 (0)