Skip to content

Commit fece718

Browse files
author
Zhen
committed
Terms:
* `ConnectionPools` which is a map from addresses to connection pools * `RoutingTable` which contains addresses for routers, writers, and readers For a routing driver, once an address is removed from the current `RoutingTable`, the address is no longer accessable. a.k.a. no new connection will be created in the corresponding connection pool. When updating routing table, we also need to signal the addresses in `ConnectionPools` to be active if they are newly added into the current routing table or passive if they have already been removed from the routing table. For the pools connected to addresses that have been removed, when a connection is free, the connection should be terminated rather than reused. When there is no connection in the pool, the pool could be safely removed from `ConnectionPools`. So the logic that need to be changed: * When a new `RoutingTable` is available, compute `added_addr = distinct_addr_in(new_routingTable) - distinct_addr_in(pre_routingTable)` `removed_addr = distinct_addr_in(pre_routingTable) - distinct_addr_in(new_routingTable)` * Mark all addresses in set `added_addr` in `ConnectionPools` to be `active` connection pools * Mark all addresses in set `removed_addr` in `ConnectionPools` to be `passive` connection pools * Remove `passive` connection pools if no connection is `inUse` (all connections are idle) * When returning a connection to a `passive` connection pool, terminate the connection directly, [and remove the connection pool if no connections is `InUse`]
1 parent 8ed212a commit fece718

File tree

5 files changed

+51
-169
lines changed

5 files changed

+51
-169
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ public class ClusterRoutingTable implements RoutingTable
4343
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
4444
{
4545
this( clock );
46-
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>(),
47-
new HashSet<BoltServerAddress>() );
46+
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ));
4847
}
4948

5049
private ClusterRoutingTable( Clock clock )
@@ -66,16 +65,29 @@ public boolean isStaleFor( AccessMode mode )
6665
mode == AccessMode.WRITE && writers.size() == 0;
6766
}
6867

68+
private Set<BoltServerAddress> servers()
69+
{
70+
Set<BoltServerAddress> servers = new HashSet<>();
71+
servers.addAll( readers.servers() );
72+
servers.addAll( writers.servers() );
73+
servers.addAll( routers.servers() );
74+
return servers;
75+
}
76+
6977
@Override
7078
public synchronized RoutingTableChange update( ClusterComposition cluster )
7179
{
7280
expirationTimeout = cluster.expirationTimestamp();
73-
// todo: what if server is added as reader and removed as writer? we should not treat it as removed
74-
Set<BoltServerAddress> added = new HashSet<>();
75-
Set<BoltServerAddress> removed = new HashSet<>();
76-
readers.update( cluster.readers(), added, removed );
77-
writers.update( cluster.writers(), added, removed );
78-
routers.update( cluster.routers(), added, removed );
81+
Set<BoltServerAddress> pre = servers();
82+
readers.update( cluster.readers() );
83+
writers.update( cluster.writers() );
84+
routers.update( cluster.routers() );
85+
Set<BoltServerAddress> cur = servers();
86+
87+
Set<BoltServerAddress> added = new HashSet<>( cur );
88+
Set<BoltServerAddress> removed = new HashSet<>( pre );
89+
added.removeAll( pre );
90+
removed.removeAll( cur );
7991
return new RoutingTableChange( added, removed );
8092
}
8193

driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java

Lines changed: 8 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
package org.neo4j.driver.internal.cluster;
2020

2121
import java.util.Arrays;
22-
import java.util.Collections;
22+
import java.util.HashSet;
2323
import java.util.Set;
2424
import java.util.concurrent.atomic.AtomicInteger;
2525

@@ -46,6 +46,11 @@ public BoltServerAddress next()
4646
return addresses[next( addresses.length )];
4747
}
4848

49+
public Set<BoltServerAddress> servers()
50+
{
51+
return new HashSet<>( Arrays.asList( addresses ) );
52+
}
53+
4954
int next( int divisor )
5055
{
5156
int index = offset.getAndIncrement();
@@ -56,58 +61,9 @@ int next( int divisor )
5661
return index % divisor;
5762
}
5863

59-
public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> added,
60-
Set<BoltServerAddress> removed )
64+
public synchronized void update( Set<BoltServerAddress> addresses )
6165
{
62-
BoltServerAddress[] prev = this.addresses;
63-
if ( addresses.isEmpty() )
64-
{
65-
this.addresses = NONE;
66-
Collections.addAll( removed, prev );
67-
return;
68-
}
69-
if ( prev.length == 0 )
70-
{
71-
this.addresses = addresses.toArray( NONE );
72-
Collections.addAll( added, this.addresses );
73-
return;
74-
}
75-
BoltServerAddress[] copy = null;
76-
if ( addresses.size() != prev.length )
77-
{
78-
copy = new BoltServerAddress[addresses.size()];
79-
}
80-
int j = 0;
81-
for ( int i = 0; i < prev.length; i++ )
82-
{
83-
if ( addresses.remove( prev[i] ) )
84-
{
85-
if ( copy != null )
86-
{
87-
copy[j++] = prev[i];
88-
}
89-
}
90-
else
91-
{
92-
removed.add( prev[i] );
93-
if ( copy == null )
94-
{
95-
copy = new BoltServerAddress[prev.length];
96-
System.arraycopy( prev, 0, copy, 0, i );
97-
j = i;
98-
}
99-
}
100-
}
101-
if ( copy == null )
102-
{
103-
return;
104-
}
105-
for ( BoltServerAddress address : addresses )
106-
{
107-
copy[j++] = address;
108-
added.add( address );
109-
}
110-
this.addresses = copy;
66+
this.addresses = addresses.toArray( NONE );
11167
}
11268

