Skip to content

Commit 8ed212a

Browse files
committed
Relax connection termination policy in routing driver
Previously routing driver terminated all connections towards a particular address when one of active connections had a network error. Connections were also terminated when new routing table did not contain some address that was present in the previous routing table. Such behaviour might be problematic because it results in terminated queries. Network errors might have been temporary but always resulted in termination of active queries. This commit makes driver keep existing connections and terminate only idle ones on network error. Address will still be excluded from the routing table and subsequent queries will not be pointed on it. Corresponding connection pool is transferred into a "passive" state where it does not serve new connections and disposes returned connections immediately. Pool is transferred back to "active" state during next rediscovery only if corresponding address is present in the new routing table. Old behaviour still remains and can be activated using `-DpurgeOnError=true` system property. It exists as a fallback only for exceptional cases and will most likely be silently removed in future.
1 parent 0c60f63 commit 8ed212a

19 files changed

+805
-100
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public class ClusterRoutingTable implements RoutingTable
4343
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
4444
{
4545
this( clock );
46-
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>() );
46+
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>(),
47+
new HashSet<BoltServerAddress>() );
4748
}
4849

4950
private ClusterRoutingTable( Clock clock )
@@ -66,14 +67,16 @@ public boolean isStaleFor( AccessMode mode )
6667
}
6768

6869
@Override
69-
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
70+
public synchronized RoutingTableChange update( ClusterComposition cluster )
7071
{
7172
expirationTimeout = cluster.expirationTimestamp();
73+
// todo: what if server is added as reader and removed as writer? we should not treat it as removed
74+
Set<BoltServerAddress> added = new HashSet<>();
7275
Set<BoltServerAddress> removed = new HashSet<>();
73-
readers.update( cluster.readers(), removed );
74-
writers.update( cluster.writers(), removed );
75-
routers.update( cluster.routers(), removed );
76-
return removed;
76+
readers.update( cluster.readers(), added, removed );
77+
writers.update( cluster.writers(), added, removed );
78+
routers.update( cluster.routers(), added, removed );
79+
return new RoutingTableChange( added, removed );
7780
}
7881

7982
@Override

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.Set;
22-
2321
import org.neo4j.driver.internal.RoutingErrorHandler;
2422
import org.neo4j.driver.internal.net.BoltServerAddress;
2523
import org.neo4j.driver.internal.spi.ConnectionPool;
@@ -35,6 +33,7 @@
3533
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable
3634
{
3735
private static final String LOAD_BALANCER_LOG_NAME = "LoadBalancer";
36+
private static final boolean PURGE_ON_ERROR = Boolean.getBoolean( "purgeOnError" );
3837

3938
private final ConnectionPool connections;
4039
private final RoutingTable routingTable;
@@ -113,8 +112,14 @@ private synchronized void forget( BoltServerAddress address )
113112
{
114113
// First remove from the load balancer, to prevent concurrent threads from making connections to them.
115114
routingTable.forget( address );
116-
// drop all current connections to the address
117-
connections.purge( address );
115+
if ( PURGE_ON_ERROR )
116+
{
117+
connections.purge( address );
118+
}
119+
else
120+
{
121+
connections.passivate( address );
122+
}
118123
}
119124

120125
synchronized void ensureRouting( AccessMode mode )
@@ -131,16 +136,35 @@ synchronized void refreshRoutingTable()
131136

132137
// get a new routing table
133138
ClusterComposition cluster = rediscovery.lookupClusterComposition( routingTable, connections );
134-
Set<BoltServerAddress> removed = routingTable.update( cluster );
135-
// purge connections to removed addresses
136-
for ( BoltServerAddress address : removed )
137-
{
138-
connections.purge( address );
139-
}
139+
RoutingTableChange routingTableChange = routingTable.update( cluster );
140+
updateConnectionPool( routingTableChange );
140141

141142
log.info( "Refreshed routing information. %s", routingTable );
142143
}
143144

