From ab54cc5b4efe53f737b76eda6f81b6d3ed9eb32b Mon Sep 17 00:00:00 2001 From: lutovich Date: Tue, 4 Jul 2017 12:28:26 +0200 Subject: [PATCH] Decouple load balancing logic from address set Round-robin load balancing was previously coupled with the address set implementation and made it hard to use different load balancing algorithm. This commit turns RoundRobinAddressSet into a simple concurrent set of addresses and moves load balancing logic into a separate component that takes list of available addresses and returns one that should be used next. This allows easier implementation of new load balancing algorithms. Rediscovery procedure will now not use load balancing and query routers in a known order (which is randomized by the database). --- ...ndRobinAddressSet.java => AddressSet.java} | 36 +-- .../internal/cluster/ClusterRoutingTable.java | 26 +- .../driver/internal/cluster/LoadBalancer.java | 32 ++- .../cluster/LoadBalancingStrategy.java | 28 +++ .../driver/internal/cluster/Rediscovery.java | 11 +- .../cluster/RoundRobinArrayIndex.java | 53 ++++ .../RoundRobinLoadBalancingStrategy.java | 50 ++++ .../driver/internal/cluster/RoutingTable.java | 8 +- .../internal/RoutingDriverBoltKitTest.java | 10 +- .../internal/cluster/AddressSetTest.java | 171 +++++++++++++ .../cluster/ClusterRoutingTableTest.java | 24 +- .../internal/cluster/LoadBalancerTest.java | 22 +- .../internal/cluster/RediscoveryTest.java | 48 ++++ .../cluster/RoundRobinAddressSetTest.java | 226 ------------------ .../cluster/RoundRobinArrayIndexTest.java | 67 ++++++ .../RoundRobinLoadBalancingStrategyTest.java | 98 ++++++++ .../neo4j/driver/internal/util/Matchers.java | 15 +- 17 files changed, 595 insertions(+), 330 deletions(-) rename driver/src/main/java/org/neo4j/driver/internal/cluster/{RoundRobinAddressSet.java => AddressSet.java} (78%) create mode 100644 driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancingStrategy.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndex.java create mode 100644 driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategy.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java delete mode 100644 driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndexTest.java create mode 100644 driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategyTest.java diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java similarity index 78% rename from driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java rename to driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java index b933f6da5c..2bd03b5d31 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java @@ -20,39 +20,23 @@ import java.util.Arrays; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; import org.neo4j.driver.internal.net.BoltServerAddress; -public class RoundRobinAddressSet +public class AddressSet { private static final BoltServerAddress[] NONE = {}; - private final AtomicInteger offset = new AtomicInteger(); - private volatile BoltServerAddress[] addresses = NONE; - public int size() - { - return addresses.length; - } + private volatile BoltServerAddress[] addresses = NONE; - public BoltServerAddress next() + public BoltServerAddress[] toArray() { - BoltServerAddress[] addresses = this.addresses; - if ( addresses.length == 0 ) - { - return null; - } - return addresses[next( addresses.length )]; + return addresses; } - int next( int divisor ) + public int size() { - int index = offset.getAndIncrement(); - for ( ; index == Integer.MAX_VALUE; index = offset.getAndIncrement() ) - { - offset.compareAndSet( Integer.MIN_VALUE, index % divisor ); - } - return index % divisor; + return addresses.length; } public synchronized void update( Set addresses, Set removed ) @@ -132,12 +116,6 @@ public synchronized void remove( BoltServerAddress address ) @Override public String toString() { - return "RoundRobinAddressSet=" + Arrays.toString( addresses ); - } - - /** breaking encapsulation in order to perform white-box testing of boundary case */ - void setOffset( int target ) - { - offset.set( target ); + return "AddressSet=" + Arrays.toString( addresses ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java index 54a80d60d7..36812ad945 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java @@ -36,9 +36,9 @@ public class ClusterRoutingTable implements RoutingTable private final Clock clock; private volatile long expirationTimeout; - private final RoundRobinAddressSet readers; - private final RoundRobinAddressSet writers; - private final RoundRobinAddressSet routers; + private final AddressSet readers; + private final AddressSet writers; + private final AddressSet routers; public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses ) { @@ -51,9 +51,9 @@ private ClusterRoutingTable( Clock clock ) this.clock = clock; this.expirationTimeout = clock.millis() - 1; - this.readers = new RoundRobinAddressSet(); - this.writers = new RoundRobinAddressSet(); - this.routers = new RoundRobinAddressSet(); + this.readers = new AddressSet(); + this.writers = new AddressSet(); + this.routers = new AddressSet(); } @Override @@ -85,27 +85,21 @@ public synchronized void forget( BoltServerAddress address ) } @Override - public RoundRobinAddressSet readers() + public AddressSet readers() { return readers; } @Override - public RoundRobinAddressSet writers() + public AddressSet writers() { return writers; } @Override - public BoltServerAddress nextRouter() + public AddressSet routers() { - return routers.next(); - } - - @Override - public int routerSize() - { - return routers.size(); + return routers; } @Override diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java index ef48041d29..b46fba3111 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancer.java @@ -39,6 +39,7 @@ public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, Au private final ConnectionPool connections; private final RoutingTable routingTable; private final Rediscovery rediscovery; + private final LoadBalancingStrategy loadBalancingStrategy; private final Logger log; public LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, ConnectionPool connections, @@ -59,6 +60,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, this.connections = connections; this.routingTable = routingTable; this.rediscovery = rediscovery; + this.loadBalancingStrategy = new RoundRobinLoadBalancingStrategy(); this.log = log; refreshRoutingTable(); @@ -67,7 +69,7 @@ private LoadBalancer( BoltServerAddress initialRouter, RoutingSettings settings, @Override public PooledConnection acquireConnection( AccessMode mode ) { - RoundRobinAddressSet addressSet = addressSetFor( mode ); + AddressSet addressSet = addressSetFor( mode ); PooledConnection connection = acquireConnection( mode, addressSet ); return new RoutingPooledConnection( connection, this, mode ); } @@ -90,10 +92,10 @@ public void close() throws Exception connections.close(); } - private PooledConnection acquireConnection( AccessMode mode, RoundRobinAddressSet servers ) + private PooledConnection acquireConnection( AccessMode mode, AddressSet servers ) { ensureRouting( mode ); - for ( BoltServerAddress address; (address = servers.next()) != null; ) + for ( BoltServerAddress address; (address = selectAddress( mode, servers )) != null; ) { try { @@ -141,7 +143,7 @@ synchronized void refreshRoutingTable() log.info( "Refreshed routing information. %s", routingTable ); } - private RoundRobinAddressSet addressSetFor( AccessMode mode ) + private AddressSet addressSetFor( AccessMode mode ) { switch ( mode ) { @@ -150,7 +152,22 @@ private RoundRobinAddressSet addressSetFor( AccessMode mode ) case WRITE: return routingTable.writers(); default: - throw new IllegalArgumentException( "Mode '" + mode + "' is not supported" ); + throw unknownMode( mode ); + } + } + + private BoltServerAddress selectAddress( AccessMode mode, AddressSet servers ) + { + BoltServerAddress[] addresses = servers.toArray(); + + switch ( mode ) + { + case READ: + return loadBalancingStrategy.selectReader( addresses ); + case WRITE: + return loadBalancingStrategy.selectWriter( addresses ); + default: + throw unknownMode( mode ); } } @@ -161,4 +178,9 @@ private static Rediscovery createRediscovery( BoltServerAddress initialRouter, R new RoutingProcedureClusterCompositionProvider( clock, log, settings ); return new Rediscovery( initialRouter, settings, clock, log, clusterComposition, new DnsResolver( log ) ); } + + private static RuntimeException unknownMode( AccessMode mode ) + { + return new IllegalArgumentException( "Mode '" + mode + "' is not supported" ); + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancingStrategy.java new file mode 100644 index 0000000000..99f40fcd42 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/LoadBalancingStrategy.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.neo4j.driver.internal.net.BoltServerAddress; + +public interface LoadBalancingStrategy +{ + BoltServerAddress selectReader( BoltServerAddress[] knownReaders ); + + BoltServerAddress selectWriter( BoltServerAddress[] knownWriters ); +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java index a038da4dfc..de4db16771 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java @@ -136,15 +136,10 @@ private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections, Set seenServers ) { - int size = routingTable.routerSize(); - for ( int i = 0; i < size; i++ ) - { - BoltServerAddress address = routingTable.nextRouter(); - if ( address == null ) - { - break; - } + BoltServerAddress[] addresses = routingTable.routers().toArray(); + for ( BoltServerAddress address : addresses ) + { ClusterComposition composition = lookupOnRouter( address, routingTable, connections ); if ( composition != null ) { diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndex.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndex.java new file mode 100644 index 0000000000..cc1002a875 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndex.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import java.util.concurrent.atomic.AtomicInteger; + +public class RoundRobinArrayIndex +{ + private final AtomicInteger offset; + + RoundRobinArrayIndex() + { + this( 0 ); + } + + // only for testing + RoundRobinArrayIndex( int initialOffset ) + { + this.offset = new AtomicInteger( initialOffset ); + } + + public int next( int arrayLength ) + { + if ( arrayLength == 0 ) + { + return -1; + } + + int nextOffset; + while ( (nextOffset = offset.getAndIncrement()) < 0 ) + { + // overflow, try resetting back to zero + offset.compareAndSet( nextOffset + 1, 0 ); + } + return nextOffset % arrayLength; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategy.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategy.java new file mode 100644 index 0000000000..35cb127b85 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategy.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.neo4j.driver.internal.net.BoltServerAddress; + +public class RoundRobinLoadBalancingStrategy implements LoadBalancingStrategy +{ + private final RoundRobinArrayIndex readersIndex = new RoundRobinArrayIndex(); + private final RoundRobinArrayIndex writersIndex = new RoundRobinArrayIndex(); + + @Override + public BoltServerAddress selectReader( BoltServerAddress[] knownReaders ) + { + return select( knownReaders, readersIndex ); + } + + @Override + public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters ) + { + return select( knownWriters, writersIndex ); + } + + private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex roundRobinIndex ) + { + int length = addresses.length; + if ( length == 0 ) + { + return null; + } + int index = roundRobinIndex.next( length ); + return addresses[index]; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java index 9ec32dddb0..162ab7a51c 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java @@ -31,13 +31,11 @@ public interface RoutingTable void forget( BoltServerAddress address ); - RoundRobinAddressSet readers(); + AddressSet readers(); - RoundRobinAddressSet writers(); + AddressSet writers(); - BoltServerAddress nextRouter(); - - int routerSize(); + AddressSet routers(); void removeWriter( BoltServerAddress toRemove ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java index 8c6ea1f5a3..920b53a71c 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/RoutingDriverBoltKitTest.java @@ -808,11 +808,11 @@ public void shouldRetryReadTransactionAndPerformRediscoveryUntilSuccess() throws @Test public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throws Exception { - StubServer router1 = StubServer.start( "acquire_endpoints.script", 9010 ); - StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9007 ); + StubServer router1 = StubServer.start( "discover_servers.script", 9010 ); + StubServer brokenWriter1 = StubServer.start( "dead_write_server.script", 9001 ); + StubServer router2 = StubServer.start( "acquire_endpoints.script", 9002 ); StubServer brokenWriter2 = StubServer.start( "dead_write_server.script", 9008 ); - StubServer router2 = StubServer.start( "discover_servers.script", 9002 ); - StubServer writer = StubServer.start( "write_server.script", 9001 ); + StubServer writer = StubServer.start( "write_server.script", 9007 ); try ( Driver driver = newDriverWithSleeplessClock( "bolt+routing://127.0.0.1:9010" ); Session session = driver.session() ) @@ -827,9 +827,9 @@ public void shouldRetryWriteTransactionAndPerformRediscoveryUntilSuccess() throw { assertEquals( 0, router1.exitStatus() ); assertEquals( 0, brokenWriter1.exitStatus() ); - assertEquals( 0, brokenWriter2.exitStatus() ); assertEquals( 0, router2.exitStatus() ); assertEquals( 0, writer.exitStatus() ); + assertEquals( 0, brokenWriter2.exitStatus() ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java new file mode 100644 index 0000000000..b8af2751a5 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/AddressSetTest.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.Test; + +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.neo4j.driver.internal.net.BoltServerAddress; + +import static java.util.Collections.singleton; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class AddressSetTest +{ + @Test + public void shouldPreserveOrderWhenAdding() throws Exception + { + // given + Set servers = addresses( "one", "two", "tre" ); + + AddressSet set = new AddressSet(); + set.update( servers, new HashSet() ); + + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + + // when + servers.add( new BoltServerAddress( "fyr" ) ); + set.update( servers, new HashSet() ); + + // then + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" ), + new BoltServerAddress( "fyr" )}, set.toArray() ); + } + + @Test + public void shouldPreserveOrderWhenRemoving() throws Exception + { + // given + Set servers = addresses( "one", "two", "tre" ); + AddressSet set = new AddressSet(); + set.update( servers, new HashSet() ); + + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + + // when + set.remove( new BoltServerAddress( "one" ) ); + + // then + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + } + + @Test + public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception + { + // given + Set servers = addresses( "one", "two", "tre" ); + AddressSet set = new AddressSet(); + set.update( servers, new HashSet() ); + + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + + // when + servers.remove( new BoltServerAddress( "one" ) ); + set.update( servers, new HashSet() ); + + // then + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, set.toArray() ); + } + + @Test + public void shouldRecordRemovedAddressesWhenUpdating() throws Exception + { + // given + AddressSet set = new AddressSet(); + set.update( addresses( "one", "two", "tre" ), new HashSet() ); + + // when + HashSet removed = new HashSet<>(); + set.update( addresses( "one", "two", "fyr" ), removed ); + + // then + assertEquals( singleton( new BoltServerAddress( "tre" ) ), removed ); + } + + @Test + public void shouldExposeEmptyArrayWhenEmpty() + { + AddressSet addressSet = new AddressSet(); + + BoltServerAddress[] addresses = addressSet.toArray(); + + assertEquals( 0, addresses.length ); + } + + @Test + public void shouldExposeCorrectArray() + { + AddressSet addressSet = new AddressSet(); + addressSet.update( addresses( "one", "two", "tre" ), new HashSet() ); + + BoltServerAddress[] addresses = addressSet.toArray(); + + assertArrayEquals( new BoltServerAddress[]{ + new BoltServerAddress( "one" ), + new BoltServerAddress( "two" ), + new BoltServerAddress( "tre" )}, addresses ); + } + + @Test + public void shouldHaveSizeZeroWhenEmpty() + { + AddressSet addressSet = new AddressSet(); + + assertEquals( 0, addressSet.size() ); + } + + @Test + public void shouldHaveCorrectSize() + { + AddressSet addressSet = new AddressSet(); + addressSet.update( addresses( "one", "two" ), new HashSet() ); + + assertEquals( 2, addressSet.size() ); + } + + private static Set addresses( String... strings ) + { + Set set = new LinkedHashSet<>(); + for ( String string : strings ) + { + set.add( new BoltServerAddress( string ) ); + } + return set; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java index 0a12c079cf..2766990516 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/ClusterRoutingTableTest.java @@ -27,7 +27,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.neo4j.driver.internal.cluster.ClusterCompositionUtil.A; @@ -142,13 +142,7 @@ public void shouldPreserveOrderingOfRouters() routingTable.update( createClusterComposition( routers, EMPTY, EMPTY ) ); - assertEquals( A, routingTable.nextRouter() ); - assertEquals( C, routingTable.nextRouter() ); - assertEquals( D, routingTable.nextRouter() ); - assertEquals( F, routingTable.nextRouter() ); - assertEquals( B, routingTable.nextRouter() ); - assertEquals( E, routingTable.nextRouter() ); - assertEquals( A, routingTable.nextRouter() ); + assertArrayEquals( new BoltServerAddress[]{A, C, D, F, B, E}, routingTable.routers().toArray() ); } @Test @@ -159,12 +153,7 @@ public void shouldPreserveOrderingOfWriters() routingTable.update( createClusterComposition( EMPTY, writers, EMPTY ) ); - assertEquals( D, routingTable.writers().next() ); - assertEquals( F, routingTable.writers().next() ); - assertEquals( A, routingTable.writers().next() ); - assertEquals( C, routingTable.writers().next() ); - assertEquals( E, routingTable.writers().next() ); - assertEquals( D, routingTable.writers().next() ); + assertArrayEquals( new BoltServerAddress[]{D, F, A, C, E}, routingTable.writers().toArray() ); } @Test @@ -175,12 +164,7 @@ public void shouldPreserveOrderingOfReaders() routingTable.update( createClusterComposition( EMPTY, EMPTY, readers ) ); - assertEquals( B, routingTable.readers().next() ); - assertEquals( A, routingTable.readers().next() ); - assertEquals( F, routingTable.readers().next() ); - assertEquals( C, routingTable.readers().next() ); - assertEquals( D, routingTable.readers().next() ); - assertEquals( B, routingTable.readers().next() ); + assertArrayEquals( new BoltServerAddress[]{B, A, F, C, D}, routingTable.readers().toArray() ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java index 873781b9a9..b787b2356a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/LoadBalancerTest.java @@ -53,7 +53,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -176,9 +175,12 @@ public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosing @Test public void shouldForgetAddressAndItsConnectionsOnServiceUnavailableWhileClosingSession() { - RoutingTable routingTable = mock( RoutingTable.class, RETURNS_MOCKS ); - ConnectionPool connectionPool = mock( ConnectionPool.class ); BoltServerAddress address = new BoltServerAddress( "host", 42 ); + RoutingTable routingTable = mock( RoutingTable.class ); + AddressSet addressSet = mock( AddressSet.class ); + when( addressSet.toArray() ).thenReturn( new BoltServerAddress[]{address} ); + when( routingTable.writers() ).thenReturn( addressSet ); + ConnectionPool connectionPool = mock( ConnectionPool.class ); PooledConnection connectionWithFailingSync = newConnectionWithFailingSync( address ); when( connectionPool.acquire( any( BoltServerAddress.class ) ) ).thenReturn( connectionWithFailingSync ); Rediscovery rediscovery = mock( Rediscovery.class ); @@ -225,8 +227,8 @@ public void shouldThrowWhenRediscoveryReturnsNoSuitableServers() RoutingTable routingTable = mock( RoutingTable.class ); when( routingTable.isStaleFor( any( AccessMode.class ) ) ).thenReturn( true ); Rediscovery rediscovery = mock( Rediscovery.class ); - when( routingTable.readers() ).thenReturn( new RoundRobinAddressSet() ); - when( routingTable.writers() ).thenReturn( new RoundRobinAddressSet() ); + when( routingTable.readers() ).thenReturn( new AddressSet() ); + when( routingTable.writers() ).thenReturn( new AddressSet() ); LoadBalancer loadBalancer = new LoadBalancer( connections, routingTable, rediscovery, DEV_NULL_LOGGER ); @@ -300,11 +302,11 @@ private LoadBalancer setupLoadBalancer( PooledConnection writerConn, PooledConne when( connPool.acquire( writer ) ).thenReturn( writerConn ); when( connPool.acquire( reader ) ).thenReturn( readConn ); - RoundRobinAddressSet writerAddrs = mock( RoundRobinAddressSet.class ); - when( writerAddrs.next() ).thenReturn( writer ); + AddressSet writerAddrs = mock( AddressSet.class ); + when( writerAddrs.toArray() ).thenReturn( new BoltServerAddress[]{writer} ); - RoundRobinAddressSet readerAddrs = mock( RoundRobinAddressSet.class ); - when( readerAddrs.next() ).thenReturn( reader ); + AddressSet readerAddrs = mock( AddressSet.class ); + when( readerAddrs.toArray() ).thenReturn( new BoltServerAddress[]{reader} ); RoutingTable routingTable = mock( RoutingTable.class ); when( routingTable.readers() ).thenReturn( readerAddrs ); @@ -336,7 +338,7 @@ private static RoutingTable newStaleRoutingTableMock( AccessMode mode ) when( routingTable.isStaleFor( mode ) ).thenReturn( true ); when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( new HashSet() ); - RoundRobinAddressSet addresses = new RoundRobinAddressSet(); + AddressSet addresses = new AddressSet(); addresses.update( new HashSet<>( singletonList( LOCAL_DEFAULT ) ), new HashSet() ); when( routingTable.readers() ).thenReturn( addresses ); when( routingTable.writers() ).thenReturn( addresses ); diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java index b204335145..33966304db 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RediscoveryTest.java @@ -23,6 +23,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import org.mockito.InOrder; import java.util.ArrayList; import java.util.Collection; @@ -49,6 +50,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -451,6 +453,52 @@ public void shouldNotUseInitialRouterTwiceIfRoutingTableContainsIt() } } + public static class KnownRoutersTest + { + @Test + public void shouldProbeAllKnownRoutersInOrder() + { + PooledConnection brokenConnection1 = mock( PooledConnection.class ); + PooledConnection goodConnection = mock( PooledConnection.class ); + PooledConnection brokenConnection2 = mock( PooledConnection.class ); + + ConnectionPool connections = mock( ConnectionPool.class ); + when( connections.acquire( A ) ).thenReturn( brokenConnection1 ); + when( connections.acquire( B ) ).thenReturn( goodConnection ); + when( connections.acquire( C ) ).thenReturn( brokenConnection2 ); + + ClusterCompositionProvider clusterComposition = mock( ClusterCompositionProvider.class ); + when( clusterComposition.getClusterComposition( brokenConnection1 ) ) + .thenThrow( new ServiceUnavailableException( "Can't connect" ) ); + when( clusterComposition.getClusterComposition( goodConnection ) ) + .thenReturn( success( VALID_CLUSTER_COMPOSITION ) ); + when( clusterComposition.getClusterComposition( brokenConnection2 ) ) + .thenThrow( new ServiceUnavailableException( "Can't connect" ) ); + + RoutingTable routingTable = new TestRoutingTable( A, B, C ); + + RoutingSettings settings = new RoutingSettings( 1, 0, null ); + Clock mockedClock = mock( Clock.class ); + Logger mockedLogger = mock( Logger.class ); + + Rediscovery rediscovery = new Rediscovery( A, settings, mockedClock, mockedLogger, clusterComposition, + directMapProvider ); + + ClusterComposition composition1 = rediscovery.lookupClusterComposition( routingTable, connections ); + assertEquals( VALID_CLUSTER_COMPOSITION, composition1 ); + + ClusterComposition composition2 = rediscovery.lookupClusterComposition( routingTable, connections ); + assertEquals( VALID_CLUSTER_COMPOSITION, composition2 ); + + // server A should've been removed after an unsuccessful attempt + InOrder inOrder = inOrder( clusterComposition ); + inOrder.verify( clusterComposition ).getClusterComposition( brokenConnection1 ); + inOrder.verify( clusterComposition, times( 2 ) ).getClusterComposition( goodConnection ); + + verify( clusterComposition, never() ).getClusterComposition( brokenConnection2 ); + } + } + private static ClusterComposition rediscover( ConnectionPool connections, RoutingTable routingTable, ClusterCompositionProvider provider ) { diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java deleted file mode 100644 index 390ae234ec..0000000000 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSetTest.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Copyright (c) 2002-2017 "Neo Technology," - * Network Engine for Objects in Lund AB [http://neotechnology.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.neo4j.driver.internal.cluster; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; - -import org.junit.Test; - -import org.neo4j.driver.internal.net.BoltServerAddress; - -import static java.util.Arrays.asList; -import static java.util.Collections.singleton; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; - -public class RoundRobinAddressSetTest -{ - @Test - public void shouldReturnNullWhenEmpty() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - // then - assertNull( set.next() ); - } - - @Test - public void shouldReturnRoundRobin() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ), new HashSet() ); - - // when - BoltServerAddress a = set.next(); - BoltServerAddress b = set.next(); - BoltServerAddress c = set.next(); - - // then - assertEquals( a, set.next() ); - assertEquals( b, set.next() ); - assertEquals( c, set.next() ); - assertEquals( a, set.next() ); - assertEquals( b, set.next() ); - assertEquals( c, set.next() ); - assertNotEquals( a, c ); - assertNotEquals( b, a ); - assertNotEquals( c, b ); - } - - @Test - public void shouldPreserveOrderWhenAdding() throws Exception - { - // given - HashSet servers = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet() ); - - List order = new ArrayList<>(); - for ( int i = 3 * 4 + 1; i-- > 0; ) - { - BoltServerAddress server = set.next(); - if ( !order.contains( server ) ) - { - order.add( server ); - } - } - assertEquals( 3, order.size() ); - - // when - servers.add( new BoltServerAddress( "fyr" ) ); - set.update( servers, new HashSet() ); - - // then - assertEquals( order.get( 1 ), set.next() ); - assertEquals( order.get( 2 ), set.next() ); - BoltServerAddress next = set.next(); - assertNotEquals( order.get( 0 ), next ); - assertNotEquals( order.get( 1 ), next ); - assertNotEquals( order.get( 2 ), next ); - assertEquals( order.get( 0 ), set.next() ); - // ... and once more - assertEquals( order.get( 1 ), set.next() ); - assertEquals( order.get( 2 ), set.next() ); - assertEquals( next, set.next() ); - assertEquals( order.get( 0 ), set.next() ); - } - - @Test - public void shouldPreserveOrderWhenRemoving() throws Exception - { - // given - HashSet servers = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet() ); - - List order = new ArrayList<>(); - for ( int i = 3 * 2 + 1; i-- > 0; ) - { - BoltServerAddress server = set.next(); - if ( !order.contains( server ) ) - { - order.add( server ); - } - } - assertEquals( 3, order.size() ); - - // when - set.remove( order.get( 1 ) ); - - // then - assertEquals( order.get( 2 ), set.next() ); - assertEquals( order.get( 0 ), set.next() ); - assertEquals( order.get( 2 ), set.next() ); - assertEquals( order.get( 0 ), set.next() ); - } - - @Test - public void shouldPreserveOrderWhenRemovingThroughUpdate() throws Exception - { - // given - HashSet servers = new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ); - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( servers, new HashSet() ); - - List order = new ArrayList<>(); - for ( int i = 3 * 2 + 1; i-- > 0; ) - { - BoltServerAddress server = set.next(); - if ( !order.contains( server ) ) - { - order.add( server ); - } - } - assertEquals( 3, order.size() ); - - // when - servers.remove( order.get( 1 ) ); - set.update( servers, new HashSet() ); - - // then - assertEquals( order.get( 2 ), set.next() ); - assertEquals( order.get( 0 ), set.next() ); - assertEquals( order.get( 2 ), set.next() ); - assertEquals( order.get( 0 ), set.next() ); - } - - @Test - public void shouldRecordRemovedAddressesWhenUpdating() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - set.update( - new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "tre" ) ) ), - new HashSet() ); - - // when - HashSet removed = new HashSet<>(); - set.update( - new HashSet<>( asList( - new BoltServerAddress( "one" ), - new BoltServerAddress( "two" ), - new BoltServerAddress( "fyr" ) ) ), - removed ); - - // then - assertEquals( singleton( new BoltServerAddress( "tre" ) ), removed ); - } - - @Test - public void shouldPreserveOrderEvenWhenIntegerOverflows() throws Exception - { - // given - RoundRobinAddressSet set = new RoundRobinAddressSet(); - - for ( int div = 1; div <= 1024; div++ ) - { - // when - white box testing! - set.setOffset( Integer.MAX_VALUE - 1 ); - int a = set.next( div ); - int b = set.next( div ); - - // then - if ( b != (a + 1) % div ) - { - fail( String.format( "a=%d, b=%d, div=%d, (a+1)%%div=%d", a, b, div, (a + 1) % div ) ); - } - } - } -} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndexTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndexTest.java new file mode 100644 index 0000000000..a91b678ca4 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinArrayIndexTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class RoundRobinArrayIndexTest +{ + @Test + public void shouldHandleZeroLength() + { + RoundRobinArrayIndex roundRobinIndex = new RoundRobinArrayIndex(); + + int index = roundRobinIndex.next( 0 ); + + assertEquals( -1, index ); + } + + @Test + public void shouldReturnIndexesInRoundRobinOrder() + { + RoundRobinArrayIndex roundRobinIndex = new RoundRobinArrayIndex(); + + for ( int i = 0; i < 10; i++ ) + { + int index = roundRobinIndex.next( 10 ); + assertEquals( i, index ); + } + + for ( int i = 0; i < 5; i++ ) + { + int index = roundRobinIndex.next( 5 ); + assertEquals( i, index ); + } + } + + @Test + public void shouldHandleOverflow() + { + int arrayLength = 10; + RoundRobinArrayIndex roundRobinIndex = new RoundRobinArrayIndex( Integer.MAX_VALUE - 1 ); + + assertEquals( (Integer.MAX_VALUE - 1) % arrayLength, roundRobinIndex.next( arrayLength ) ); + assertEquals( Integer.MAX_VALUE % arrayLength, roundRobinIndex.next( arrayLength ) ); + assertEquals( 0, roundRobinIndex.next( arrayLength ) ); + assertEquals( 1, roundRobinIndex.next( arrayLength ) ); + assertEquals( 2, roundRobinIndex.next( arrayLength ) ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategyTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategyTest.java new file mode 100644 index 0000000000..63ef0ad8e4 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoundRobinLoadBalancingStrategyTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2002-2017 "Neo Technology," + * Network Engine for Objects in Lund AB [http://neotechnology.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.Test; + +import org.neo4j.driver.internal.net.BoltServerAddress; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class RoundRobinLoadBalancingStrategyTest +{ + private final RoundRobinLoadBalancingStrategy strategy = new RoundRobinLoadBalancingStrategy(); + + @Test + public void shouldHandleEmptyReadersArray() + { + assertNull( strategy.selectReader( new BoltServerAddress[0] ) ); + } + + @Test + public void shouldHandleEmptyWritersArray() + { + assertNull( strategy.selectWriter( new BoltServerAddress[0] ) ); + } + + @Test + public void shouldHandleSingleReader() + { + BoltServerAddress address = new BoltServerAddress( "reader", 9999 ); + + assertEquals( address, strategy.selectReader( new BoltServerAddress[]{address} ) ); + } + + @Test + public void shouldHandleSingleWriter() + { + BoltServerAddress address = new BoltServerAddress( "writer", 9999 ); + + assertEquals( address, strategy.selectWriter( new BoltServerAddress[]{address} ) ); + } + + @Test + public void shouldReturnReadersInRoundRobinOrder() + { + BoltServerAddress address1 = new BoltServerAddress( "server-1", 1 ); + BoltServerAddress address2 = new BoltServerAddress( "server-2", 2 ); + BoltServerAddress address3 = new BoltServerAddress( "server-3", 3 ); + BoltServerAddress address4 = new BoltServerAddress( "server-4", 4 ); + + BoltServerAddress[] readers = {address1, address2, address3, address4}; + + assertEquals( address1, strategy.selectReader( readers ) ); + assertEquals( address2, strategy.selectReader( readers ) ); + assertEquals( address3, strategy.selectReader( readers ) ); + assertEquals( address4, strategy.selectReader( readers ) ); + + assertEquals( address1, strategy.selectReader( readers ) ); + assertEquals( address2, strategy.selectReader( readers ) ); + assertEquals( address3, strategy.selectReader( readers ) ); + assertEquals( address4, strategy.selectReader( readers ) ); + } + + @Test + public void shouldReturnWriterInRoundRobinOrder() + { + BoltServerAddress address1 = new BoltServerAddress( "server-1", 1 ); + BoltServerAddress address2 = new BoltServerAddress( "server-2", 2 ); + BoltServerAddress address3 = new BoltServerAddress( "server-3", 3 ); + + BoltServerAddress[] writers = {address1, address2, address3}; + + assertEquals( address1, strategy.selectWriter( writers ) ); + assertEquals( address2, strategy.selectWriter( writers ) ); + assertEquals( address3, strategy.selectWriter( writers ) ); + + assertEquals( address1, strategy.selectWriter( writers ) ); + assertEquals( address2, strategy.selectWriter( writers ) ); + assertEquals( address3, strategy.selectWriter( writers ) ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java index 4433673370..a734dae5a1 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java +++ b/driver/src/test/java/org/neo4j/driver/internal/util/Matchers.java @@ -28,8 +28,8 @@ import org.neo4j.driver.internal.InternalDriver; import org.neo4j.driver.internal.SessionFactory; import org.neo4j.driver.internal.SessionFactoryImpl; +import org.neo4j.driver.internal.cluster.AddressSet; import org.neo4j.driver.internal.cluster.LoadBalancer; -import org.neo4j.driver.internal.cluster.RoundRobinAddressSet; import org.neo4j.driver.internal.cluster.RoutingTable; import org.neo4j.driver.internal.net.BoltServerAddress; import org.neo4j.driver.internal.spi.ConnectionProvider; @@ -48,9 +48,11 @@ public static Matcher containsRouter( final BoltServerAddress addr @Override protected boolean matchesSafely( RoutingTable routingTable ) { - for ( int i = 0; i < routingTable.routerSize(); i++ ) + BoltServerAddress[] addresses = routingTable.routers().toArray(); + + for ( BoltServerAddress currentAddress : addresses ) { - if ( routingTable.nextRouter().equals( address ) ) + if ( currentAddress.equals( address ) ) { return true; } @@ -157,11 +159,12 @@ public void describeTo( Description description ) }; } - private static boolean contains( RoundRobinAddressSet set, BoltServerAddress address ) + private static boolean contains( AddressSet set, BoltServerAddress address ) { - for ( int i = 0; i < set.size(); i++ ) + BoltServerAddress[] addresses = set.toArray(); + for ( BoltServerAddress currentAddress : addresses ) { - if ( set.next().equals( address ) ) + if ( currentAddress.equals( address ) ) { return true; }