11369
public synchronized void remove( BoltServerAddress address )

driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,4 +215,20 @@ public void shouldReturnCorrectChangeWhenUpdated()
215215
assertEquals( 2, change.removed().size() );
216216
assertThat( change.removed(), containsInAnyOrder( A, D ) );
217217
}
218+
219+
@Test
220+
public void shouldNotRemoveServerIfPreWriterNowReader()
221+
{
222+
ClusterRoutingTable routingTable = new ClusterRoutingTable( new FakeClock() );
223+
routingTable.update( createClusterComposition( singletonList( A ), singletonList( B ), singletonList( C ) ) );
224+
225+
ClusterComposition newComposition =
226+
createClusterComposition( singletonList( D ), singletonList( E ), singletonList( B ) );
227+
RoutingTableChange change = routingTable.update( newComposition );
228+
229+
assertEquals( 2, change.added().size() );
230+
assertThat( change.added(), containsInAnyOrder( D, E ) );
231+
assertEquals( 2, change.removed().size() );
232+
assertThat( change.removed(), containsInAnyOrder( A, C ) );
233+
}
218234
}

driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode )
341341
when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( RoutingTableChange.EMPTY );
342342

343343
RoundRobinAddressSet addresses = new RoundRobinAddressSet();
344-
addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) ), new HashSet<BoltServerAddress>(),
345-
new HashSet<BoltServerAddress>() );
344+
addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) ));
346345
when( routingTable.readers() ).thenReturn( addresses );
347346
when( routingTable.writers() ).thenReturn( addresses );
348347

driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java

Lines changed: 6 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@
2121
import org.junit.Test;
2222

2323
import java.util.ArrayList;
24-
import java.util.Collections;
2524
import java.util.HashSet;
2625
import java.util.List;
2726
import java.util.Set;
2827

2928
import org.neo4j.driver.internal.net.BoltServerAddress;
3029

3130
import static java.util.Arrays.asList;
32-
import static java.util.Collections.singleton;
3331
import static org.junit.Assert.assertEquals;
3432
import static org.junit.Assert.assertNotEquals;
3533
import static org.junit.Assert.assertNull;
@@ -57,7 +55,7 @@ public void shouldReturnRoundRobin() throws Exception
5755
new BoltServerAddress( "two" ),
5856
new BoltServerAddress( "tre" ) ) );
5957

