Skip to content

Commit 8cc7b03

Browse files
authored
Merge pull request #424 from lutovich/1.4-no-purge
Relax connection termination policy in routing driver
2 parents 0c60f63 + 1839e55 commit 8cc7b03

19 files changed

+773
-176
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class ClusterRoutingTable implements RoutingTable
4343
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
4444
{
4545
this( clock );
46-
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>() );
46+
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ));
4747
}
4848

4949
private ClusterRoutingTable( Clock clock )
@@ -65,15 +65,31 @@ public boolean isStaleFor( AccessMode mode )
6565
mode == AccessMode.WRITE && writers.size() == 0;
6666
}
6767

68+
private Set<BoltServerAddress> servers()
69+
{
70+
Set<BoltServerAddress> servers = new HashSet<>();
71+
servers.addAll( readers.servers() );
72+
servers.addAll( writers.servers() );
73+
servers.addAll( routers.servers() );
74+
return servers;
75+
}
76+
6877
@Override
69-
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
78+
public synchronized RoutingTableChange update( ClusterComposition cluster )
7079
{
7180
expirationTimeout = cluster.expirationTimestamp();
72-
Set<BoltServerAddress> removed = new HashSet<>();
73-
readers.update( cluster.readers(), removed );
74-
writers.update( cluster.writers(), removed );
75-
routers.update( cluster.routers(), removed );
76-
return removed;
81+
Set<BoltServerAddress> previousServers = servers();
82+
83+
readers.update( cluster.readers() );
84+
writers.update( cluster.writers() );
85+
routers.update( cluster.routers() );
86+
Set<BoltServerAddress> currentServers = servers();
87+
88+
Set<BoltServerAddress> added = new HashSet<>( currentServers );
89+
Set<BoltServerAddress> removed = new HashSet<>( previousServers );
90+
added.removeAll( previousServers );
91+
removed.removeAll( currentServers );
92+
return new RoutingTableChange( added, removed );
7793
}
7894

7995
@Override

driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.Set;
22-
2321
import org.neo4j.driver.internal.RoutingErrorHandler;
2422
import org.neo4j.driver.internal.net.BoltServerAddress;
2523
import org.neo4j.driver.internal.spi.ConnectionPool;
@@ -35,6 +33,7 @@
3533
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable
3634
{
3735
private static final String LOAD_BALANCER_LOG_NAME = "LoadBalancer";
36+
private static final boolean PURGE_ON_ERROR = Boolean.getBoolean( "purgeOnError" );
3837

3938
private final ConnectionPool connections;
4039
private final RoutingTable routingTable;
@@ -113,8 +112,14 @@ private synchronized void forget( BoltServerAddress address )
113112
{
114113
// First remove from the load balancer, to prevent concurrent threads from making connections to them.
115114
routingTable.forget( address );
116-
// drop all current connections to the address
117-
connections.purge( address );
115+
if ( PURGE_ON_ERROR )
116+
{
117+
connections.purge( address );
118+
}
119+
else
120+
{
121+
connections.deactivate( address );
122+
}
118123
}
119124

120125
synchronized void ensureRouting( AccessMode mode )
@@ -131,16 +136,35 @@ synchronized void refreshRoutingTable()
131136

132137
// get a new routing table
133138
ClusterComposition cluster = rediscovery.lookupClusterComposition( routingTable, connections );
134-
Set<BoltServerAddress> removed = routingTable.update( cluster );
135-
// purge connections to removed addresses
136-
for ( BoltServerAddress address : removed )
137-
{
138-
connections.purge( address );
139-
}
139+
RoutingTableChange routingTableChange = routingTable.update( cluster );
140+
updateConnectionPool( routingTableChange );
140141

141142
log.info( "Refreshed routing information. %s", routingTable );
142143
}
143144