145+
private void updateConnectionPool( RoutingTableChange routingTableChange )
146+
{
147+
if ( PURGE_ON_ERROR )
148+
{
149+
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
150+
{
151+
connections.purge( removedAddress );
152+
}
153+
}
154+
else
155+
{
156+
for ( BoltServerAddress addedAddress : routingTableChange.added() )
157+
{
158+
connections.activate( addedAddress );
159+
}
160+
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
161+
{
162+
connections.passivate( removedAddress );
163+
}
164+
connections.compact();
165+
}
166+
}
167+
144168
private RoundRobinAddressSet addressSetFor( AccessMode mode )
145169
{
146170
switch ( mode )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.Arrays;
22+
import java.util.Collections;
2223
import java.util.Set;
2324
import java.util.concurrent.atomic.AtomicInteger;
2425

@@ -55,17 +56,20 @@ int next( int divisor )
5556
return index % divisor;
5657
}
5758

58-
public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
59+
public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> added,
60+
Set<BoltServerAddress> removed )
5961
{
6062
BoltServerAddress[] prev = this.addresses;
6163
if ( addresses.isEmpty() )
6264
{
6365
this.addresses = NONE;
66+
Collections.addAll( removed, prev );
6467
return;
6568
}
6669
if ( prev.length == 0 )
6770
{
6871
this.addresses = addresses.toArray( NONE );
72+
Collections.addAll( added, this.addresses );
6973
return;
7074
}
7175
BoltServerAddress[] copy = null;
@@ -101,6 +105,7 @@ public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServe
101105
for ( BoltServerAddress address : addresses )
102106
{
103107
copy[j++] = address;
108+
added.add( address );
104109
}
105110
this.addresses = copy;
106111
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.Set;
22-
2321
import org.neo4j.driver.internal.net.BoltServerAddress;
2422
import org.neo4j.driver.v1.AccessMode;
2523

2624
public interface RoutingTable
2725
{
2826
boolean isStaleFor( AccessMode mode );
2927

30-
Set<BoltServerAddress> update( ClusterComposition cluster );
28+
RoutingTableChange update( ClusterComposition cluster );
3129

3230
void forget( BoltServerAddress address );
3331

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.util.Collections;
22+
import java.util.Set;
23+
24+
import org.neo4j.driver.internal.net.BoltServerAddress;
25+
26+
import static java.util.Collections.unmodifiableSet;
27+
28+
public class RoutingTableChange
29+
{
30+
public static final RoutingTableChange EMPTY = new RoutingTableChange(
31+
Collections.<BoltServerAddress>emptySet(), Collections.<BoltServerAddress>emptySet() );
32+
33+
private final Set<BoltServerAddress> added;
34+
private final Set<BoltServerAddress> removed;
35+
36+
public RoutingTableChange( Set<BoltServerAddress> added, Set<BoltServerAddress> removed )
37+
{
38+
this.added = added;
39+
this.removed = removed;
40+
}
41+
42+
public Set<BoltServerAddress> added()
43+
{
44+
return unmodifiableSet( added );
45+
}
46+
47+
public Set<BoltServerAddress> removed()
48+
{
49+
return unmodifiableSet( removed );
50+
}
51+
52+
@Override
53+
public String toString()
54+
{
55+
return "RoutingTableChange{" +
56+
"added=" + added +
57+
", removed=" + removed +
58+
'}';
59+
}
60+
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/BlockingPooledConnectionQueue.java

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,7 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.ArrayList;
2221
import java.util.Collections;
23-
import java.util.List;
2422
import java.util.Set;
2523
import java.util.concurrent.BlockingQueue;
2624
import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +44,7 @@ public class BlockingPooledConnectionQueue
4644
private final BlockingQueue<PooledConnection> queue;
4745
private final Logger logger;
4846

47+
private final AtomicBoolean isPassive = new AtomicBoolean( false );
4948
private final AtomicBoolean isTerminating = new AtomicBoolean( false );
5049

5150
/** Keeps track of acquired connections */
@@ -69,15 +68,13 @@ public boolean offer( PooledConnection pooledConnection )
6968
acquiredConnections.remove( pooledConnection );
7069
boolean offer = queue.offer( pooledConnection );
7170
// not added back to the queue, dispose of the connection
72-
if (!offer) {
73-
pooledConnection.dispose();
71+
if ( !offer )
72+
{
73+
disposeSafely( pooledConnection );
7474
}
75-
if (isTerminating.get()) {
76-
PooledConnection connection = queue.poll();
77-
if (connection != null)
78-
{
79-
connection.dispose();
80-
}
75+
if ( isPassive.get() || isTerminating.get() )
76+
{
77+
terminateIdleConnections();
8178
}
8279
return offer;
8380
}
@@ -89,20 +86,29 @@ public boolean offer( PooledConnection pooledConnection )
8986
*/
9087
public PooledConnection acquire( Supplier<PooledConnection> supplier )
9188
{
92-
9389
PooledConnection connection = queue.poll();
9490
if ( connection == null )
9591
{
9692
connection = supplier.get();
9793
}
9894
acquiredConnections.add( connection );
9995

100-
if (isTerminating.get()) {
96+
if ( isPassive.get() || isTerminating.get() )
97+
{
10198
acquiredConnections.remove( connection );
102-
connection.dispose();
103-
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
99+
disposeSafely( connection );
100+
throw new IllegalStateException( "Pool is " + (isPassive.get() ? "passivated" : "terminated") + ", " +
101+
"new connections can't be acquired" );
102+
}
103+
else
104+
{
105+
return connection;
104106
}
105-
return connection;
107+
}
108+
109+
public int idleConnections()
110+
{
111+
return queue.size();
106112
}
107113

108114
public int activeConnections()
@@ -116,19 +122,27 @@ void disposeBroken( PooledConnection connection )
116122
disposeSafely( connection );
117123
}
118124

119-
public boolean isEmpty()
125+
public boolean contains( PooledConnection pooledConnection )
120126
{
121-
return queue.isEmpty();
127+
return queue.contains( pooledConnection );
122128
}
123129

124-
public int size()
130+
public void activate()
125131
{
126-
return queue.size();
132+
isPassive.compareAndSet( true, false );
127133
}
128134

129-
public boolean contains( PooledConnection pooledConnection )
135+
public void passivate()
130136
{
131-
return queue.contains( pooledConnection );
137+
if ( isPassive.compareAndSet( false, true ) )
138+
{
139+
terminateIdleConnections();
140+
}
141+
}
142+
143+
public boolean isActive()
144+
{
145+
return !isPassive.get();
132146
}
133147

134148
/**
@@ -141,15 +155,25 @@ public void terminate()
141155
{
142156
if ( isTerminating.compareAndSet( false, true ) )
143157
{
144-
while ( !queue.isEmpty() )
145-
{
146-
PooledConnection idleConnection = queue.poll();
147-
disposeSafely( idleConnection );
148-
}
149-
for ( PooledConnection acquiredConnection : acquiredConnections )
150-
{
151-
disposeSafely( acquiredConnection );
152-
}
158+
terminateIdleConnections();
159+
terminateAcquiredConnections();
160+
}
161+
}
162+
163+
private void terminateIdleConnections()
164+
{
165+
while ( !queue.isEmpty() )
166+
{
167+
PooledConnection idleConnection = queue.poll();
168+
disposeSafely( idleConnection );
169+
}
170+
}
171+
172+
private void terminateAcquiredConnections()
173+
{
174+
for ( PooledConnection acquiredConnection : acquiredConnections )
175+
{
176+
disposeSafely( acquiredConnection );
153177
}
154178
}
155179

0 commit comments

Comments
 (0)