60-
set.update( addresses, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
58+
set.update( addresses );
6159

6260
// when
6361
BoltServerAddress a = set.next();
@@ -85,7 +83,7 @@ public void shouldPreserveOrderWhenAdding() throws Exception
8583
new BoltServerAddress( "two" ),
8684
new BoltServerAddress( "tre" ) ) );
8785
RoundRobinAddressSet set = new RoundRobinAddressSet();
88-
set.update( servers, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
86+
set.update( servers );
8987

9088
List<BoltServerAddress> order = new ArrayList<>();
9189
for ( int i = 3 * 4 + 1; i-- > 0; )
@@ -100,7 +98,7 @@ public void shouldPreserveOrderWhenAdding() throws Exception
10098

10199
// when
102100
servers.add( new BoltServerAddress( "fyr" ) );
103-
set.update( servers, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
101+
set.update( servers );
104102

105103
// then
106104
assertEquals( order.get( 1 ), set.next() );
@@ -126,7 +124,7 @@ public void shouldPreserveOrderWhenRemoving() throws Exception
126124
new BoltServerAddress( "two" ),
127125
new BoltServerAddress( "tre" ) ) );
128126
RoundRobinAddressSet set = new RoundRobinAddressSet();
129-
set.update( servers, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
127+
set.update( servers );
130128

131129
List<BoltServerAddress> order = new ArrayList<>();
132130
for ( int i = 3 * 2 + 1; i-- > 0; )
@@ -158,7 +156,7 @@ public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception
158156
new BoltServerAddress( "two" ),
159157
new BoltServerAddress( "tre" ) ) );
160158
RoundRobinAddressSet set = new RoundRobinAddressSet();
161-
set.update( servers, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
159+
set.update( servers );
162160

163161
List<BoltServerAddress> order = new ArrayList<>();
164162
for ( int i = 3 * 2 + 1; i-- > 0; )
@@ -173,7 +171,7 @@ public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception
173171

174172
// when
175173
servers.remove( order.get( 1 ) );
176-
set.update( servers, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
174+
set.update( servers );
177175

178176
// then
179177
assertEquals( order.get( 2 ), set.next() );
@@ -182,106 +180,7 @@ public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception
182180
assertEquals( order.get( 0 ), set.next() );
183181
}
184182

185-
@Test
186-
public void shouldRecordRemovedAddressesWhenUpdating() throws Exception
187-
{
188-
// given
189-
RoundRobinAddressSet set = new RoundRobinAddressSet();
190-
Set<BoltServerAddress> addresses = new HashSet<>( asList(
191-
new BoltServerAddress( "one" ),
192-
new BoltServerAddress( "two" ),
193-
new BoltServerAddress( "tre" ) ) );
194-
set.update( addresses, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
195-
196-
// when
197-
Set<BoltServerAddress> removed = new HashSet<>();
198-
Set<BoltServerAddress> newAddresses = new HashSet<>( asList(
199-
new BoltServerAddress( "one" ),
200-
new BoltServerAddress( "two" ),
201-
new BoltServerAddress( "fyr" ) ) );
202-
set.update( newAddresses, new HashSet<BoltServerAddress>(), removed );
203-
204-
// then
205-
assertEquals( singleton( new BoltServerAddress( "tre" ) ), removed );
206-
}
207183

208-
@Test
209-
public void shouldRecordRemovedAddressesWhenUpdateIsEmpty()
210-
{
211-
RoundRobinAddressSet set = new RoundRobinAddressSet();
212-
Set<BoltServerAddress> addresses = new HashSet<>( asList(
213-
new BoltServerAddress( "one" ),
214-
new BoltServerAddress( "two" ) ) );
215-
set.update( addresses, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
216-
217-
Set<BoltServerAddress> update = Collections.emptySet();
218-
Set<BoltServerAddress> removed = new HashSet<>();
219-
set.update( update, new HashSet<BoltServerAddress>(), removed );
220-
221-
assertEquals( addresses, removed );
222-
}
223-
224-
@Test
225-
public void shouldRecordAddedAddressesWhenUpdatingAnEmptySet()
226-
{
227-
RoundRobinAddressSet set = new RoundRobinAddressSet();
228-
229-
Set<BoltServerAddress> added1 = new HashSet<>();
230-
Set<BoltServerAddress> addresses1 = new HashSet<>( asList(
231-
new BoltServerAddress( "one" ),
232-
new BoltServerAddress( "two" ),
233-
new BoltServerAddress( "tre" ) ) );
234-
set.update( addresses1, added1, new HashSet<BoltServerAddress>() );
235-
236-
assertEquals( addresses1, added1 );
237-
}
238-
239-
@Test
240-
public void shouldRecordAddedAddressesWhenUpdating()
241-
{
242-
RoundRobinAddressSet set = new RoundRobinAddressSet();
243-
244-
Set<BoltServerAddress> addresses1 = new HashSet<>( asList(
245-
new BoltServerAddress( "one" ),
246-
new BoltServerAddress( "two" ),
247-
new BoltServerAddress( "tre" ) ) );
248-
set.update( addresses1, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
249-
250-
Set<BoltServerAddress> added = new HashSet<>();
251-
Set<BoltServerAddress> newAddresses = new HashSet<>( asList(
252-
new BoltServerAddress( "one" ),
253-
new BoltServerAddress( "tre" ),
254-
new BoltServerAddress( "four" ) ) );
255-
set.update( newAddresses, added, new HashSet<BoltServerAddress>() );
256-
257-
assertEquals( singleton( new BoltServerAddress( "four" ) ), added );
258-
}
259-
260-
@Test
261-
public void shouldRecordBothAddedAndRemovedAddressesWhenUpdating()
262-
{
263-
RoundRobinAddressSet set = new RoundRobinAddressSet();
264-
265-
Set<BoltServerAddress> addresses1 = new HashSet<>( asList(
266-
new BoltServerAddress( "one" ),
267-
new BoltServerAddress( "two" ),
268-
new BoltServerAddress( "three" ) ) );
269-
set.update( addresses1, new HashSet<BoltServerAddress>(), new HashSet<BoltServerAddress>() );
270-
271-
Set<BoltServerAddress> newAddresses = new HashSet<>( asList(
272-
new BoltServerAddress( "two" ),
273-
new BoltServerAddress( "four" ),
274-
new BoltServerAddress( "five" ) ) );
275-
276-
Set<BoltServerAddress> added = new HashSet<>();
277-
Set<BoltServerAddress> removed = new HashSet<>();
278-
set.update( newAddresses, added, removed );
279-
280-
assertEquals(
281-
new HashSet<>( asList( new BoltServerAddress( "four" ), new BoltServerAddress( "five" ) ) ), added );
282-
assertEquals(
283-
new HashSet<>( asList( new BoltServerAddress( "one" ), new BoltServerAddress( "three" ) ) ), removed );
284-
}
285184

286185
@Test
287186
public void shouldPreserveOrderEvenWhenIntegerOverflows() throws Exception

0 commit comments

Comments
 (0)