145+
private void updateConnectionPool( RoutingTableChange routingTableChange )
146+
{
147+
if ( PURGE_ON_ERROR )
148+
{
149+
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
150+
{
151+
connections.purge( removedAddress );
152+
}
153+
}
154+
else
155+
{
156+
for ( BoltServerAddress addedAddress : routingTableChange.added() )
157+
{
158+
connections.activate( addedAddress );
159+
}
160+
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
161+
{
162+
connections.deactivate( removedAddress );
163+
}
164+
connections.compact();
165+
}
166+
}
167+
144168
private RoundRobinAddressSet addressSetFor( AccessMode mode )
145169
{
146170
switch ( mode )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.Arrays;
22+
import java.util.HashSet;
2223
import java.util.Set;
2324
import java.util.concurrent.atomic.AtomicInteger;
2425

@@ -45,6 +46,11 @@ public BoltServerAddress next()
4546
return addresses[next( addresses.length )];
4647
}
4748

49+
public Set<BoltServerAddress> servers()
50+
{
51+
return new HashSet<>( Arrays.asList( addresses ) );
52+
}
53+
4854
int next( int divisor )
4955
{
5056
int index = offset.getAndIncrement();
@@ -55,54 +61,9 @@ int next( int divisor )
5561
return index % divisor;
5662
}
5763

58-
public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
64+
public synchronized void update( Set<BoltServerAddress> addresses )
5965
{
60-
BoltServerAddress[] prev = this.addresses;
61-
if ( addresses.isEmpty() )
62-
{
63-
this.addresses = NONE;
64-
return;
65-
}
66-
if ( prev.length == 0 )
67-
{
68-
this.addresses = addresses.toArray( NONE );
69-
return;
70-
}
71-
BoltServerAddress[] copy = null;
72-
if ( addresses.size() != prev.length )
73-
{
74-
copy = new BoltServerAddress[addresses.size()];
75-
}
76-
int j = 0;
77-
for ( int i = 0; i < prev.length; i++ )
78-
{
79-
if ( addresses.remove( prev[i] ) )
80-
{
81-
if ( copy != null )
82-
{
83-
copy[j++] = prev[i];
84-
}
85-
}
86-
else
87-
{
88-
removed.add( prev[i] );
89-
if ( copy == null )
90-
{
91-
copy = new BoltServerAddress[prev.length];
92-
System.arraycopy( prev, 0, copy, 0, i );
93-
j = i;
94-
}
95-
}
96-
}
97-
if ( copy == null )
98-
{
99-
return;
100-
}
101-
for ( BoltServerAddress address : addresses )
102-
{
103-
copy[j++] = address;
104-
}
105-
this.addresses = copy;
66+
this.addresses = addresses.toArray( NONE );
10667
}
10768

10869
public synchronized void remove( BoltServerAddress address )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.Set;
22-
2321
import org.neo4j.driver.internal.net.BoltServerAddress;
2422
import org.neo4j.driver.v1.AccessMode;
2523

2624
public interface RoutingTable
2725
{
2826
boolean isStaleFor( AccessMode mode );
2927

30-
Set<BoltServerAddress> update( ClusterComposition cluster );
28+
RoutingTableChange update( ClusterComposition cluster );
3129

3230
void forget( BoltServerAddress address );
3331

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.util.Collections;
22+
import java.util.Set;
23+
24+
import org.neo4j.driver.internal.net.BoltServerAddress;
25+
26+
import static java.util.Collections.unmodifiableSet;
27+
28+
public class RoutingTableChange
29+
{
30+
public static final RoutingTableChange EMPTY = new RoutingTableChange(
31+
Collections.<BoltServerAddress>emptySet(), Collections.<BoltServerAddress>emptySet() );
32+
33+
private final Set<BoltServerAddress> added;
34+
private final Set<BoltServerAddress> removed;
35+
36+
public RoutingTableChange( Set<BoltServerAddress> added, Set<BoltServerAddress> removed )
37+
{
38+
this.added = added;
39+
this.removed = removed;
40+
}
41+
42+
public Set<BoltServerAddress> added()
43+
{
44+
return unmodifiableSet( added );
45+
}
46+
47+
public Set<BoltServerAddress> removed()
48+
{
49+
return unmodifiableSet( removed );
50+
}
51+
52+
@Override
53+
public String toString()
54+
{
55+
return "RoutingTableChange{" +
56+
"added=" + added +
57+
", removed=" + removed +
58+
'}';
59+
}
60+
}

0 commit comments

Comments
 (0)